summaryrefslogtreecommitdiff
path: root/kvmd/plugins
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-10-28 21:19:19 +0300
committerDevaev Maxim <[email protected]>2020-11-11 22:24:25 +0300
commitdc0340583ecd2f76939edfc85cf69417743c088d (patch)
tree74dc5f112363b8fb9400f1e79f0fd36b56ec9284 /kvmd/plugins
parentc27b8909dc425b4c06a12264de77877419a13497 (diff)
splitting serial
Diffstat (limited to 'kvmd/plugins')
-rw-r--r--kvmd/plugins/hid/serial/__init__.py (renamed from kvmd/plugins/hid/serial.py)77
-rw-r--r--kvmd/plugins/hid/serial/gpio.py71
2 files changed, 89 insertions, 59 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial/__init__.py
index a45cd89a..2b1c0af6 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial/__init__.py
@@ -33,33 +33,31 @@ from typing import List
from typing import Dict
from typing import Iterable
from typing import AsyncGenerator
-from typing import Optional
-import gpiod
import serial
-from ...logging import get_logger
+from ....logging import get_logger
-from ...keyboard.mappings import KEYMAP
+from ....keyboard.mappings import KEYMAP
-from ... import env
-from ... import tools
-from ... import aiotools
-from ... import aiomulti
-from ... import aioproc
-from ... import aiogp
+from .... import tools
+from .... import aiotools
+from .... import aiomulti
+from .... import aioproc
-from ...yamlconf import Option
+from ....yamlconf import Option
-from ...validators.basic import valid_bool
-from ...validators.basic import valid_int_f0
-from ...validators.basic import valid_int_f1
-from ...validators.basic import valid_float_f01
-from ...validators.os import valid_abs_path
-from ...validators.hw import valid_tty_speed
-from ...validators.hw import valid_gpio_pin_optional
+from ....validators.basic import valid_bool
+from ....validators.basic import valid_int_f0
+from ....validators.basic import valid_int_f1
+from ....validators.basic import valid_float_f01
+from ....validators.os import valid_abs_path
+from ....validators.hw import valid_tty_speed
+from ....validators.hw import valid_gpio_pin_optional
-from . import BaseHid
+from .. import BaseHid
+
+from .gpio import Gpio
# =====
@@ -156,45 +154,6 @@ class _MouseWheelEvent(_BaseEvent):
return struct.pack(">Bxbxx", 0x14, self.delta_y)
-class _Gpio:
- def __init__(self, reset_pin: int, reset_delay: float) -> None:
- self.__reset_pin = reset_pin
- self.__reset_delay = reset_delay
-
- self.__chip: Optional[gpiod.Chip] = None
- self.__reset_line: Optional[gpiod.Line] = None
- self.__reset_wip = False
-
- def open(self) -> None:
- if self.__reset_pin >= 0:
- assert self.__chip is None
- assert self.__reset_line is None
- self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
- self.__reset_line = self.__chip.get_line(self.__reset_pin)
- self.__reset_line.request("kvmd::hid-serial::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
-
- def close(self) -> None:
- if self.__chip:
- try:
- self.__chip.close()
- except Exception:
- pass
-
- @aiotools.atomic
- async def reset(self) -> None:
- if self.__reset_pin >= 0:
- assert self.__reset_line
- if not self.__reset_wip:
- self.__reset_wip = True
- try:
- await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
- finally:
- self.__reset_wip = False
- get_logger(0).info("Reset HID performed")
- else:
- get_logger(0).info("Another reset HID in progress")
-
-
# =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
@@ -223,7 +182,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__errors_threshold = errors_threshold
self.__noop = noop
- self.__gpio = _Gpio(reset_pin, reset_delay)
+ self.__gpio = Gpio(reset_pin, reset_delay)
self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
diff --git a/kvmd/plugins/hid/serial/gpio.py b/kvmd/plugins/hid/serial/gpio.py
new file mode 100644
index 00000000..a3e4018b
--- /dev/null
+++ b/kvmd/plugins/hid/serial/gpio.py
@@ -0,0 +1,71 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Optional
+
+import gpiod
+
+from ....logging import get_logger
+
+from .... import env
+from .... import aiotools
+from .... import aiogp
+
+
+# =====
+class Gpio:
+ def __init__(self, reset_pin: int, reset_delay: float) -> None:
+ self.__reset_pin = reset_pin
+ self.__reset_delay = reset_delay
+
+ self.__chip: Optional[gpiod.Chip] = None
+ self.__reset_line: Optional[gpiod.Line] = None
+ self.__reset_wip = False
+
+ def open(self) -> None:
+ if self.__reset_pin >= 0:
+ assert self.__chip is None
+ assert self.__reset_line is None
+ self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
+ self.__reset_line = self.__chip.get_line(self.__reset_pin)
+ self.__reset_line.request("kvmd::hid-serial::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
+
+ def close(self) -> None:
+ if self.__chip:
+ try:
+ self.__chip.close()
+ except Exception:
+ pass
+
+ @aiotools.atomic
+ async def reset(self) -> None:
+ if self.__reset_pin >= 0:
+ assert self.__reset_line
+ if not self.__reset_wip:
+ self.__reset_wip = True
+ try:
+ await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
+ finally:
+ self.__reset_wip = False
+ get_logger(0).info("Reset HID performed")
+ else:
+ get_logger(0).info("Another reset HID in progress")