summaryrefslogtreecommitdiff
path: root/kvmd/plugins
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-09-06 08:47:43 +0300
committerDevaev Maxim <[email protected]>2020-09-07 05:54:25 +0300
commita6dac4bd8495bc04f762a3ad415d301fb10498c9 (patch)
treeea81c6a9a9ea2521cc4b9bdfaee9b00d5b6e9c6d /kvmd/plugins
parente8bd1e264822967e1ddc628bf507664a5f4b9679 (diff)
ugpio plugins
Diffstat (limited to 'kvmd/plugins')
-rw-r--r--kvmd/plugins/atx/gpio.py6
-rw-r--r--kvmd/plugins/hid/__init__.py4
-rw-r--r--kvmd/plugins/hid/otg/__init__.py2
-rw-r--r--kvmd/plugins/hid/serial.py6
-rw-r--r--kvmd/plugins/msd/relay.py4
-rw-r--r--kvmd/plugins/ugpio/__init__.py77
-rw-r--r--kvmd/plugins/ugpio/gpio.py86
7 files changed, 174 insertions, 11 deletions
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py
index 674f18d1..aed086ae 100644
--- a/kvmd/plugins/atx/gpio.py
+++ b/kvmd/plugins/atx/gpio.py
@@ -62,8 +62,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__power_led_pin = gpio.set_input(power_led_pin)
self.__hdd_led_pin = gpio.set_input(hdd_led_pin)
- self.__power_switch_pin = gpio.set_output(power_switch_pin)
- self.__reset_switch_pin = gpio.set_output(reset_switch_pin)
+ self.__power_switch_pin = gpio.set_output(power_switch_pin, False)
+ self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False)
self.__power_led_inverted = power_led_inverted
self.__hdd_led_inverted = hdd_led_inverted
@@ -75,7 +75,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__state_notifier)
self.__reader = gpio.BatchReader(
- pins=[self.__power_led_pin, self.__hdd_led_pin],
+ pins=set([self.__power_led_pin, self.__hdd_led_pin]),
interval=state_poll,
notifier=self.__state_notifier,
)
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 8f4cb02c..e81ea168 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -32,8 +32,8 @@ from .. import get_plugin_class
# =====
class BaseHid(BasePlugin):
- def start(self) -> None:
- pass
+ def sysprep(self) -> None:
+ raise NotImplementedError
async def get_state(self) -> Dict:
raise NotImplementedError
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py
index 95566d34..575190e5 100644
--- a/kvmd/plugins/hid/otg/__init__.py
+++ b/kvmd/plugins/hid/otg/__init__.py
@@ -74,7 +74,7 @@ class Plugin(BaseHid):
"noop": Option(False, type=valid_bool),
}
- def start(self) -> None:
+ def sysprep(self) -> None:
self.__keyboard_proc.start()
self.__mouse_proc.start()
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index cee0048d..5abab1f5 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -175,7 +175,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
multiprocessing.Process.__init__(self, daemon=True)
- self.__reset_pin = gpio.set_output(reset_pin)
+ self.__reset_pin = gpio.set_output(reset_pin, False)
self.__reset_delay = reset_delay
self.__device_path = device_path
@@ -217,9 +217,9 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
"noop": Option(False, type=valid_bool),
}
- def start(self) -> None:
+ def sysprep(self) -> None:
get_logger(0).info("Starting HID daemon ...")
- multiprocessing.Process.start(self)
+ self.start()
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py
index 1254351a..96cb3c63 100644
--- a/kvmd/plugins/msd/relay.py
+++ b/kvmd/plugins/msd/relay.py
@@ -165,8 +165,8 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
reset_delay: float,
) -> None:
- self.__target_pin = gpio.set_output(target_pin)
- self.__reset_pin = gpio.set_output(reset_pin)
+ self.__target_pin = gpio.set_output(target_pin, False)
+ self.__reset_pin = gpio.set_output(reset_pin, False)
self.__device_path = device_path
self.__init_delay = init_delay
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py
new file mode 100644
index 00000000..9dbc0166
--- /dev/null
+++ b/kvmd/plugins/ugpio/__init__.py
@@ -0,0 +1,77 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Type
+from typing import Optional
+
+from ...errors import OperationError
+
+from ... import aiotools
+
+from .. import BasePlugin
+from .. import get_plugin_class
+
+
+# =====
+class GpioError(Exception):
+ pass
+
+
+class GpioOperationError(OperationError, GpioError):
+ pass
+
+
+class GpioDriverOfflineError(GpioOperationError):
+ def __init__(self, driver: "BaseUserGpioDriver") -> None:
+ super().__init__(f"GPIO driver {driver.get_instance_name()!r} is offline")
+
+
+# =====
+class BaseUserGpioDriver(BasePlugin):
+ def get_instance_name(self) -> str:
+ raise NotImplementedError
+
+ def register_input(self, pin: int) -> None:
+ raise NotImplementedError
+
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ raise NotImplementedError
+
+ def prepare(self, notifier: aiotools.AioNotifier) -> None:
+ raise NotImplementedError
+
+ async def run(self) -> None:
+ raise NotImplementedError
+
+ def cleanup(self) -> None:
+ pass
+
+ def read(self, pin: int) -> bool:
+ raise NotImplementedError
+
+ def write(self, pin: int, state: bool) -> None:
+ raise NotImplementedError
+
+
+# =====
+def get_ugpio_driver_class(name: str) -> Type[BaseUserGpioDriver]:
+ return get_plugin_class("ugpio", name) # type: ignore
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
new file mode 100644
index 00000000..04afc75d
--- /dev/null
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -0,0 +1,86 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Dict
+from typing import Set
+from typing import Optional
+
+from ... import aiotools
+from ... import gpio
+
+from ...yamlconf import Option
+
+from ...validators.basic import valid_float_f01
+
+from . import BaseUserGpioDriver
+
+
+# =====
+class Plugin(BaseUserGpioDriver):
+ def __init__(self, state_poll: float) -> None: # pylint: disable=super-init-not-called
+ self.__state_poll = state_poll
+
+ self.__input_pins: Set[int] = set()
+ self.__output_pins: Dict[int, Optional[bool]] = {}
+
+ self.__reader: Optional[gpio.BatchReader] = None
+
+ @classmethod
+ def get_plugin_options(cls) -> Dict:
+ return {
+ "state_poll": Option(0.1, type=valid_float_f01),
+ }
+
+ def get_instance_name(self) -> str:
+ return "gpio"
+
+ def register_input(self, pin: int) -> None:
+ self.__input_pins.add(pin)
+
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ self.__output_pins[pin] = initial
+
+ def prepare(self, notifier: aiotools.AioNotifier) -> None:
+ assert self.__reader is None
+ self.__reader = gpio.BatchReader(
+ pins=set([
+ *map(gpio.set_input, self.__input_pins),
+ *[
+ gpio.set_output(pin, initial)
+ for (pin, initial) in self.__output_pins.items()
+ ],
+ ]),
+ interval=self.__state_poll,
+ notifier=notifier,
+ )
+
+ async def run(self) -> None:
+ assert self.__reader
+ await self.__reader.poll()
+
+ def read(self, pin: int) -> bool:
+ assert self.__reader
+ return self.__reader.get(pin)
+
+ def write(self, pin: int, state: bool) -> None:
+ assert self.__reader
+ gpio.write(pin, state)