diff options
author | Maxim Devaev <[email protected]> | 2021-09-08 05:43:36 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2021-09-08 05:43:36 +0300 |
commit | 98ad1145a8782d3b82516dd9f81c86d9e80391c5 (patch) | |
tree | b8f348f0815e8ee72236bf9879af779c95727755 | |
parent | 939c63fe7daf2ddc9a05c9e0fbeab63cb5c6f0c1 (diff) |
string pins
-rw-r--r-- | kvmd/apps/__init__.py | 14 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 4 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 14 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/ezcoo.py | 25 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 28 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 43 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/ipmi.py | 36 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/otgbind.py | 11 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/pwm.py | 22 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/tesmart.py | 24 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/wol.py | 10 |
11 files changed, 136 insertions, 95 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index facf9c27..c808b6d4 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -30,7 +30,7 @@ import logging.config from typing import Tuple from typing import List from typing import Dict -from typing import Set +from typing import Type from typing import Optional import pygments @@ -48,6 +48,7 @@ from ..plugins.atx import get_atx_class from ..plugins.msd import get_msd_class from ..plugins.ugpio import UserGpioModes +from ..plugins.ugpio import BaseUserGpioDriver from ..plugins.ugpio import get_ugpio_driver_class from ..yamlconf import ConfigError @@ -100,7 +101,6 @@ from ..validators.ugpio import valid_ugpio_mode from ..validators.ugpio import valid_ugpio_view_table from ..validators.hw import valid_tty_speed -from ..validators.hw import valid_gpio_pin from ..validators.hw import valid_otg_gadget from ..validators.hw import valid_otg_id from ..validators.hw import valid_otg_ethernet @@ -267,7 +267,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals if load_gpio: driver: str - drivers: Dict[str, Set[str]] = {} # Name to modes + drivers: Dict[str, Type[BaseUserGpioDriver]] = {} # Name to drivers for (driver, params) in { # type: ignore "__gpio__": {}, **tools.rget(raw_config, "kvmd", "gpio", "drivers"), @@ -277,7 +277,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals driver_type = valid_stripped_string_not_empty(params.get("type", "gpio")) driver_class = get_ugpio_driver_class(driver_type) - drivers[driver] = driver_class.get_modes() + drivers[driver] = driver_class scheme["kvmd"]["gpio"]["drivers"][driver] = { "type": Option(driver_type, type=valid_stripped_string_not_empty), **driver_class.get_plugin_options() @@ -294,12 +294,12 @@ def _patch_dynamic( # pylint: disable=too-many-locals mode: str = params.get("mode", "") with manual_validated(mode, *path, channel, "mode"): - mode = valid_ugpio_mode(mode, drivers[driver]) + mode = valid_ugpio_mode(mode, drivers[driver].get_modes()) scheme["kvmd"]["gpio"]["scheme"][channel] = { "driver": Option("__gpio__", type=functools.partial(valid_ugpio_driver, variants=set(drivers))), - "pin": Option(-1, type=valid_gpio_pin), - "mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver])), + "pin": Option(None, type=drivers[driver].get_pin_validator()), + "mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver].get_modes())), "inverted": Option(False, type=valid_bool), **({ "busy_delay": Option(0.2, type=valid_float_f01), diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index ca04b31f..78d7f4f7 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -77,7 +77,7 @@ class _GpioInput: ) -> None: self.__channel = channel - self.__pin: int = config.pin + self.__pin: str = config.pin self.__inverted: bool = config.inverted self.__driver = driver @@ -118,7 +118,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes ) -> None: self.__channel = channel - self.__pin: int = config.pin + self.__pin: str = config.pin self.__inverted: bool = config.inverted self.__switch: bool = config.switch diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index eae65891..f63df5e4 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -22,6 +22,7 @@ from typing import Set from typing import Type +from typing import Callable from typing import Optional from typing import Any @@ -51,7 +52,6 @@ class GpioDriverOfflineError(GpioOperationError): class UserGpioModes: INPUT = "input" OUTPUT = "output" - ALL = set([INPUT, OUTPUT]) @@ -74,11 +74,15 @@ class BaseUserGpioDriver(BasePlugin): def get_modes(cls) -> Set[str]: return set(UserGpioModes.ALL) - def register_input(self, pin: int, debounce: float) -> None: + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + raise NotImplementedError + + def register_input(self, pin: str, debounce: float) -> None: _ = pin _ = debounce - def register_output(self, pin: int, initial: Optional[bool]) -> None: + def register_output(self, pin: str, initial: Optional[bool]) -> None: _ = pin _ = initial @@ -91,10 +95,10 @@ class BaseUserGpioDriver(BasePlugin): async def cleanup(self) -> None: pass - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: raise NotImplementedError - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: raise NotImplementedError diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py index 81e199e8..a20aed32 100644 --- a/kvmd/plugins/ugpio/ezcoo.py +++ b/kvmd/plugins/ugpio/ezcoo.py @@ -28,7 +28,9 @@ import time from typing import Tuple from typing import Dict +from typing import Callable from typing import Optional +from typing import Any import serial @@ -85,6 +87,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)), } + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_number(arg, min=0, max=3, name="Ezcoo channel"))) + def prepare(self) -> None: assert self.__proc is None self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) @@ -105,16 +111,16 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute if self.__proc.is_alive() or self.__proc.exitcode is not None: self.__proc.join() - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: if not self.__is_online(): raise GpioDriverOfflineError(self) - return (self.__channel == pin) + return (self.__channel == int(pin)) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) - if state and (0 <= pin <= 3): - self.__ctl_queue.put_nowait(pin) + if state: + self.__ctl_queue.put_nowait(int(pin)) # ===== @@ -174,9 +180,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute return (channel, data) def __send_channel(self, tty: serial.Serial, channel: int) -> None: - # Twice because of ezcoo bugs - cmd = (b"SET" if self.__protocol == 1 else b"EZS") - tty.write((b"%s OUT1 VS IN%d\n" % (cmd, channel + 1)) * 2) + assert 0 <= channel <= 3 + cmd = b"%s OUT1 VS IN%d\n" % ( + (b"SET" if self.__protocol == 1 else b"EZS"), + channel + 1, + ) + tty.write(cmd * 2) # Twice because of ezcoo bugs tty.flush() def __str__(self) -> str: diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 28f0d5cb..315de6f1 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -21,7 +21,9 @@ from typing import Dict +from typing import Callable from typing import Optional +from typing import Any import gpiod @@ -31,6 +33,7 @@ from ... import aiogp from ...yamlconf import Option from ...validators.os import valid_abs_path +from ...validators.hw import valid_gpio_pin from . import BaseUserGpioDriver @@ -63,11 +66,15 @@ class Plugin(BaseUserGpioDriver): "device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="device_path"), } - def register_input(self, pin: int, debounce: float) -> None: - self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce) + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_gpio_pin(arg))) + + def register_input(self, pin: str, debounce: float) -> None: + self.__input_pins[int(pin)] = aiogp.AioReaderPinParams(False, debounce) - def register_output(self, pin: int, initial: Optional[bool]) -> None: - self.__output_pins[pin] = initial + def register_output(self, pin: str, initial: Optional[bool]) -> None: + self.__output_pins[int(pin)] = initial def prepare(self) -> None: assert self.__reader is None @@ -95,14 +102,15 @@ class Plugin(BaseUserGpioDriver): except Exception: pass - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: assert self.__reader - if pin in self.__input_pins: - return self.__reader.get(pin) - return bool(self.__output_lines[pin].get_value()) + pin_int = int(pin) + if pin_int in self.__input_pins: + return self.__reader.get(pin_int) + return bool(self.__output_lines[pin_int].get_value()) - async def write(self, pin: int, state: bool) -> None: - self.__output_lines[pin].set_value(int(state)) + async def write(self, pin: str, state: bool) -> None: + self.__output_lines[int(pin)].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index 7e18c72a..d4bdda48 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -25,7 +25,9 @@ import contextlib from typing import Dict from typing import Set +from typing import Callable from typing import Optional +from typing import Any import hid @@ -36,6 +38,7 @@ from ... import aiotools from ...yamlconf import Option +from ...validators.basic import valid_number from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path @@ -79,11 +82,12 @@ class Plugin(BaseUserGpioDriver): def get_modes(cls) -> Set[str]: return set([UserGpioModes.OUTPUT]) - def register_input(self, pin: int, debounce: float) -> None: - raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_number(arg, min=0, max=7, name="HID relay channel"))) - def register_output(self, pin: int, initial: Optional[bool]) -> None: - self.__initials[pin] = initial + def register_output(self, pin: str, initial: Optional[bool]) -> None: + self.__initials[int(pin)] = initial def prepare(self) -> None: logger = get_logger(0) @@ -113,15 +117,15 @@ class Plugin(BaseUserGpioDriver): self.__close_device() self.__stop = True - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: try: - return self.__inner_read(pin) + return self.__inner_read(int(pin)) except Exception: raise GpioDriverOfflineError(self) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: try: - return self.__inner_write(pin, state) + return self.__inner_write(int(pin), state) except Exception: raise GpioDriverOfflineError(self) @@ -140,27 +144,20 @@ class Plugin(BaseUserGpioDriver): pin, self, self.__device_path, tools.efmt(err)) def __inner_read(self, pin: int) -> bool: - if self.__check_pin(pin): - return bool(self.__inner_read_raw() & (1 << pin)) - return False + assert 0 <= pin <= 7 + return bool(self.__inner_read_raw() & (1 << pin)) def __inner_read_raw(self) -> int: with self.__ensure_device("reading") as device: return device.get_feature_report(1, 8)[7] def __inner_write(self, pin: int, state: bool) -> None: - if self.__check_pin(pin): - with self.__ensure_device("writing") as device: - report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 - result = device.send_feature_report(report) - if result < 0: - raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") - - def __check_pin(self, pin: int) -> bool: - ok = (0 <= pin <= 7) - if not ok: - get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path) - return ok + assert 0 <= pin <= 7 + with self.__ensure_device("writing") as device: + report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 + result = device.send_feature_report(report) + if result < 0: + raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") @contextlib.contextmanager def __ensure_device(self, context: str) -> hid.device: diff --git a/kvmd/plugins/ugpio/ipmi.py b/kvmd/plugins/ugpio/ipmi.py index 26ea9dbb..718c71cb 100644 --- a/kvmd/plugins/ugpio/ipmi.py +++ b/kvmd/plugins/ugpio/ipmi.py @@ -25,7 +25,9 @@ import functools from typing import List from typing import Dict +from typing import Callable from typing import Optional +from typing import Any from ...logging import get_logger @@ -35,6 +37,7 @@ from ... import aioproc from ...yamlconf import Option +from ...validators import check_string_in_list from ...validators.basic import valid_float_f01 from ...validators.net import valid_ip_or_host from ...validators.net import valid_port @@ -46,12 +49,12 @@ from . import BaseUserGpioDriver # ===== _OUTPUTS = { - 1: "on", - 2: "off", - 3: "cycle", - 4: "reset", - 5: "diag", - 6: "soft", + "1": "on", + "2": "off", + "3": "cycle", + "4": "reset", + "5": "diag", + "6": "soft", } @@ -108,14 +111,19 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute "state_poll": Option(1.0, type=valid_float_f01), } - def register_input(self, pin: int, debounce: float) -> None: + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + actions = ["0", *_OUTPUTS, "status", *_OUTPUTS.values()] + return (lambda arg: check_string_in_list(arg, "IPMI action", actions)) + + def register_input(self, pin: str, debounce: float) -> None: _ = debounce - if pin != 0: + if pin not in ["0", "status"]: raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") - def register_output(self, pin: int, initial: Optional[bool]) -> None: + def register_output(self, pin: str, initial: Optional[bool]) -> None: _ = initial - if pin not in _OUTPUTS: + if pin not in [*_OUTPUTS, *_OUTPUTS.values()]: raise RuntimeError(f"Unsupported mode 'output' for pin={pin} on {self}") def prepare(self) -> None: @@ -131,17 +139,17 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute prev = new await asyncio.sleep(self.__state_poll) - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: if not self.__online: raise GpioDriverOfflineError(self) - if pin == 0: + if pin == "0": return self.__power return False - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: if not self.__online: raise GpioDriverOfflineError(self) - action = _OUTPUTS[pin] + action = (_OUTPUTS[pin] if pin.isdigit() else pin) try: proc = await aioproc.log_process(**self.__make_ipmitool_kwargs(action), logger=get_logger(0)) if proc.returncode != 0: diff --git a/kvmd/plugins/ugpio/otgbind.py b/kvmd/plugins/ugpio/otgbind.py index 6dec46c9..600c8488 100644 --- a/kvmd/plugins/ugpio/otgbind.py +++ b/kvmd/plugins/ugpio/otgbind.py @@ -23,6 +23,9 @@ import os import asyncio +from typing import Callable +from typing import Any + from ...logging import get_logger from ...inotify import InotifyMask @@ -50,6 +53,10 @@ class Plugin(BaseUserGpioDriver): self.__udc = udc self.__driver = "" + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return str + def prepare(self) -> None: (self.__udc, self.__driver) = usb.find_udc(self.__udc) get_logger().info("Using UDC %s", self.__udc) @@ -83,11 +90,11 @@ class Plugin(BaseUserGpioDriver): except Exception: logger.exception("Unexpected OTG-bind watcher error") - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: _ = pin return os.path.islink(self.__get_driver_path(self.__udc)) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: _ = pin with open(self.__get_driver_path("bind" if state else "unbind"), "w") as ctl_file: ctl_file.write(f"{self.__udc}\n") diff --git a/kvmd/plugins/ugpio/pwm.py b/kvmd/plugins/ugpio/pwm.py index 582bbb50..2bd8807e 100644 --- a/kvmd/plugins/ugpio/pwm.py +++ b/kvmd/plugins/ugpio/pwm.py @@ -22,8 +22,10 @@ from typing import Dict -from typing import Optional from typing import Set +from typing import Callable +from typing import Optional +from typing import Any from periphery import PWM @@ -35,6 +37,7 @@ from ... import aiotools from ...yamlconf import Option from ...validators.basic import valid_int_f0 +from ...validators.hw import valid_gpio_pin from . import GpioDriverOfflineError from . import UserGpioModes @@ -77,11 +80,12 @@ class Plugin(BaseUserGpioDriver): def get_modes(cls) -> Set[str]: return set([UserGpioModes.OUTPUT]) - def register_input(self, pin: int, debounce: float) -> None: - raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_gpio_pin(arg))) - def register_output(self, pin: int, initial: Optional[bool]) -> None: - self.__channels[pin] = initial + def register_output(self, pin: str, initial: Optional[bool]) -> None: + self.__channels[int(pin)] = initial def prepare(self) -> None: logger = get_logger(0) @@ -106,15 +110,15 @@ class Plugin(BaseUserGpioDriver): get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s", self.__chip, pin, tools.efmt(err)) - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: try: - return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push) + return (self.__pwms[int(pin)].duty_cycle_ns == self.__duty_cycle_push) except Exception: raise GpioDriverOfflineError(self) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: try: - self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state) + self.__pwms[int(pin)].duty_cycle_ns = self.__get_duty_cycle(state) except Exception: raise GpioDriverOfflineError(self) diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py index 7fbd37db..27cf0038 100644 --- a/kvmd/plugins/ugpio/tesmart.py +++ b/kvmd/plugins/ugpio/tesmart.py @@ -24,7 +24,9 @@ import asyncio from typing import Tuple from typing import Dict +from typing import Callable from typing import Optional +from typing import Any from ...logging import get_logger @@ -33,6 +35,7 @@ from ... import aiotools from ...yamlconf import Option +from ...validators.basic import valid_number from ...validators.basic import valid_float_f0 from ...validators.basic import valid_float_f01 from ...validators.net import valid_ip_or_host @@ -79,15 +82,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute "state_poll": Option(10.0, type=valid_float_f01), } - def register_input(self, pin: int, debounce: float) -> None: - if not (0 <= pin < 16): - raise RuntimeError(f"Unsupported port number: {pin}") - _ = debounce - - def register_output(self, pin: int, initial: Optional[bool]) -> None: - if not (0 <= pin < 16): - raise RuntimeError(f"Unsupported port number: {pin}") - _ = initial + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_number(arg, min=0, max=15, name="Tesmart channel"))) async def run(self) -> None: prev_active = -2 @@ -104,12 +101,13 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute async def cleanup(self) -> None: await self.__close_device() - async def read(self, pin: int) -> bool: - return (self.__active == pin) + async def read(self, pin: str) -> bool: + return (self.__active == int(pin)) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: + assert 0 <= pin <= 15 if state: - await self.__send_command("{:c}{:c}".format(1, pin + 1).encode()) + await self.__send_command("{:c}{:c}".format(1, int(pin) + 1).encode()) await self.__update_notifier.notify() await asyncio.sleep(self.__switch_delay) # Slowdown diff --git a/kvmd/plugins/ugpio/wol.py b/kvmd/plugins/ugpio/wol.py index 842ebd47..1305b3e6 100644 --- a/kvmd/plugins/ugpio/wol.py +++ b/kvmd/plugins/ugpio/wol.py @@ -24,7 +24,9 @@ import socket import functools from typing import Dict +from typing import Callable from typing import Optional +from typing import Any from ...logging import get_logger @@ -66,11 +68,15 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute "mac": Option("", type=valid_mac, if_empty=""), } - async def read(self, pin: int) -> bool: + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return str + + async def read(self, pin: str) -> bool: _ = pin return False - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: _ = pin if not state: return |