diff options
Diffstat (limited to 'kvmd/plugins/hid/serial.py')
-rw-r--r-- | kvmd/plugins/hid/serial.py | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index bb97aec4..c07ea45c 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -21,7 +21,6 @@ import os -import asyncio import multiprocessing import multiprocessing.queues import dataclasses @@ -35,7 +34,9 @@ from typing import List from typing import Dict from typing import Iterable from typing import AsyncGenerator +from typing import Optional +import gpiod import serial from ...logging import get_logger @@ -45,7 +46,7 @@ from ...keyboard.mappings import KEYMAP from ... import aiotools from ... import aiomulti from ... import aioproc -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path from ...validators.hw import valid_tty_speed -from ...validators.hw import valid_gpio_pin +from ...validators.hw import valid_gpio_pin_optional from . import BaseHid @@ -156,6 +157,45 @@ class _MouseWheelEvent(_BaseEvent): return struct.pack(">Bxbxx", 0x14, self.delta_y) +class _Gpio: + def __init__(self, reset_pin: int, reset_delay: float) -> None: + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__reset_line: Optional[gpiod.Line] = None + self.__reset_wip = False + + def open(self) -> None: + if self.__reset_pin >= 0: + assert self.__chip is None + assert self.__reset_line is None + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + @aiotools.atomic + async def reset(self) -> None: + if self.__reset_pin >= 0: + assert self.__reset_line + if not self.__reset_wip: + self.__reset_wip = True + try: + await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) + finally: + self.__reset_wip = False + get_logger(0).info("Reset HID performed") + else: + get_logger(0).info("Another reset HID in progress") + + # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called @@ -175,9 +215,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst multiprocessing.Process.__init__(self, daemon=True) - self.__reset_pin = gpio.set_output(reset_pin, False) - self.__reset_delay = reset_delay - self.__device_path = device_path self.__speed = speed self.__read_timeout = read_timeout @@ -187,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__errors_threshold = errors_threshold self.__noop = noop - self.__reset_wip = False + self.__gpio = _Gpio(reset_pin, reset_delay) self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() @@ -204,7 +241,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @classmethod def get_plugin_options(cls) -> Dict: return { - "reset_pin": Option(-1, type=valid_gpio_pin), + "reset_pin": Option(-1, type=valid_gpio_pin_optional), "reset_delay": Option(0.1, type=valid_float_f01), "device": Option("", type=valid_abs_path, unpack_as="device_path"), @@ -218,6 +255,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst } def sysprep(self) -> None: + self.__gpio.open() get_logger(0).info("Starting HID daemon ...") self.start() @@ -247,20 +285,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @aiotools.atomic async def reset(self) -> None: - if not self.__reset_wip: - try: - self.__reset_wip = True - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - finally: - try: - gpio.write(self.__reset_pin, False) - await asyncio.sleep(1) - finally: - self.__reset_wip = False - get_logger().info("Reset HID performed") - else: - get_logger().info("Another reset HID in progress") + await self.__gpio.reset() @aiotools.atomic async def cleanup(self) -> None: @@ -279,7 +304,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst except Exception: logger.exception("Can't clear HID events") finally: - gpio.write(self.__reset_pin, False) + self.__gpio.close() # ===== |