summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid/serial.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/hid/serial.py')
-rw-r--r--kvmd/plugins/hid/serial.py71
1 files changed, 48 insertions, 23 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index bb97aec4..c07ea45c 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -21,7 +21,6 @@
import os
-import asyncio
import multiprocessing
import multiprocessing.queues
import dataclasses
@@ -35,7 +34,9 @@ from typing import List
from typing import Dict
from typing import Iterable
from typing import AsyncGenerator
+from typing import Optional
+import gpiod
import serial
from ...logging import get_logger
@@ -45,7 +46,7 @@ from ...keyboard.mappings import KEYMAP
from ... import aiotools
from ... import aiomulti
from ... import aioproc
-from ... import gpio
+from ... import aiogp
from ...yamlconf import Option
@@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed
-from ...validators.hw import valid_gpio_pin
+from ...validators.hw import valid_gpio_pin_optional
from . import BaseHid
@@ -156,6 +157,45 @@ class _MouseWheelEvent(_BaseEvent):
return struct.pack(">Bxbxx", 0x14, self.delta_y)
+class _Gpio:
+ def __init__(self, reset_pin: int, reset_delay: float) -> None:
+ self.__reset_pin = reset_pin
+ self.__reset_delay = reset_delay
+
+ self.__chip: Optional[gpiod.Chip] = None
+ self.__reset_line: Optional[gpiod.Line] = None
+ self.__reset_wip = False
+
+ def open(self) -> None:
+ if self.__reset_pin >= 0:
+ assert self.__chip is None
+ assert self.__reset_line is None
+ self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
+ self.__reset_line = self.__chip.get_line(self.__reset_pin)
+ self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0)
+
+ def close(self) -> None:
+ if self.__chip:
+ try:
+ self.__chip.close()
+ except Exception:
+ pass
+
+ @aiotools.atomic
+ async def reset(self) -> None:
+ if self.__reset_pin >= 0:
+ assert self.__reset_line
+ if not self.__reset_wip:
+ self.__reset_wip = True
+ try:
+ await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
+ finally:
+ self.__reset_wip = False
+ get_logger(0).info("Reset HID performed")
+ else:
+ get_logger(0).info("Another reset HID in progress")
+
+
# =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
@@ -175,9 +215,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
multiprocessing.Process.__init__(self, daemon=True)
- self.__reset_pin = gpio.set_output(reset_pin, False)
- self.__reset_delay = reset_delay
-
self.__device_path = device_path
self.__speed = speed
self.__read_timeout = read_timeout
@@ -187,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__errors_threshold = errors_threshold
self.__noop = noop
- self.__reset_wip = False
+ self.__gpio = _Gpio(reset_pin, reset_delay)
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
@@ -204,7 +241,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "reset_pin": Option(-1, type=valid_gpio_pin),
+ "reset_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_delay": Option(0.1, type=valid_float_f01),
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
@@ -218,6 +255,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
}
def sysprep(self) -> None:
+ self.__gpio.open()
get_logger(0).info("Starting HID daemon ...")
self.start()
@@ -247,20 +285,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@aiotools.atomic
async def reset(self) -> None:
- if not self.__reset_wip:
- try:
- self.__reset_wip = True
- gpio.write(self.__reset_pin, True)
- await asyncio.sleep(self.__reset_delay)
- finally:
- try:
- gpio.write(self.__reset_pin, False)
- await asyncio.sleep(1)
- finally:
- self.__reset_wip = False
- get_logger().info("Reset HID performed")
- else:
- get_logger().info("Another reset HID in progress")
+ await self.__gpio.reset()
@aiotools.atomic
async def cleanup(self) -> None:
@@ -279,7 +304,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
except Exception:
logger.exception("Can't clear HID events")
finally:
- gpio.write(self.__reset_pin, False)
+ self.__gpio.close()
# =====