diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/__init__.py | 16 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 12 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/export.py | 9 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/ugpio.py | 3 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 13 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 132 | ||||
-rw-r--r-- | kvmd/gpio.py | 12 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 6 | ||||
-rw-r--r-- | kvmd/plugins/hid/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 6 | ||||
-rw-r--r-- | kvmd/plugins/msd/relay.py | 4 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 77 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 86 |
15 files changed, 313 insertions, 73 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 7202cfbc..ce7cfaef 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -40,6 +40,7 @@ from ..plugins.auth import get_auth_service_class from ..plugins.hid import get_hid_class from ..plugins.atx import get_atx_class from ..plugins.msd import get_msd_class +from ..plugins.ugpio import get_ugpio_driver_class from ..yamlconf import ConfigError from ..yamlconf import make_config @@ -174,6 +175,16 @@ def _patch_dynamic( # pylint: disable=too-many-locals rebuild = True if load_gpio: + for (driver, params) in { # type: ignore + "gpio": {}, + **(raw_config.get("kvmd", {}).get("gpio", {}).get("drivers", {})), + }.items(): + driver_type = valid_stripped_string_not_empty(params.get("type", "gpio")) + scheme["kvmd"]["gpio"]["drivers"][driver] = { + "type": Option(driver_type, type=valid_stripped_string_not_empty), + **get_ugpio_driver_class(driver_type).get_plugin_options() + } + for (channel, params) in raw_config.get("kvmd", {}).get("gpio", {}).get("scheme", {}).items(): try: mode = valid_ugpio_mode(params.get("mode", "")) @@ -181,6 +192,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals pass finally: ch_scheme: Dict = { + "driver": Option("gpio"), "pin": Option(-1, type=valid_gpio_pin), "mode": Option("", type=valid_ugpio_mode), "inverted": Option(False, type=valid_bool), @@ -196,7 +208,8 @@ def _patch_dynamic( # pylint: disable=too-many-locals "max_delay": Option(0.1, type=valid_float_f01), }, }) - scheme["kvmd"]["gpio"]["scheme"][channel] = ch_scheme + scheme["kvmd"]["gpio"]["scheme"][channel] = ch_scheme + rebuild = True return rebuild @@ -326,6 +339,7 @@ def _get_config_scheme() -> Dict: "gpio": { "state_poll": Option(0.1, type=valid_float_f01), + "drivers": {}, # Dynamic content "scheme": {}, # Dymanic content "view": { "header": { diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index ca1e8faf..bb9b3b71 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -61,16 +61,16 @@ def _clear_gpio(config: Section) -> None: ("streamer/cap", config.streamer.cap_pin), ("streamer/conv", config.streamer.conv_pin), - *([ - (f"gpio/{channel}", params.pin) - for (channel, params) in config.gpio.scheme.items() - if params.mode == "output" - ]), + # *([ + # (f"gpio/{channel}", params.pin) + # for (channel, params) in config.gpio.scheme.items() + # if params.mode == "output" + # ]), ]: if pin >= 0: logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name) try: - gpio.set_output(pin, initial=False) + gpio.set_output(pin, False) except Exception: logger.exception("Can't clear GPIO pin=%d (%s)", pin, name) diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py index 6dae0ce1..2e02c929 100644 --- a/kvmd/apps/kvmd/api/export.py +++ b/kvmd/apps/kvmd/api/export.py @@ -54,13 +54,18 @@ class ExportApi: self.__user_gpio.get_state(), ]) rows: List[str] = [] + self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled") self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power") + for mode in ["input", "output"]: - for (channel, gch) in gpio_state[f"{mode}s"].items(): - self.__append_prometheus_rows(rows, gch["state"], f"pikvm_gpio_input_{channel}") + for (channel, ch_state) in gpio_state[f"{mode}s"].items(): + for key in ["online", "state"]: + self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}") + if hw_state is not None: self.__append_prometheus_rows(rows, hw_state["health"], "pikvm_hw") + return Response(text="\n".join(rows)) def __append_prometheus_rows(self, rows: List[str], value: Any, path: str) -> None: diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py index 66f7b358..ddd28477 100644 --- a/kvmd/apps/kvmd/api/ugpio.py +++ b/kvmd/apps/kvmd/api/ugpio.py @@ -59,5 +59,6 @@ class UserGpioApi: async def __pulse_handler(self, request: Request) -> Response: channel = valid_ugpio_channel(request.query.get("channel")) delay = valid_float_f0(request.query.get("delay", "0")) - await self.__user_gpio.pulse(channel, delay) + wait = valid_bool(request.query.get("wait", "0")) + await self.__user_gpio.pulse(channel, delay, wait) return make_json_response() diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 65c10d47..cb999fb9 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -106,20 +106,21 @@ class StreamerResolutionNotSupported(OperationError): # ===== @dataclasses.dataclass(frozen=True) -class _Component: +class _Component: # pylint: disable=too-many-instance-attributes name: str event_type: str obj: object + sysprep: Optional[Callable[[], None]] = None + systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None - systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None def __post_init__(self) -> None: if isinstance(self.obj, BasePlugin): object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})") - for field in ["get_state", "poll_state", "systask", "cleanup"]: + for field in ["sysprep", "systask", "get_state", "poll_state", "cleanup"]: object.__setattr__(self, field, getattr(self.obj, field, None)) if self.get_state or self.poll_state: assert self.event_type, self @@ -278,7 +279,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins # ===== SYSTEM STUFF def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ - self.__hid.start() + for component in self.__components: + if component.sysprep: + component.sysprep() aioproc.rename_process("main") super().run(**kwargs) @@ -307,7 +310,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def wrapper() -> None: try: await method(*args) - raise RuntimeError(f"Dead system task: {method.__name__}" + raise RuntimeError(f"Dead system task: {method}" f"({', '.join(getattr(arg, '__name__', str(arg)) for arg in args)})") except asyncio.CancelledError: pass diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 14485c3c..6b2d597a 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -141,8 +141,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes **params_kwargs: Any, ) -> None: - self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin >= 0 else -1) - self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin >= 0 else -1) + self.__cap_pin = (gpio.set_output(cap_pin, False) if cap_pin >= 0 else -1) + self.__conv_pin = (gpio.set_output(conv_pin, False) if conv_pin >= 0 else -1) self.__sync_delay = sync_delay self.__init_delay = init_delay diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index c860ec16..c8b4ddd9 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -30,62 +30,89 @@ from typing import Optional from ...logging import get_logger +from ...plugins.ugpio import GpioError +from ...plugins.ugpio import GpioOperationError +from ...plugins.ugpio import GpioDriverOfflineError +from ...plugins.ugpio import BaseUserGpioDriver +from ...plugins.ugpio import get_ugpio_driver_class + from ... import aiotools -from ... import gpio from ...yamlconf import Section -from ...errors import OperationError from ...errors import IsBusyError # ===== -class GpioChannelNotFoundError(OperationError): +class GpioChannelNotFoundError(GpioOperationError): def __init__(self) -> None: super().__init__("GPIO channel is not found") -class GpioSwitchNotSupported(OperationError): +class GpioSwitchNotSupported(GpioOperationError): def __init__(self) -> None: super().__init__("This GPIO channel does not support switching") -class GpioPulseNotSupported(OperationError): +class GpioPulseNotSupported(GpioOperationError): def __init__(self) -> None: super().__init__("This GPIO channel does not support pulsing") -class GpioChannelIsBusyError(IsBusyError): +class GpioChannelIsBusyError(IsBusyError, GpioError): def __init__(self) -> None: super().__init__("Performing another GPIO operation on this channel, please try again later") # ===== class _GpioInput: - def __init__(self, channel: str, config: Section, reader: gpio.BatchReader) -> None: + def __init__( + self, + channel: str, + config: Section, + driver: BaseUserGpioDriver, + ) -> None: + self.__channel = channel self.__pin: int = config.pin self.__inverted: bool = config.inverted - self.__reader = reader + self.__driver = driver + self.__driver.register_input(self.__pin) def get_scheme(self) -> Dict: return {} def get_state(self) -> Dict: - return {"state": (self.__reader.get(self.__pin) ^ self.__inverted)} + (online, state) = (True, False) + try: + state = (self.__driver.read(self.__pin) ^ self.__inverted) + except GpioDriverOfflineError: + online = False + return { + "online": online, + "state": state, + } def __str__(self) -> str: - return f"Input({self.__channel}, pin={self.__pin})" + return f"Input({self.__channel}, driver={self.__driver.get_instance_name()}, pin={self.__pin})" __repr__ = __str__ class _GpioOutput: # pylint: disable=too-many-instance-attributes - def __init__(self, channel: str, config: Section, notifier: aiotools.AioNotifier) -> None: + def __init__( + self, + channel: str, + config: Section, + driver: BaseUserGpioDriver, + notifier: aiotools.AioNotifier, + ) -> None: + self.__channel = channel self.__pin: int = config.pin self.__inverted: bool = config.inverted + self.__initial: bool = config.initial self.__switch: bool = config.switch @@ -95,6 +122,9 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes self.__busy_delay: float = config.busy_delay + self.__driver = driver + self.__driver.register_output(self.__pin, (config.initial ^ config.inverted)) + self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier) def get_scheme(self) -> Dict: @@ -105,20 +135,31 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes "min_delay": (self.__min_pulse_delay if self.__pulse_delay else 0), "max_delay": (self.__max_pulse_delay if self.__pulse_delay else 0), }, + "hw": { + "driver": self.__driver.get_instance_name(), + "pin": self.__pin, + }, } def get_state(self) -> Dict: busy = self.__region.is_busy() + (online, state) = (True, False) + if not busy: + try: + state = self.__read() + except GpioDriverOfflineError: + online = False return { - "state": (self.__read() if not busy else False), + "online": online, + "state": state, "busy": busy, } def cleanup(self) -> None: try: - gpio.write(self.__pin, False) + self.__driver.write(self.__pin, (self.__initial ^ self.__inverted)) except Exception: - get_logger().exception("Can't cleanup GPIO %s", self) + get_logger().exception("Can't cleanup %s", self) async def switch(self, state: bool) -> bool: if not self.__switch: @@ -126,21 +167,25 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes async with self.__region: if state != self.__read(): self.__write(state) - get_logger(0).info("Switched %s to %d", self, state) + get_logger(0).info("Switched %s to state=%d", self, state) await asyncio.sleep(self.__busy_delay) return True await asyncio.sleep(self.__busy_delay) return False @aiotools.atomic - async def pulse(self, delay: float) -> None: + async def pulse(self, delay: float, wait: bool) -> None: if not self.__pulse_delay: raise GpioPulseNotSupported() delay = min(max((delay or self.__pulse_delay), self.__min_pulse_delay), self.__max_pulse_delay) - await aiotools.run_region_task( - f"Can't perform pulse of {self} or operation was not completed", - self.__region, self.__inner_pulse, delay, - ) + if wait: + async with self.__region: + await self.__inner_pulse(delay) + else: + await aiotools.run_region_task( + f"Can't perform pulse of {self} or operation was not completed", + self.__region, self.__inner_pulse, delay, + ) @aiotools.atomic async def __inner_pulse(self, delay: float) -> None: @@ -153,13 +198,13 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes get_logger(0).info("Pulsed %s with delay=%.2f", self, delay) def __read(self) -> bool: - return (gpio.read(self.__pin) ^ self.__inverted) + return (self.__driver.read(self.__pin) ^ self.__inverted) def __write(self, state: bool) -> None: - gpio.write(self.__pin, (state ^ self.__inverted)) + self.__driver.write(self.__pin, (state ^ self.__inverted)) def __str__(self) -> str: - return f"Output({self.__channel}, pin={self.__pin})" + return f"Output({self.__channel}, driver={self.__driver.get_instance_name()}, pin={self.__pin})" __repr__ = __str__ @@ -170,27 +215,23 @@ class UserGpio: self.__view = config.view self.__state_notifier = aiotools.AioNotifier() - self.__reader = gpio.BatchReader( - pins=[ - ( - gpio.set_input(ch_config.pin) - if ch_config.mode == "input" else - gpio.set_output(ch_config.pin, (ch_config.initial ^ ch_config.inverted)) - ) - for ch_config in config.scheme.values() - ], - interval=config.state_poll, - notifier=self.__state_notifier, - ) + + self.__drivers = { + driver: get_ugpio_driver_class(drv_config.type)(**drv_config._unpack(ignore=["type"])) + for (driver, drv_config) in config.drivers.items() + } self.__inputs: Dict[str, _GpioInput] = {} self.__outputs: Dict[str, _GpioOutput] = {} for (channel, ch_config) in sorted(config.scheme.items(), key=operator.itemgetter(0)): + driver = self.__drivers.get(ch_config.driver) + if driver is None: + raise RuntimeError(f"Missing User-GPIO driver configuration: {ch_config.driver}") if ch_config.mode == "input": - self.__inputs[channel] = _GpioInput(channel, ch_config, self.__reader) + self.__inputs[channel] = _GpioInput(channel, ch_config, driver) else: # output: - self.__outputs[channel] = _GpioOutput(channel, ch_config, self.__state_notifier) + self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__state_notifier) async def get_model(self) -> Dict: return { @@ -216,13 +257,26 @@ class UserGpio: prev_state = state await self.__state_notifier.wait() + def sysprep(self) -> None: + get_logger().info("Preparing User-GPIO drivers ...") + for (_, driver) in sorted(self.__drivers.items(), key=operator.itemgetter(0)): + driver.prepare(self.__state_notifier) + async def systask(self) -> None: - get_logger(0).info("Polling User-GPIO inputs ...") - await self.__reader.poll() + get_logger(0).info("Running User-GPIO drivers ...") + await asyncio.gather(*[ + driver.run() + for (_, driver) in sorted(self.__drivers.items(), key=operator.itemgetter(0)) + ]) async def cleanup(self) -> None: for gout in self.__outputs.values(): gout.cleanup() + for driver in self.__drivers.values(): + try: + driver.cleanup() + except Exception: + get_logger().exception("Can't cleanup driver %r", driver.get_instance_name()) async def switch(self, channel: str, state: bool) -> bool: gout = self.__outputs.get(channel) diff --git a/kvmd/gpio.py b/kvmd/gpio.py index ea99149e..f3ef8ba2 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -24,7 +24,7 @@ import asyncio import contextlib from typing import Tuple -from typing import List +from typing import Set from typing import Generator from typing import Optional @@ -48,7 +48,7 @@ def bcm() -> Generator[None, None, None]: logger.info("GPIO cleaned") -def set_output(pin: int, initial: bool=False) -> int: +def set_output(pin: int, initial: Optional[bool]) -> int: assert pin >= 0, pin GPIO.setup(pin, GPIO.OUT, initial=initial) return pin @@ -71,10 +71,10 @@ def write(pin: int, state: bool) -> None: class BatchReader: - def __init__(self, pins: List[int], interval: float, notifier: aiotools.AioNotifier) -> None: - self.__pins = pins - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(pins) - self.__state = {pin: read(pin) for pin in pins} + def __init__(self, pins: Set[int], interval: float, notifier: aiotools.AioNotifier) -> None: + self.__pins = sorted(pins) + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) + self.__state = {pin: read(pin) for pin in self.__pins} self.__interval = interval self.__notifier = notifier diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index 674f18d1..aed086ae 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -62,8 +62,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__power_led_pin = gpio.set_input(power_led_pin) self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin) + self.__power_switch_pin = gpio.set_output(power_switch_pin, False) + self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) self.__power_led_inverted = power_led_inverted self.__hdd_led_inverted = hdd_led_inverted @@ -75,7 +75,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__state_notifier) self.__reader = gpio.BatchReader( - pins=[self.__power_led_pin, self.__hdd_led_pin], + pins=set([self.__power_led_pin, self.__hdd_led_pin]), interval=state_poll, notifier=self.__state_notifier, ) diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py index 8f4cb02c..e81ea168 100644 --- a/kvmd/plugins/hid/__init__.py +++ b/kvmd/plugins/hid/__init__.py @@ -32,8 +32,8 @@ from .. import get_plugin_class # ===== class BaseHid(BasePlugin): - def start(self) -> None: - pass + def sysprep(self) -> None: + raise NotImplementedError async def get_state(self) -> Dict: raise NotImplementedError diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py index 95566d34..575190e5 100644 --- a/kvmd/plugins/hid/otg/__init__.py +++ b/kvmd/plugins/hid/otg/__init__.py @@ -74,7 +74,7 @@ class Plugin(BaseHid): "noop": Option(False, type=valid_bool), } - def start(self) -> None: + def sysprep(self) -> None: self.__keyboard_proc.start() self.__mouse_proc.start() diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index cee0048d..5abab1f5 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -175,7 +175,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst multiprocessing.Process.__init__(self, daemon=True) - self.__reset_pin = gpio.set_output(reset_pin) + self.__reset_pin = gpio.set_output(reset_pin, False) self.__reset_delay = reset_delay self.__device_path = device_path @@ -217,9 +217,9 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst "noop": Option(False, type=valid_bool), } - def start(self) -> None: + def sysprep(self) -> None: get_logger(0).info("Starting HID daemon ...") - multiprocessing.Process.start(self) + self.start() async def get_state(self) -> Dict: state = await self.__state_flags.get() diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index 1254351a..96cb3c63 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -165,8 +165,8 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes reset_delay: float, ) -> None: - self.__target_pin = gpio.set_output(target_pin) - self.__reset_pin = gpio.set_output(reset_pin) + self.__target_pin = gpio.set_output(target_pin, False) + self.__reset_pin = gpio.set_output(reset_pin, False) self.__device_path = device_path self.__init_delay = init_delay diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py new file mode 100644 index 00000000..9dbc0166 --- /dev/null +++ b/kvmd/plugins/ugpio/__init__.py @@ -0,0 +1,77 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Type +from typing import Optional + +from ...errors import OperationError + +from ... import aiotools + +from .. import BasePlugin +from .. import get_plugin_class + + +# ===== +class GpioError(Exception): + pass + + +class GpioOperationError(OperationError, GpioError): + pass + + +class GpioDriverOfflineError(GpioOperationError): + def __init__(self, driver: "BaseUserGpioDriver") -> None: + super().__init__(f"GPIO driver {driver.get_instance_name()!r} is offline") + + +# ===== +class BaseUserGpioDriver(BasePlugin): + def get_instance_name(self) -> str: + raise NotImplementedError + + def register_input(self, pin: int) -> None: + raise NotImplementedError + + def register_output(self, pin: int, initial: Optional[bool]) -> None: + raise NotImplementedError + + def prepare(self, notifier: aiotools.AioNotifier) -> None: + raise NotImplementedError + + async def run(self) -> None: + raise NotImplementedError + + def cleanup(self) -> None: + pass + + def read(self, pin: int) -> bool: + raise NotImplementedError + + def write(self, pin: int, state: bool) -> None: + raise NotImplementedError + + +# ===== +def get_ugpio_driver_class(name: str) -> Type[BaseUserGpioDriver]: + return get_plugin_class("ugpio", name) # type: ignore diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py new file mode 100644 index 00000000..04afc75d --- /dev/null +++ b/kvmd/plugins/ugpio/gpio.py @@ -0,0 +1,86 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Dict +from typing import Set +from typing import Optional + +from ... import aiotools +from ... import gpio + +from ...yamlconf import Option + +from ...validators.basic import valid_float_f01 + +from . import BaseUserGpioDriver + + +# ===== +class Plugin(BaseUserGpioDriver): + def __init__(self, state_poll: float) -> None: # pylint: disable=super-init-not-called + self.__state_poll = state_poll + + self.__input_pins: Set[int] = set() + self.__output_pins: Dict[int, Optional[bool]] = {} + + self.__reader: Optional[gpio.BatchReader] = None + + @classmethod + def get_plugin_options(cls) -> Dict: + return { + "state_poll": Option(0.1, type=valid_float_f01), + } + + def get_instance_name(self) -> str: + return "gpio" + + def register_input(self, pin: int) -> None: + self.__input_pins.add(pin) + + def register_output(self, pin: int, initial: Optional[bool]) -> None: + self.__output_pins[pin] = initial + + def prepare(self, notifier: aiotools.AioNotifier) -> None: + assert self.__reader is None + self.__reader = gpio.BatchReader( + pins=set([ + *map(gpio.set_input, self.__input_pins), + *[ + gpio.set_output(pin, initial) + for (pin, initial) in self.__output_pins.items() + ], + ]), + interval=self.__state_poll, + notifier=notifier, + ) + + async def run(self) -> None: + assert self.__reader + await self.__reader.poll() + + def read(self, pin: int) -> bool: + assert self.__reader + return self.__reader.get(pin) + + def write(self, pin: int, state: bool) -> None: + assert self.__reader + gpio.write(pin, state) |