summaryrefslogtreecommitdiff
path: root/kvmd/plugins
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2021-09-08 05:43:36 +0300
committerMaxim Devaev <[email protected]>2021-09-08 05:43:36 +0300
commit98ad1145a8782d3b82516dd9f81c86d9e80391c5 (patch)
treeb8f348f0815e8ee72236bf9879af779c95727755 /kvmd/plugins
parent939c63fe7daf2ddc9a05c9e0fbeab63cb5c6f0c1 (diff)
string pins
Diffstat (limited to 'kvmd/plugins')
-rw-r--r--kvmd/plugins/ugpio/__init__.py14
-rw-r--r--kvmd/plugins/ugpio/ezcoo.py25
-rw-r--r--kvmd/plugins/ugpio/gpio.py28
-rw-r--r--kvmd/plugins/ugpio/hidrelay.py43
-rw-r--r--kvmd/plugins/ugpio/ipmi.py36
-rw-r--r--kvmd/plugins/ugpio/otgbind.py11
-rw-r--r--kvmd/plugins/ugpio/pwm.py22
-rw-r--r--kvmd/plugins/ugpio/tesmart.py24
-rw-r--r--kvmd/plugins/ugpio/wol.py10
9 files changed, 127 insertions, 86 deletions
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py
index eae65891..f63df5e4 100644
--- a/kvmd/plugins/ugpio/__init__.py
+++ b/kvmd/plugins/ugpio/__init__.py
@@ -22,6 +22,7 @@
from typing import Set
from typing import Type
+from typing import Callable
from typing import Optional
from typing import Any
@@ -51,7 +52,6 @@ class GpioDriverOfflineError(GpioOperationError):
class UserGpioModes:
INPUT = "input"
OUTPUT = "output"
-
ALL = set([INPUT, OUTPUT])
@@ -74,11 +74,15 @@ class BaseUserGpioDriver(BasePlugin):
def get_modes(cls) -> Set[str]:
return set(UserGpioModes.ALL)
- def register_input(self, pin: int, debounce: float) -> None:
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ raise NotImplementedError
+
+ def register_input(self, pin: str, debounce: float) -> None:
_ = pin
_ = debounce
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
_ = pin
_ = initial
@@ -91,10 +95,10 @@ class BaseUserGpioDriver(BasePlugin):
async def cleanup(self) -> None:
pass
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
raise NotImplementedError
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
raise NotImplementedError
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py
index 81e199e8..a20aed32 100644
--- a/kvmd/plugins/ugpio/ezcoo.py
+++ b/kvmd/plugins/ugpio/ezcoo.py
@@ -28,7 +28,9 @@ import time
from typing import Tuple
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
import serial
@@ -85,6 +87,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
"protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
}
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_number(arg, min=0, max=3, name="Ezcoo channel")))
+
def prepare(self) -> None:
assert self.__proc is None
self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
@@ -105,16 +111,16 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
if self.__proc.is_alive() or self.__proc.exitcode is not None:
self.__proc.join()
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- return (self.__channel == pin)
+ return (self.__channel == int(pin))
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- if state and (0 <= pin <= 3):
- self.__ctl_queue.put_nowait(pin)
+ if state:
+ self.__ctl_queue.put_nowait(int(pin))
# =====
@@ -174,9 +180,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
return (channel, data)
def __send_channel(self, tty: serial.Serial, channel: int) -> None:
- # Twice because of ezcoo bugs
- cmd = (b"SET" if self.__protocol == 1 else b"EZS")
- tty.write((b"%s OUT1 VS IN%d\n" % (cmd, channel + 1)) * 2)
+ assert 0 <= channel <= 3
+ cmd = b"%s OUT1 VS IN%d\n" % (
+ (b"SET" if self.__protocol == 1 else b"EZS"),
+ channel + 1,
+ )
+ tty.write(cmd * 2) # Twice because of ezcoo bugs
tty.flush()
def __str__(self) -> str:
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 28f0d5cb..315de6f1 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -21,7 +21,9 @@
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
import gpiod
@@ -31,6 +33,7 @@ from ... import aiogp
from ...yamlconf import Option
from ...validators.os import valid_abs_path
+from ...validators.hw import valid_gpio_pin
from . import BaseUserGpioDriver
@@ -63,11 +66,15 @@ class Plugin(BaseUserGpioDriver):
"device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="device_path"),
}
- def register_input(self, pin: int, debounce: float) -> None:
- self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_gpio_pin(arg)))
+
+ def register_input(self, pin: str, debounce: float) -> None:
+ self.__input_pins[int(pin)] = aiogp.AioReaderPinParams(False, debounce)
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- self.__output_pins[pin] = initial
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
+ self.__output_pins[int(pin)] = initial
def prepare(self) -> None:
assert self.__reader is None
@@ -95,14 +102,15 @@ class Plugin(BaseUserGpioDriver):
except Exception:
pass
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
assert self.__reader
- if pin in self.__input_pins:
- return self.__reader.get(pin)
- return bool(self.__output_lines[pin].get_value())
+ pin_int = int(pin)
+ if pin_int in self.__input_pins:
+ return self.__reader.get(pin_int)
+ return bool(self.__output_lines[pin_int].get_value())
- async def write(self, pin: int, state: bool) -> None:
- self.__output_lines[pin].set_value(int(state))
+ async def write(self, pin: str, state: bool) -> None:
+ self.__output_lines[int(pin)].set_value(int(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"
diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py
index 7e18c72a..d4bdda48 100644
--- a/kvmd/plugins/ugpio/hidrelay.py
+++ b/kvmd/plugins/ugpio/hidrelay.py
@@ -25,7 +25,9 @@ import contextlib
from typing import Dict
from typing import Set
+from typing import Callable
from typing import Optional
+from typing import Any
import hid
@@ -36,6 +38,7 @@ from ... import aiotools
from ...yamlconf import Option
+from ...validators.basic import valid_number
from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
@@ -79,11 +82,12 @@ class Plugin(BaseUserGpioDriver):
def get_modes(cls) -> Set[str]:
return set([UserGpioModes.OUTPUT])
- def register_input(self, pin: int, debounce: float) -> None:
- raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_number(arg, min=0, max=7, name="HID relay channel")))
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- self.__initials[pin] = initial
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
+ self.__initials[int(pin)] = initial
def prepare(self) -> None:
logger = get_logger(0)
@@ -113,15 +117,15 @@ class Plugin(BaseUserGpioDriver):
self.__close_device()
self.__stop = True
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
try:
- return self.__inner_read(pin)
+ return self.__inner_read(int(pin))
except Exception:
raise GpioDriverOfflineError(self)
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
try:
- return self.__inner_write(pin, state)
+ return self.__inner_write(int(pin), state)
except Exception:
raise GpioDriverOfflineError(self)
@@ -140,27 +144,20 @@ class Plugin(BaseUserGpioDriver):
pin, self, self.__device_path, tools.efmt(err))
def __inner_read(self, pin: int) -> bool:
- if self.__check_pin(pin):
- return bool(self.__inner_read_raw() & (1 << pin))
- return False
+ assert 0 <= pin <= 7
+ return bool(self.__inner_read_raw() & (1 << pin))
def __inner_read_raw(self) -> int:
with self.__ensure_device("reading") as device:
return device.get_feature_report(1, 8)[7]
def __inner_write(self, pin: int, state: bool) -> None:
- if self.__check_pin(pin):
- with self.__ensure_device("writing") as device:
- report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
- result = device.send_feature_report(report)
- if result < 0:
- raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
-
- def __check_pin(self, pin: int) -> bool:
- ok = (0 <= pin <= 7)
- if not ok:
- get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path)
- return ok
+ assert 0 <= pin <= 7
+ with self.__ensure_device("writing") as device:
+ report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
+ result = device.send_feature_report(report)
+ if result < 0:
+ raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
@contextlib.contextmanager
def __ensure_device(self, context: str) -> hid.device:
diff --git a/kvmd/plugins/ugpio/ipmi.py b/kvmd/plugins/ugpio/ipmi.py
index 26ea9dbb..718c71cb 100644
--- a/kvmd/plugins/ugpio/ipmi.py
+++ b/kvmd/plugins/ugpio/ipmi.py
@@ -25,7 +25,9 @@ import functools
from typing import List
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
from ...logging import get_logger
@@ -35,6 +37,7 @@ from ... import aioproc
from ...yamlconf import Option
+from ...validators import check_string_in_list
from ...validators.basic import valid_float_f01
from ...validators.net import valid_ip_or_host
from ...validators.net import valid_port
@@ -46,12 +49,12 @@ from . import BaseUserGpioDriver
# =====
_OUTPUTS = {
- 1: "on",
- 2: "off",
- 3: "cycle",
- 4: "reset",
- 5: "diag",
- 6: "soft",
+ "1": "on",
+ "2": "off",
+ "3": "cycle",
+ "4": "reset",
+ "5": "diag",
+ "6": "soft",
}
@@ -108,14 +111,19 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
"state_poll": Option(1.0, type=valid_float_f01),
}
- def register_input(self, pin: int, debounce: float) -> None:
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ actions = ["0", *_OUTPUTS, "status", *_OUTPUTS.values()]
+ return (lambda arg: check_string_in_list(arg, "IPMI action", actions))
+
+ def register_input(self, pin: str, debounce: float) -> None:
_ = debounce
- if pin != 0:
+ if pin not in ["0", "status"]:
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
_ = initial
- if pin not in _OUTPUTS:
+ if pin not in [*_OUTPUTS, *_OUTPUTS.values()]:
raise RuntimeError(f"Unsupported mode 'output' for pin={pin} on {self}")
def prepare(self) -> None:
@@ -131,17 +139,17 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
prev = new
await asyncio.sleep(self.__state_poll)
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
if not self.__online:
raise GpioDriverOfflineError(self)
- if pin == 0:
+ if pin == "0":
return self.__power
return False
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
if not self.__online:
raise GpioDriverOfflineError(self)
- action = _OUTPUTS[pin]
+ action = (_OUTPUTS[pin] if pin.isdigit() else pin)
try:
proc = await aioproc.log_process(**self.__make_ipmitool_kwargs(action), logger=get_logger(0))
if proc.returncode != 0:
diff --git a/kvmd/plugins/ugpio/otgbind.py b/kvmd/plugins/ugpio/otgbind.py
index 6dec46c9..600c8488 100644
--- a/kvmd/plugins/ugpio/otgbind.py
+++ b/kvmd/plugins/ugpio/otgbind.py
@@ -23,6 +23,9 @@
import os
import asyncio
+from typing import Callable
+from typing import Any
+
from ...logging import get_logger
from ...inotify import InotifyMask
@@ -50,6 +53,10 @@ class Plugin(BaseUserGpioDriver):
self.__udc = udc
self.__driver = ""
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return str
+
def prepare(self) -> None:
(self.__udc, self.__driver) = usb.find_udc(self.__udc)
get_logger().info("Using UDC %s", self.__udc)
@@ -83,11 +90,11 @@ class Plugin(BaseUserGpioDriver):
except Exception:
logger.exception("Unexpected OTG-bind watcher error")
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
_ = pin
return os.path.islink(self.__get_driver_path(self.__udc))
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
_ = pin
with open(self.__get_driver_path("bind" if state else "unbind"), "w") as ctl_file:
ctl_file.write(f"{self.__udc}\n")
diff --git a/kvmd/plugins/ugpio/pwm.py b/kvmd/plugins/ugpio/pwm.py
index 582bbb50..2bd8807e 100644
--- a/kvmd/plugins/ugpio/pwm.py
+++ b/kvmd/plugins/ugpio/pwm.py
@@ -22,8 +22,10 @@
from typing import Dict
-from typing import Optional
from typing import Set
+from typing import Callable
+from typing import Optional
+from typing import Any
from periphery import PWM
@@ -35,6 +37,7 @@ from ... import aiotools
from ...yamlconf import Option
from ...validators.basic import valid_int_f0
+from ...validators.hw import valid_gpio_pin
from . import GpioDriverOfflineError
from . import UserGpioModes
@@ -77,11 +80,12 @@ class Plugin(BaseUserGpioDriver):
def get_modes(cls) -> Set[str]:
return set([UserGpioModes.OUTPUT])
- def register_input(self, pin: int, debounce: float) -> None:
- raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_gpio_pin(arg)))
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- self.__channels[pin] = initial
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
+ self.__channels[int(pin)] = initial
def prepare(self) -> None:
logger = get_logger(0)
@@ -106,15 +110,15 @@ class Plugin(BaseUserGpioDriver):
get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s",
self.__chip, pin, tools.efmt(err))
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
try:
- return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push)
+ return (self.__pwms[int(pin)].duty_cycle_ns == self.__duty_cycle_push)
except Exception:
raise GpioDriverOfflineError(self)
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
try:
- self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state)
+ self.__pwms[int(pin)].duty_cycle_ns = self.__get_duty_cycle(state)
except Exception:
raise GpioDriverOfflineError(self)
diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py
index 7fbd37db..27cf0038 100644
--- a/kvmd/plugins/ugpio/tesmart.py
+++ b/kvmd/plugins/ugpio/tesmart.py
@@ -24,7 +24,9 @@ import asyncio
from typing import Tuple
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
from ...logging import get_logger
@@ -33,6 +35,7 @@ from ... import aiotools
from ...yamlconf import Option
+from ...validators.basic import valid_number
from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01
from ...validators.net import valid_ip_or_host
@@ -79,15 +82,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
"state_poll": Option(10.0, type=valid_float_f01),
}
- def register_input(self, pin: int, debounce: float) -> None:
- if not (0 <= pin < 16):
- raise RuntimeError(f"Unsupported port number: {pin}")
- _ = debounce
-
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- if not (0 <= pin < 16):
- raise RuntimeError(f"Unsupported port number: {pin}")
- _ = initial
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_number(arg, min=0, max=15, name="Tesmart channel")))
async def run(self) -> None:
prev_active = -2
@@ -104,12 +101,13 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
async def cleanup(self) -> None:
await self.__close_device()
- async def read(self, pin: int) -> bool:
- return (self.__active == pin)
+ async def read(self, pin: str) -> bool:
+ return (self.__active == int(pin))
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
+ assert 0 <= pin <= 15
if state:
- await self.__send_command("{:c}{:c}".format(1, pin + 1).encode())
+ await self.__send_command("{:c}{:c}".format(1, int(pin) + 1).encode())
await self.__update_notifier.notify()
await asyncio.sleep(self.__switch_delay) # Slowdown
diff --git a/kvmd/plugins/ugpio/wol.py b/kvmd/plugins/ugpio/wol.py
index 842ebd47..1305b3e6 100644
--- a/kvmd/plugins/ugpio/wol.py
+++ b/kvmd/plugins/ugpio/wol.py
@@ -24,7 +24,9 @@ import socket
import functools
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
from ...logging import get_logger
@@ -66,11 +68,15 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
"mac": Option("", type=valid_mac, if_empty=""),
}
- async def read(self, pin: int) -> bool:
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return str
+
+ async def read(self, pin: str) -> bool:
_ = pin
return False
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
_ = pin
if not state:
return