diff options
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 100 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 71 | ||||
-rw-r--r-- | kvmd/plugins/msd/relay.py | 87 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 62 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 2 |
6 files changed, 200 insertions, 124 deletions
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index e3225cf0..b49dd689 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -20,24 +20,25 @@ # ========================================================================== # -import asyncio - from typing import Dict from typing import AsyncGenerator +from typing import Optional + +import gpiod from ...logging import get_logger from ... import aiotools -from ... import gpio +from ... import aiogp from ...yamlconf import Option from ...validators.basic import valid_bool +from ...validators.basic import valid_float_f0 from ...validators.basic import valid_float_f01 from ...validators.hw import valid_gpio_pin - from . import AtxIsBusyError from . import BaseAtx @@ -46,27 +47,24 @@ from . import BaseAtx class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, - power_led_pin: int, - hdd_led_pin: int, power_led_inverted: bool, + power_led_debounce: float, + + hdd_led_pin: int, hdd_led_inverted: bool, + hdd_led_debounce: float, power_switch_pin: int, reset_switch_pin: int, click_delay: float, long_click_delay: float, - - state_poll: float, ) -> None: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin, False) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) - - self.__power_led_inverted = power_led_inverted - self.__hdd_led_inverted = hdd_led_inverted + self.__power_led_pin = power_led_pin + self.__hdd_led_pin = hdd_led_pin + self.__power_switch_pin = power_switch_pin + self.__reset_switch_pin = reset_switch_pin self.__click_delay = click_delay self.__long_click_delay = long_click_delay @@ -74,9 +72,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__notifier = aiotools.AioNotifier() self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) - self.__reader = gpio.BatchReader( - pins=set([self.__power_led_pin, self.__hdd_led_pin]), - interval=state_poll, + self.__chip: Optional[gpiod.Chip] = None + self.__power_switch_line: Optional[gpiod.Line] = None + self.__reset_switch_line: Optional[gpiod.Line] = None + + self.__reader = aiogp.AioReader( + path=aiogp.DEVICE_PATH, + consumer="kvmd/atx-gpio/leds", + pins={ + power_led_pin: aiogp.AioReaderPinParams(power_led_inverted, power_led_debounce), + hdd_led_pin: aiogp.AioReaderPinParams(hdd_led_inverted, hdd_led_debounce), + }, notifier=self.__notifier, ) @@ -84,25 +90,39 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def get_plugin_options(cls) -> Dict: return { "power_led_pin": Option(-1, type=valid_gpio_pin), - "hdd_led_pin": Option(-1, type=valid_gpio_pin), "power_led_inverted": Option(False, type=valid_bool), - "hdd_led_inverted": Option(False, type=valid_bool), + "power_led_debounce": Option(0.1, type=valid_float_f0), + + "hdd_led_pin": Option(-1, type=valid_gpio_pin), + "hdd_led_inverted": Option(False, type=valid_bool), + "hdd_led_debounce": Option(0.1, type=valid_float_f0), "power_switch_pin": Option(-1, type=valid_gpio_pin), "reset_switch_pin": Option(-1, type=valid_gpio_pin), "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), } + def sysprep(self) -> None: + assert self.__chip is None + assert self.__power_switch_line is None + assert self.__reset_switch_line is None + + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + + self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin) + self.__power_switch_line.request("kvmd/atx-gpio/power_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin) + self.__reset_switch_line.request("kvmd/atx-gpio/reset_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + async def get_state(self) -> Dict: return { "enabled": True, "busy": self.__region.is_busy(), "leds": { - "power": (self.__reader.get(self.__power_led_pin) ^ self.__power_led_inverted), - "hdd": (self.__reader.get(self.__hdd_led_pin) ^ self.__hdd_led_inverted), + "power": self.__reader.get(self.__power_led_pin), + "hdd": self.__reader.get(self.__hdd_led_pin), }, } @@ -119,14 +139,11 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes await self.__reader.poll() async def cleanup(self) -> None: - for (name, pin) in [ - ("power", self.__power_switch_pin), - ("reset", self.__reset_switch_pin), - ]: + if self.__chip: try: - gpio.write(pin, False) + self.__chip.close() except Exception: - get_logger(0).exception("Can't cleanup %s pin %d", name, pin) + pass # ===== @@ -149,13 +166,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes # ===== async def click_power(self, wait: bool) -> None: - await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) + await self.__click("power", self.__power_switch_line, self.__click_delay, wait) async def click_power_long(self, wait: bool) -> None: - await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) + await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait) async def click_reset(self, wait: bool) -> None: - await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) + await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait) # ===== @@ -163,22 +180,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes return (await self.get_state())["leds"]["power"] @aiotools.atomic - async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: + async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None: if wait: async with self.__region: - await self.__inner_click(name, pin, delay) + await self.__inner_click(name, line, delay) else: await aiotools.run_region_task( - "Can't perform ATX click or operation was not completed", - self.__region, self.__inner_click, name, pin, delay, + f"Can't perform ATX {name} click or operation was not completed", + self.__region, self.__inner_click, name, line, delay, ) @aiotools.atomic - async def __inner_click(self, name: str, pin: int, delay: float) -> None: - try: - gpio.write(pin, True) - await asyncio.sleep(delay) - finally: - gpio.write(pin, False) - await asyncio.sleep(1) + async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: + await aiogp.pulse(line, delay, 1) get_logger(0).info("Clicked ATX button %r", name) diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index bb97aec4..c07ea45c 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -21,7 +21,6 @@ import os -import asyncio import multiprocessing import multiprocessing.queues import dataclasses @@ -35,7 +34,9 @@ from typing import List from typing import Dict from typing import Iterable from typing import AsyncGenerator +from typing import Optional +import gpiod import serial from ...logging import get_logger @@ -45,7 +46,7 @@ from ...keyboard.mappings import KEYMAP from ... import aiotools from ... import aiomulti from ... import aioproc -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path from ...validators.hw import valid_tty_speed -from ...validators.hw import valid_gpio_pin +from ...validators.hw import valid_gpio_pin_optional from . import BaseHid @@ -156,6 +157,45 @@ class _MouseWheelEvent(_BaseEvent): return struct.pack(">Bxbxx", 0x14, self.delta_y) +class _Gpio: + def __init__(self, reset_pin: int, reset_delay: float) -> None: + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__reset_line: Optional[gpiod.Line] = None + self.__reset_wip = False + + def open(self) -> None: + if self.__reset_pin >= 0: + assert self.__chip is None + assert self.__reset_line is None + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + @aiotools.atomic + async def reset(self) -> None: + if self.__reset_pin >= 0: + assert self.__reset_line + if not self.__reset_wip: + self.__reset_wip = True + try: + await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) + finally: + self.__reset_wip = False + get_logger(0).info("Reset HID performed") + else: + get_logger(0).info("Another reset HID in progress") + + # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called @@ -175,9 +215,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst multiprocessing.Process.__init__(self, daemon=True) - self.__reset_pin = gpio.set_output(reset_pin, False) - self.__reset_delay = reset_delay - self.__device_path = device_path self.__speed = speed self.__read_timeout = read_timeout @@ -187,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__errors_threshold = errors_threshold self.__noop = noop - self.__reset_wip = False + self.__gpio = _Gpio(reset_pin, reset_delay) self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() @@ -204,7 +241,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @classmethod def get_plugin_options(cls) -> Dict: return { - "reset_pin": Option(-1, type=valid_gpio_pin), + "reset_pin": Option(-1, type=valid_gpio_pin_optional), "reset_delay": Option(0.1, type=valid_float_f01), "device": Option("", type=valid_abs_path, unpack_as="device_path"), @@ -218,6 +255,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst } def sysprep(self) -> None: + self.__gpio.open() get_logger(0).info("Starting HID daemon ...") self.start() @@ -247,20 +285,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @aiotools.atomic async def reset(self) -> None: - if not self.__reset_wip: - try: - self.__reset_wip = True - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - finally: - try: - gpio.write(self.__reset_pin, False) - await asyncio.sleep(1) - finally: - self.__reset_wip = False - get_logger().info("Reset HID performed") - else: - get_logger().info("Another reset HID in progress") + await self.__gpio.reset() @aiotools.atomic async def cleanup(self) -> None: @@ -279,7 +304,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst except Exception: logger.exception("Can't clear HID events") finally: - gpio.write(self.__reset_pin, False) + self.__gpio.close() # ===== diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index 409d3d71..cec03377 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -35,12 +35,13 @@ from typing import Optional import aiofiles import aiofiles.base +import gpiod from ...logging import get_logger from ... import aiotools from ... import aiofs -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -152,6 +153,55 @@ def _explore_device(device_path: str) -> _DeviceInfo: ) +class _Gpio: + def __init__( + self, + target_pin: int, + reset_pin: int, + reset_delay: float, + ) -> None: + + self.__target_pin = target_pin + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__target_line: Optional[gpiod.Line] = None + self.__reset_line: Optional[gpiod.Line] = None + + def open(self) -> None: + assert self.__chip is None + assert self.__target_line is None + assert self.__reset_line is None + + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + + self.__target_line = self.__chip.get_line(self.__target_pin) + self.__target_line.request("kvmd/msd-relay/target", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd/msd-relay/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + def switch_to_local(self) -> None: + assert self.__target_line + self.__target_line.set_value(0) + + def switch_to_server(self) -> None: + assert self.__target_line + self.__target_line.set_value(1) + + async def reset(self) -> None: + assert self.__reset_line + await aiogp.pulse(self.__reset_line, self.__reset_delay, 0) + + # ===== class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=super-init-not-called @@ -165,13 +215,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes reset_delay: float, ) -> None: - self.__target_pin = gpio.set_output(target_pin, False) - self.__reset_pin = gpio.set_output(reset_pin, False) - self.__device_path = device_path self.__init_delay = init_delay self.__init_retries = init_retries - self.__reset_delay = reset_delay + + self.__gpio = _Gpio(target_pin, reset_pin, reset_delay) self.__device_info: Optional[_DeviceInfo] = None self.__connected = False @@ -202,6 +250,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes "reset_delay": Option(1.0, type=valid_float_f01), } + def sysprep(self) -> None: + self.__gpio.open() + async def get_state(self) -> Dict: storage: Optional[Dict] = None drive: Optional[Dict] = None @@ -245,26 +296,18 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes @aiotools.atomic async def __inner_reset(self) -> None: - try: - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - gpio.write(self.__reset_pin, False) - - gpio.write(self.__target_pin, False) - self.__connected = False - - await self.__load_device_info() - get_logger(0).info("MSD reset has been successful") - finally: - gpio.write(self.__reset_pin, False) + await self.__gpio.reset() + self.__gpio.switch_to_local() + self.__connected = False + await self.__load_device_info() + get_logger(0).info("MSD reset has been successful") @aiotools.atomic async def cleanup(self) -> None: try: await self.__close_device_file() finally: - gpio.write(self.__target_pin, False) - gpio.write(self.__reset_pin, False) + self.__gpio.close() # ===== @@ -283,7 +326,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes if self.__connected: raise MsdConnectedError() - gpio.write(self.__target_pin, True) + self.__gpio.switch_to_server() self.__connected = True get_logger(0).info("MSD switched to Server") @@ -294,12 +337,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes if not self.__connected: raise MsdDisconnectedError() - gpio.write(self.__target_pin, False) + self.__gpio.switch_to_local() try: await self.__load_device_info() except Exception: if self.__connected: - gpio.write(self.__target_pin, True) + self.__gpio.switch_to_server() raise self.__connected = False get_logger(0).info("MSD switched to KVM: %s", self.__device_info) diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 9ed48a5f..c280b2a8 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin): def get_modes(cls) -> Set[str]: return set(UserGpioModes.ALL) - def register_input(self, pin: int) -> None: + def register_input(self, pin: int, debounce: float) -> None: raise NotImplementedError def register_output(self, pin: int, initial: Optional[bool]) -> None: diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 90426e13..4b951057 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -21,15 +21,12 @@ from typing import Dict -from typing import Set from typing import Optional -from ... import aiotools -from ... import gpio - -from ...yamlconf import Option +import gpiod -from ...validators.basic import valid_float_f01 +from ... import aiotools +from ... import aiogp from . import BaseUserGpioDriver @@ -40,59 +37,58 @@ class Plugin(BaseUserGpioDriver): self, instance_name: str, notifier: aiotools.AioNotifier, - - state_poll: float, ) -> None: super().__init__(instance_name, notifier) - self.__state_poll = state_poll - - self.__input_pins: Set[int] = set() + self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {} self.__output_pins: Dict[int, Optional[bool]] = {} - self.__reader: Optional[gpio.BatchReader] = None + self.__reader: Optional[aiogp.AioReader] = None - @classmethod - def get_plugin_options(cls) -> Dict: - return { - "state_poll": Option(0.1, type=valid_float_f01), - } + self.__chip: Optional[gpiod.Chip] = None + self.__output_lines: Dict[int, gpiod.Line] = {} - def register_input(self, pin: int) -> None: - self.__input_pins.add(pin) + def register_input(self, pin: int, debounce: float) -> None: + self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce) def register_output(self, pin: int, initial: Optional[bool]) -> None: self.__output_pins[pin] = initial def prepare(self) -> None: assert self.__reader is None - self.__reader = gpio.BatchReader( - pins=set([ - *map(gpio.set_input, self.__input_pins), - *[ - gpio.set_output(pin, initial) - for (pin, initial) in self.__output_pins.items() - ], - ]), - interval=self.__state_poll, + self.__reader = aiogp.AioReader( + path=aiogp.DEVICE_PATH, + consumer="kvmd/ugpio-gpio/inputs", + pins=self.__input_pins, notifier=self._notifier, ) + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + for (pin, initial) in self.__output_pins.items(): + line = self.__chip.get_line(pin) + line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False)) + self.__output_lines[pin] = line + async def run(self) -> None: assert self.__reader await self.__reader.poll() def cleanup(self) -> None: - for (pin, initial) in self.__output_pins.items(): - if initial is not None: - gpio.write(pin, initial) + if self.__chip: + try: + self.__chip.close() + except Exception: + pass def read(self, pin: int) -> bool: - return gpio.read(pin) + assert self.__reader + if pin in self.__input_pins: + return self.__reader.get(pin) + return bool(self.__output_lines[pin].get_value()) def write(self, pin: int, state: bool) -> None: - gpio.write(pin, state) + self.__output_lines[pin].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index b7f88cb8..32c4a3eb 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -79,7 +79,7 @@ class Plugin(BaseUserGpioDriver): def get_modes(cls) -> Set[str]: return set([UserGpioModes.OUTPUT]) - def register_input(self, pin: int) -> None: + def register_input(self, pin: int, debounce: float) -> None: raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") def register_output(self, pin: int, initial: Optional[bool]) -> None: |