summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiomulti.py40
-rw-r--r--kvmd/plugins/ugpio/ezcoo.py193
2 files changed, 223 insertions, 10 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
index edfc5c34..c0fca419 100644
--- a/kvmd/aiomulti.py
+++ b/kvmd/aiomulti.py
@@ -23,12 +23,41 @@
import multiprocessing
import queue
+from typing import Tuple
from typing import Dict
+from typing import TypeVar
+from typing import Optional
from . import aiotools
# =====
+_QueueItemT = TypeVar("_QueueItemT")
+
+
+async def queue_get_last( # pylint: disable=invalid-name
+ q: "multiprocessing.Queue[_QueueItemT]",
+ timeout: float,
+) -> Tuple[bool, Optional[_QueueItemT]]:
+
+ return (await aiotools.run_async(queue_get_last_sync, q, timeout))
+
+
+def queue_get_last_sync( # pylint: disable=invalid-name
+ q: "multiprocessing.Queue[_QueueItemT]",
+ timeout: float,
+) -> Tuple[bool, Optional[_QueueItemT]]:
+
+ try:
+ item = q.get(timeout=timeout)
+ while not q.empty():
+ item = q.get()
+ return (True, item)
+ except queue.Empty:
+ return (False, None)
+
+
+# =====
class AioProcessNotifier:
def __init__(self) -> None:
self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue()
@@ -37,18 +66,9 @@ class AioProcessNotifier:
self.__queue.put_nowait(None)
async def wait(self) -> None:
- while not (await aiotools.run_async(self.__inner_wait)):
+ while not (await queue_get_last(self.__queue, 0.1))[0]:
pass
- def __inner_wait(self) -> bool:
- try:
- self.__queue.get(timeout=0.1)
- while not self.__queue.empty():
- self.__queue.get()
- return True
- except queue.Empty:
- return False
-
# =====
class AioSharedFlags:
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py
new file mode 100644
index 00000000..460bce2d
--- /dev/null
+++ b/kvmd/plugins/ugpio/ezcoo.py
@@ -0,0 +1,193 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import re
+import multiprocessing
+import errno
+import time
+
+from typing import Tuple
+from typing import Dict
+from typing import Optional
+
+import serial
+
+from ...logging import get_logger
+
+from ... import aiotools
+from ... import aiomulti
+from ... import aioproc
+
+from ...yamlconf import Option
+
+from ...validators.basic import valid_float_f01
+from ...validators.os import valid_abs_path
+from ...validators.hw import valid_tty_speed
+
+from . import GpioDriverOfflineError
+from . import BaseUserGpioDriver
+
+
+# =====
+class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ instance_name: str,
+ notifier: aiotools.AioNotifier,
+
+ device_path: str,
+ speed: int,
+ read_timeout: float,
+ ) -> None:
+
+ super().__init__(instance_name, notifier)
+
+ self.__device_path = device_path
+ self.__speed = speed
+ self.__read_timeout = read_timeout
+
+ self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()
+ self.__channel_queue: "multiprocessing.Queue[Optional[int]]" = multiprocessing.Queue()
+ self.__channel: Optional[int] = -1
+
+ self.__proc: Optional[multiprocessing.Process] = None
+ self.__stop_event = multiprocessing.Event()
+
+ @classmethod
+ def get_plugin_options(cls) -> Dict:
+ return {
+ "device": Option("", type=valid_abs_path, unpack_as="device_path"),
+ "speed": Option(115200, type=valid_tty_speed),
+ "read_timeout": Option(2.0, type=valid_float_f01),
+ }
+
+ def register_input(self, pin: int, debounce: float) -> None:
+ _ = pin
+ _ = debounce
+
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ _ = pin
+ _ = initial
+
+ def prepare(self) -> None:
+ assert self.__proc is None
+ self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
+ self.__proc.start()
+
+ async def run(self) -> None:
+ while True:
+ (got, channel) = await aiomulti.queue_get_last(self.__channel_queue, 1)
+ if got and self.__channel != channel:
+ self.__channel = channel
+ await self._notifier.notify()
+
+ def cleanup(self) -> None:
+ if self.__proc is not None:
+ if self.__proc.is_alive():
+ get_logger(0).info("Stopping %s daemon ...", self)
+ self.__stop_event.set()
+ if self.__proc.exitcode is not None:
+ self.__proc.join()
+
+ def read(self, pin: int) -> bool:
+ if not self.__is_online():
+ raise GpioDriverOfflineError(self)
+ return (self.__channel == pin)
+
+ def write(self, pin: int, state: bool) -> None:
+ if not self.__is_online():
+ raise GpioDriverOfflineError(self)
+ if state and (0 <= pin <= 3):
+ self.__ctl_queue.put_nowait(pin)
+
+ # =====
+
+ def __is_online(self) -> bool:
+ return (
+ self.__proc is not None
+ and self.__proc.is_alive()
+ and self.__channel is not None
+ )
+
+ def __serial_worker(self) -> None:
+ logger = get_logger(0)
+
+ logger.info("Started %s pid=%d", self, os.getpid())
+ aioproc.ignore_sigint()
+ aioproc.rename_process(f"ugpio-ezcoo-{self._instance_name}")
+
+ while not self.__stop_event.is_set():
+ try:
+ with self.__get_serial() as tty:
+ data = b""
+ self.__channel_queue.put_nowait(-1)
+
+ # Switch and then recieve the state.
+ # FIXME: Get actual state without modifying the current.
+ self.__send_channel(tty, 0)
+
+ while not self.__stop_event.is_set():
+ (channel, data) = self.__recv_channel(tty, data)
+ if channel is not None:
+ self.__channel_queue.put_nowait(channel)
+
+ (got, channel) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1) # type: ignore
+ if got:
+ assert channel is not None
+ self.__send_channel(tty, channel)
+
+ except Exception as err:
+ self.__channel_queue.put_nowait(None)
+ if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
+ logger.error("Missing %s serial device: %s", self, self.__device_path)
+ else:
+ logger.exception("Unexpected %s error", self)
+ time.sleep(1)
+
+ def __get_serial(self) -> serial.Serial:
+ return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout)
+
+ def __recv_channel(self, tty: serial.Serial, data: bytes) -> Tuple[Optional[int], bytes]:
+ channel: Optional[int] = None
+ if tty.in_waiting:
+ data += tty.read_all()
+ found = re.findall(b"V[0-9a-fA-F]{2}S", data)
+ if found:
+ channel = {
+ b"V0CS": 0,
+ b"V18S": 1,
+ b"V5ES": 2,
+ b"V08S": 3,
+ }.get(found[-1], -1)
+ data = data[-8:]
+ return (channel, data)
+
+ def __send_channel(self, tty: serial.Serial, channel: int) -> None:
+ # Twice because of ezcoo bugs
+ tty.write((b"SET OUT1 VS IN%d\n" % (channel + 1)) * 2)
+ tty.flush()
+
+ def __str__(self) -> str:
+ return f"Ezcoo({self._instance_name})"
+
+ __repr__ = __str__