summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/__init__.py16
-rw-r--r--kvmd/apps/cleanup/__init__.py12
-rw-r--r--kvmd/apps/kvmd/api/export.py9
-rw-r--r--kvmd/apps/kvmd/api/ugpio.py3
-rw-r--r--kvmd/apps/kvmd/server.py13
-rw-r--r--kvmd/apps/kvmd/streamer.py4
-rw-r--r--kvmd/apps/kvmd/ugpio.py132
-rw-r--r--kvmd/gpio.py12
-rw-r--r--kvmd/plugins/atx/gpio.py6
-rw-r--r--kvmd/plugins/hid/__init__.py4
-rw-r--r--kvmd/plugins/hid/otg/__init__.py2
-rw-r--r--kvmd/plugins/hid/serial.py6
-rw-r--r--kvmd/plugins/msd/relay.py4
-rw-r--r--kvmd/plugins/ugpio/__init__.py77
-rw-r--r--kvmd/plugins/ugpio/gpio.py86
15 files changed, 313 insertions, 73 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index 7202cfbc..ce7cfaef 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -40,6 +40,7 @@ from ..plugins.auth import get_auth_service_class
from ..plugins.hid import get_hid_class
from ..plugins.atx import get_atx_class
from ..plugins.msd import get_msd_class
+from ..plugins.ugpio import get_ugpio_driver_class
from ..yamlconf import ConfigError
from ..yamlconf import make_config
@@ -174,6 +175,16 @@ def _patch_dynamic( # pylint: disable=too-many-locals
rebuild = True
if load_gpio:
+ for (driver, params) in { # type: ignore
+ "gpio": {},
+ **(raw_config.get("kvmd", {}).get("gpio", {}).get("drivers", {})),
+ }.items():
+ driver_type = valid_stripped_string_not_empty(params.get("type", "gpio"))
+ scheme["kvmd"]["gpio"]["drivers"][driver] = {
+ "type": Option(driver_type, type=valid_stripped_string_not_empty),
+ **get_ugpio_driver_class(driver_type).get_plugin_options()
+ }
+
for (channel, params) in raw_config.get("kvmd", {}).get("gpio", {}).get("scheme", {}).items():
try:
mode = valid_ugpio_mode(params.get("mode", ""))
@@ -181,6 +192,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
pass
finally:
ch_scheme: Dict = {
+ "driver": Option("gpio"),
"pin": Option(-1, type=valid_gpio_pin),
"mode": Option("", type=valid_ugpio_mode),
"inverted": Option(False, type=valid_bool),
@@ -196,7 +208,8 @@ def _patch_dynamic( # pylint: disable=too-many-locals
"max_delay": Option(0.1, type=valid_float_f01),
},
})
- scheme["kvmd"]["gpio"]["scheme"][channel] = ch_scheme
+ scheme["kvmd"]["gpio"]["scheme"][channel] = ch_scheme
+ rebuild = True
return rebuild
@@ -326,6 +339,7 @@ def _get_config_scheme() -> Dict:
"gpio": {
"state_poll": Option(0.1, type=valid_float_f01),
+ "drivers": {}, # Dynamic content
"scheme": {}, # Dymanic content
"view": {
"header": {
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
index ca1e8faf..bb9b3b71 100644
--- a/kvmd/apps/cleanup/__init__.py
+++ b/kvmd/apps/cleanup/__init__.py
@@ -61,16 +61,16 @@ def _clear_gpio(config: Section) -> None:
("streamer/cap", config.streamer.cap_pin),
("streamer/conv", config.streamer.conv_pin),
- *([
- (f"gpio/{channel}", params.pin)
- for (channel, params) in config.gpio.scheme.items()
- if params.mode == "output"
- ]),
+ # *([
+ # (f"gpio/{channel}", params.pin)
+ # for (channel, params) in config.gpio.scheme.items()
+ # if params.mode == "output"
+ # ]),
]:
if pin >= 0:
logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
try:
- gpio.set_output(pin, initial=False)
+ gpio.set_output(pin, False)
except Exception:
logger.exception("Can't clear GPIO pin=%d (%s)", pin, name)
diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py
index 6dae0ce1..2e02c929 100644
--- a/kvmd/apps/kvmd/api/export.py
+++ b/kvmd/apps/kvmd/api/export.py
@@ -54,13 +54,18 @@ class ExportApi:
self.__user_gpio.get_state(),
])
rows: List[str] = []
+
self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled")
self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power")
+
for mode in ["input", "output"]:
- for (channel, gch) in gpio_state[f"{mode}s"].items():
- self.__append_prometheus_rows(rows, gch["state"], f"pikvm_gpio_input_{channel}")
+ for (channel, ch_state) in gpio_state[f"{mode}s"].items():
+ for key in ["online", "state"]:
+ self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
+
if hw_state is not None:
self.__append_prometheus_rows(rows, hw_state["health"], "pikvm_hw")
+
return Response(text="\n".join(rows))
def __append_prometheus_rows(self, rows: List[str], value: Any, path: str) -> None:
diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py
index 66f7b358..ddd28477 100644
--- a/kvmd/apps/kvmd/api/ugpio.py
+++ b/kvmd/apps/kvmd/api/ugpio.py
@@ -59,5 +59,6 @@ class UserGpioApi:
async def __pulse_handler(self, request: Request) -> Response:
channel = valid_ugpio_channel(request.query.get("channel"))
delay = valid_float_f0(request.query.get("delay", "0"))
- await self.__user_gpio.pulse(channel, delay)
+ wait = valid_bool(request.query.get("wait", "0"))
+ await self.__user_gpio.pulse(channel, delay, wait)
return make_json_response()
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 65c10d47..cb999fb9 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -106,20 +106,21 @@ class StreamerResolutionNotSupported(OperationError):
# =====
@dataclasses.dataclass(frozen=True)
-class _Component:
+class _Component: # pylint: disable=too-many-instance-attributes
name: str
event_type: str
obj: object
+ sysprep: Optional[Callable[[], None]] = None
+ systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None
- systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
def __post_init__(self) -> None:
if isinstance(self.obj, BasePlugin):
object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})")
- for field in ["get_state", "poll_state", "systask", "cleanup"]:
+ for field in ["sysprep", "systask", "get_state", "poll_state", "cleanup"]:
object.__setattr__(self, field, getattr(self.obj, field, None))
if self.get_state or self.poll_state:
assert self.event_type, self
@@ -278,7 +279,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
# ===== SYSTEM STUFF
def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ
- self.__hid.start()
+ for component in self.__components:
+ if component.sysprep:
+ component.sysprep()
aioproc.rename_process("main")
super().run(**kwargs)
@@ -307,7 +310,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def wrapper() -> None:
try:
await method(*args)
- raise RuntimeError(f"Dead system task: {method.__name__}"
+ raise RuntimeError(f"Dead system task: {method}"
f"({', '.join(getattr(arg, '__name__', str(arg)) for arg in args)})")
except asyncio.CancelledError:
pass
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 14485c3c..6b2d597a 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -141,8 +141,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
**params_kwargs: Any,
) -> None:
- self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin >= 0 else -1)
- self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin >= 0 else -1)
+ self.__cap_pin = (gpio.set_output(cap_pin, False) if cap_pin >= 0 else -1)
+ self.__conv_pin = (gpio.set_output(conv_pin, False) if conv_pin >= 0 else -1)
self.__sync_delay = sync_delay
self.__init_delay = init_delay
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index c860ec16..c8b4ddd9 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -30,62 +30,89 @@ from typing import Optional
from ...logging import get_logger
+from ...plugins.ugpio import GpioError
+from ...plugins.ugpio import GpioOperationError
+from ...plugins.ugpio import GpioDriverOfflineError
+from ...plugins.ugpio import BaseUserGpioDriver
+from ...plugins.ugpio import get_ugpio_driver_class
+
from ... import aiotools
-from ... import gpio
from ...yamlconf import Section
-from ...errors import OperationError
from ...errors import IsBusyError
# =====
-class GpioChannelNotFoundError(OperationError):
+class GpioChannelNotFoundError(GpioOperationError):
def __init__(self) -> None:
super().__init__("GPIO channel is not found")
-class GpioSwitchNotSupported(OperationError):
+class GpioSwitchNotSupported(GpioOperationError):
def __init__(self) -> None:
super().__init__("This GPIO channel does not support switching")
-class GpioPulseNotSupported(OperationError):
+class GpioPulseNotSupported(GpioOperationError):
def __init__(self) -> None:
super().__init__("This GPIO channel does not support pulsing")
-class GpioChannelIsBusyError(IsBusyError):
+class GpioChannelIsBusyError(IsBusyError, GpioError):
def __init__(self) -> None:
super().__init__("Performing another GPIO operation on this channel, please try again later")
# =====
class _GpioInput:
- def __init__(self, channel: str, config: Section, reader: gpio.BatchReader) -> None:
+ def __init__(
+ self,
+ channel: str,
+ config: Section,
+ driver: BaseUserGpioDriver,
+ ) -> None:
+
self.__channel = channel
self.__pin: int = config.pin
self.__inverted: bool = config.inverted
- self.__reader = reader
+ self.__driver = driver
+ self.__driver.register_input(self.__pin)
def get_scheme(self) -> Dict:
return {}
def get_state(self) -> Dict:
- return {"state": (self.__reader.get(self.__pin) ^ self.__inverted)}
+ (online, state) = (True, False)
+ try:
+ state = (self.__driver.read(self.__pin) ^ self.__inverted)
+ except GpioDriverOfflineError:
+ online = False
+ return {
+ "online": online,
+ "state": state,
+ }
def __str__(self) -> str:
- return f"Input({self.__channel}, pin={self.__pin})"
+ return f"Input({self.__channel}, driver={self.__driver.get_instance_name()}, pin={self.__pin})"
__repr__ = __str__
class _GpioOutput: # pylint: disable=too-many-instance-attributes
- def __init__(self, channel: str, config: Section, notifier: aiotools.AioNotifier) -> None:
+ def __init__(
+ self,
+ channel: str,
+ config: Section,
+ driver: BaseUserGpioDriver,
+ notifier: aiotools.AioNotifier,
+ ) -> None:
+
self.__channel = channel
self.__pin: int = config.pin
self.__inverted: bool = config.inverted
+ self.__initial: bool = config.initial
self.__switch: bool = config.switch
@@ -95,6 +122,9 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
self.__busy_delay: float = config.busy_delay
+ self.__driver = driver
+ self.__driver.register_output(self.__pin, (config.initial ^ config.inverted))
+
self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier)
def get_scheme(self) -> Dict:
@@ -105,20 +135,31 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
"min_delay": (self.__min_pulse_delay if self.__pulse_delay else 0),
"max_delay": (self.__max_pulse_delay if self.__pulse_delay else 0),
},
+ "hw": {
+ "driver": self.__driver.get_instance_name(),
+ "pin": self.__pin,
+ },
}
def get_state(self) -> Dict:
busy = self.__region.is_busy()
+ (online, state) = (True, False)
+ if not busy:
+ try:
+ state = self.__read()
+ except GpioDriverOfflineError:
+ online = False
return {
- "state": (self.__read() if not busy else False),
+ "online": online,
+ "state": state,
"busy": busy,
}
def cleanup(self) -> None:
try:
- gpio.write(self.__pin, False)
+ self.__driver.write(self.__pin, (self.__initial ^ self.__inverted))
except Exception:
- get_logger().exception("Can't cleanup GPIO %s", self)
+ get_logger().exception("Can't cleanup %s", self)
async def switch(self, state: bool) -> bool:
if not self.__switch:
@@ -126,21 +167,25 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
async with self.__region:
if state != self.__read():
self.__write(state)
- get_logger(0).info("Switched %s to %d", self, state)
+ get_logger(0).info("Switched %s to state=%d", self, state)
await asyncio.sleep(self.__busy_delay)
return True
await asyncio.sleep(self.__busy_delay)
return False
@aiotools.atomic
- async def pulse(self, delay: float) -> None:
+ async def pulse(self, delay: float, wait: bool) -> None:
if not self.__pulse_delay:
raise GpioPulseNotSupported()
delay = min(max((delay or self.__pulse_delay), self.__min_pulse_delay), self.__max_pulse_delay)
- await aiotools.run_region_task(
- f"Can't perform pulse of {self} or operation was not completed",
- self.__region, self.__inner_pulse, delay,
- )
+ if wait:
+ async with self.__region:
+ await self.__inner_pulse(delay)
+ else:
+ await aiotools.run_region_task(
+ f"Can't perform pulse of {self} or operation was not completed",
+ self.__region, self.__inner_pulse, delay,
+ )
@aiotools.atomic
async def __inner_pulse(self, delay: float) -> None:
@@ -153,13 +198,13 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Pulsed %s with delay=%.2f", self, delay)
def __read(self) -> bool:
- return (gpio.read(self.__pin) ^ self.__inverted)
+ return (self.__driver.read(self.__pin) ^ self.__inverted)
def __write(self, state: bool) -> None:
- gpio.write(self.__pin, (state ^ self.__inverted))
+ self.__driver.write(self.__pin, (state ^ self.__inverted))
def __str__(self) -> str:
- return f"Output({self.__channel}, pin={self.__pin})"
+ return f"Output({self.__channel}, driver={self.__driver.get_instance_name()}, pin={self.__pin})"
__repr__ = __str__
@@ -170,27 +215,23 @@ class UserGpio:
self.__view = config.view
self.__state_notifier = aiotools.AioNotifier()
- self.__reader = gpio.BatchReader(
- pins=[
- (
- gpio.set_input(ch_config.pin)
- if ch_config.mode == "input" else
- gpio.set_output(ch_config.pin, (ch_config.initial ^ ch_config.inverted))
- )
- for ch_config in config.scheme.values()
- ],
- interval=config.state_poll,
- notifier=self.__state_notifier,
- )
+
+ self.__drivers = {
+ driver: get_ugpio_driver_class(drv_config.type)(**drv_config._unpack(ignore=["type"]))
+ for (driver, drv_config) in config.drivers.items()
+ }
self.__inputs: Dict[str, _GpioInput] = {}
self.__outputs: Dict[str, _GpioOutput] = {}
for (channel, ch_config) in sorted(config.scheme.items(), key=operator.itemgetter(0)):
+ driver = self.__drivers.get(ch_config.driver)
+ if driver is None:
+ raise RuntimeError(f"Missing User-GPIO driver configuration: {ch_config.driver}")
if ch_config.mode == "input":
- self.__inputs[channel] = _GpioInput(channel, ch_config, self.__reader)
+ self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
else: # output:
- self.__outputs[channel] = _GpioOutput(channel, ch_config, self.__state_notifier)
+ self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__state_notifier)
async def get_model(self) -> Dict:
return {
@@ -216,13 +257,26 @@ class UserGpio:
prev_state = state
await self.__state_notifier.wait()
+ def sysprep(self) -> None:
+ get_logger().info("Preparing User-GPIO drivers ...")
+ for (_, driver) in sorted(self.__drivers.items(), key=operator.itemgetter(0)):
+ driver.prepare(self.__state_notifier)
+
async def systask(self) -> None:
- get_logger(0).info("Polling User-GPIO inputs ...")
- await self.__reader.poll()
+ get_logger(0).info("Running User-GPIO drivers ...")
+ await asyncio.gather(*[
+ driver.run()
+ for (_, driver) in sorted(self.__drivers.items(), key=operator.itemgetter(0))
+ ])
async def cleanup(self) -> None:
for gout in self.__outputs.values():
gout.cleanup()
+ for driver in self.__drivers.values():
+ try:
+ driver.cleanup()
+ except Exception:
+ get_logger().exception("Can't cleanup driver %r", driver.get_instance_name())
async def switch(self, channel: str, state: bool) -> bool:
gout = self.__outputs.get(channel)
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
index ea99149e..f3ef8ba2 100644
--- a/kvmd/gpio.py
+++ b/kvmd/gpio.py
@@ -24,7 +24,7 @@ import asyncio
import contextlib
from typing import Tuple
-from typing import List
+from typing import Set
from typing import Generator
from typing import Optional
@@ -48,7 +48,7 @@ def bcm() -> Generator[None, None, None]:
logger.info("GPIO cleaned")
-def set_output(pin: int, initial: bool=False) -> int:
+def set_output(pin: int, initial: Optional[bool]) -> int:
assert pin >= 0, pin
GPIO.setup(pin, GPIO.OUT, initial=initial)
return pin
@@ -71,10 +71,10 @@ def write(pin: int, state: bool) -> None:
class BatchReader:
- def __init__(self, pins: List[int], interval: float, notifier: aiotools.AioNotifier) -> None:
- self.__pins = pins
- self.__flags: Tuple[Optional[bool], ...] = (None,) * len(pins)
- self.__state = {pin: read(pin) for pin in pins}
+ def __init__(self, pins: Set[int], interval: float, notifier: aiotools.AioNotifier) -> None:
+ self.__pins = sorted(pins)
+ self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
+ self.__state = {pin: read(pin) for pin in self.__pins}
self.__interval = interval
self.__notifier = notifier
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py
index 674f18d1..aed086ae 100644
--- a/kvmd/plugins/atx/gpio.py
+++ b/kvmd/plugins/atx/gpio.py
@@ -62,8 +62,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__power_led_pin = gpio.set_input(power_led_pin)
self.__hdd_led_pin = gpio.set_input(hdd_led_pin)
- self.__power_switch_pin = gpio.set_output(power_switch_pin)
- self.__reset_switch_pin = gpio.set_output(reset_switch_pin)
+ self.__power_switch_pin = gpio.set_output(power_switch_pin, False)
+ self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False)
self.__power_led_inverted = power_led_inverted
self.__hdd_led_inverted = hdd_led_inverted
@@ -75,7 +75,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__state_notifier)
self.__reader = gpio.BatchReader(
- pins=[self.__power_led_pin, self.__hdd_led_pin],
+ pins=set([self.__power_led_pin, self.__hdd_led_pin]),
interval=state_poll,
notifier=self.__state_notifier,
)
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 8f4cb02c..e81ea168 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -32,8 +32,8 @@ from .. import get_plugin_class
# =====
class BaseHid(BasePlugin):
- def start(self) -> None:
- pass
+ def sysprep(self) -> None:
+ raise NotImplementedError
async def get_state(self) -> Dict:
raise NotImplementedError
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py
index 95566d34..575190e5 100644
--- a/kvmd/plugins/hid/otg/__init__.py
+++ b/kvmd/plugins/hid/otg/__init__.py
@@ -74,7 +74,7 @@ class Plugin(BaseHid):
"noop": Option(False, type=valid_bool),
}
- def start(self) -> None:
+ def sysprep(self) -> None:
self.__keyboard_proc.start()
self.__mouse_proc.start()
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index cee0048d..5abab1f5 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -175,7 +175,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
multiprocessing.Process.__init__(self, daemon=True)
- self.__reset_pin = gpio.set_output(reset_pin)
+ self.__reset_pin = gpio.set_output(reset_pin, False)
self.__reset_delay = reset_delay
self.__device_path = device_path
@@ -217,9 +217,9 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
"noop": Option(False, type=valid_bool),
}
- def start(self) -> None:
+ def sysprep(self) -> None:
get_logger(0).info("Starting HID daemon ...")
- multiprocessing.Process.start(self)
+ self.start()
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py
index 1254351a..96cb3c63 100644
--- a/kvmd/plugins/msd/relay.py
+++ b/kvmd/plugins/msd/relay.py
@@ -165,8 +165,8 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
reset_delay: float,
) -> None:
- self.__target_pin = gpio.set_output(target_pin)
- self.__reset_pin = gpio.set_output(reset_pin)
+ self.__target_pin = gpio.set_output(target_pin, False)
+ self.__reset_pin = gpio.set_output(reset_pin, False)
self.__device_path = device_path
self.__init_delay = init_delay
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py
new file mode 100644
index 00000000..9dbc0166
--- /dev/null
+++ b/kvmd/plugins/ugpio/__init__.py
@@ -0,0 +1,77 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Type
+from typing import Optional
+
+from ...errors import OperationError
+
+from ... import aiotools
+
+from .. import BasePlugin
+from .. import get_plugin_class
+
+
+# =====
+class GpioError(Exception):
+ pass
+
+
+class GpioOperationError(OperationError, GpioError):
+ pass
+
+
+class GpioDriverOfflineError(GpioOperationError):
+ def __init__(self, driver: "BaseUserGpioDriver") -> None:
+ super().__init__(f"GPIO driver {driver.get_instance_name()!r} is offline")
+
+
+# =====
+class BaseUserGpioDriver(BasePlugin):
+ def get_instance_name(self) -> str:
+ raise NotImplementedError
+
+ def register_input(self, pin: int) -> None:
+ raise NotImplementedError
+
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ raise NotImplementedError
+
+ def prepare(self, notifier: aiotools.AioNotifier) -> None:
+ raise NotImplementedError
+
+ async def run(self) -> None:
+ raise NotImplementedError
+
+ def cleanup(self) -> None:
+ pass
+
+ def read(self, pin: int) -> bool:
+ raise NotImplementedError
+
+ def write(self, pin: int, state: bool) -> None:
+ raise NotImplementedError
+
+
+# =====
+def get_ugpio_driver_class(name: str) -> Type[BaseUserGpioDriver]:
+ return get_plugin_class("ugpio", name) # type: ignore
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
new file mode 100644
index 00000000..04afc75d
--- /dev/null
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -0,0 +1,86 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Dict
+from typing import Set
+from typing import Optional
+
+from ... import aiotools
+from ... import gpio
+
+from ...yamlconf import Option
+
+from ...validators.basic import valid_float_f01
+
+from . import BaseUserGpioDriver
+
+
+# =====
+class Plugin(BaseUserGpioDriver):
+ def __init__(self, state_poll: float) -> None: # pylint: disable=super-init-not-called
+ self.__state_poll = state_poll
+
+ self.__input_pins: Set[int] = set()
+ self.__output_pins: Dict[int, Optional[bool]] = {}
+
+ self.__reader: Optional[gpio.BatchReader] = None
+
+ @classmethod
+ def get_plugin_options(cls) -> Dict:
+ return {
+ "state_poll": Option(0.1, type=valid_float_f01),
+ }
+
+ def get_instance_name(self) -> str:
+ return "gpio"
+
+ def register_input(self, pin: int) -> None:
+ self.__input_pins.add(pin)
+
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ self.__output_pins[pin] = initial
+
+ def prepare(self, notifier: aiotools.AioNotifier) -> None:
+ assert self.__reader is None
+ self.__reader = gpio.BatchReader(
+ pins=set([
+ *map(gpio.set_input, self.__input_pins),
+ *[
+ gpio.set_output(pin, initial)
+ for (pin, initial) in self.__output_pins.items()
+ ],
+ ]),
+ interval=self.__state_poll,
+ notifier=notifier,
+ )
+
+ async def run(self) -> None:
+ assert self.__reader
+ await self.__reader.poll()
+
+ def read(self, pin: int) -> bool:
+ assert self.__reader
+ return self.__reader.get(pin)
+
+ def write(self, pin: int, state: bool) -> None:
+ assert self.__reader
+ gpio.write(pin, state)