diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiomulti.py | 102 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/hid.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/hid/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/__init__.py | 52 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/device.py | 26 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/keyboard.py | 12 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 68 |
8 files changed, 186 insertions, 80 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py new file mode 100644 index 00000000..18d112f1 --- /dev/null +++ b/kvmd/aiomulti.py @@ -0,0 +1,102 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import multiprocessing +import multiprocessing.queues +import multiprocessing.sharedctypes +import queue + +from typing import Dict + +from . import aiotools + + +# ===== +class AioProcessNotifier: + def __init__(self) -> None: + self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__pid = os.getpid() + + def notify(self) -> None: + assert os.getpid() != self.__pid, "Child only" + self.__queue.put(None) + + async def wait(self) -> None: + assert os.getpid() == self.__pid, "Parent only" + while not (await aiotools.run_async(self.__inner_wait)): + pass + + def __inner_wait(self) -> bool: + try: + self.__queue.get(timeout=0.1) + while not self.__queue.empty(): + self.__queue.get() + return True + except queue.Empty: + return False + + +class AioSharedFlags: + def __init__( + self, + initial: Dict[str, bool], + notifier: AioProcessNotifier, + ) -> None: + + self.__local_flags = dict(initial) # To fast comparsion + self.__notifier = notifier + + self.__shared_flags: Dict[str, multiprocessing.sharedctypes.RawValue] = { + key: multiprocessing.RawValue("i", int(value)) # type: ignore + for (key, value) in initial.items() + } + self.__lock = multiprocessing.Lock() + self.__pid = os.getpid() + + def update(self, **kwargs: bool) -> None: + assert os.getpid() != self.__pid, "Child only" + changed = False + try: + for (key, value) in kwargs.items(): + value = bool(value) + if self.__local_flags[key] != value: + if not changed: + self.__lock.acquire() + self.__shared_flags[key].value = int(value) + self.__local_flags[key] = value + changed = True + finally: + if changed: + self.__lock.release() + self.__notifier.notify() + + async def get(self) -> Dict[str, bool]: + return (await aiotools.run_async(self.__inner_get)) + + def __inner_get(self) -> Dict[str, bool]: + assert os.getpid() == self.__pid, "Parent only" + with self.__lock: + return { + key: bool(shared.value) + for (key, shared) in self.__shared_flags.items() + } diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py index 24c48798..fad22718 100644 --- a/kvmd/apps/kvmd/api/hid.py +++ b/kvmd/apps/kvmd/api/hid.py @@ -47,7 +47,7 @@ class HidApi: @exposed_http("GET", "/hid") async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return make_json_response(self.__hid.get_state()) + return make_json_response(await self.__hid.get_state()) @exposed_http("POST", "/hid/reset") async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 67a0fa95..1b77d345 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -228,7 +228,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins await asyncio.gather(*[ self.__broadcast_event(_Events.INFO_STATE, (await self.__make_info())), self.__broadcast_event(_Events.WOL_STATE, self.__wol.get_state()), - self.__broadcast_event(_Events.HID_STATE, self.__hid.get_state()), + self.__broadcast_event(_Events.HID_STATE, (await self.__hid.get_state())), self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()), self.__broadcast_event(_Events.MSD_STATE, (await self.__msd.get_state())), self.__broadcast_event(_Events.STREAMER_STATE, (await self.__streamer.get_state())), diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py index 4e0ce5ea..bc391b03 100644 --- a/kvmd/plugins/hid/__init__.py +++ b/kvmd/plugins/hid/__init__.py @@ -33,7 +33,7 @@ class BaseHid(BasePlugin): def start(self) -> None: pass - def get_state(self) -> Dict: + async def get_state(self) -> Dict: raise NotImplementedError async def poll_state(self) -> AsyncGenerator[Dict, None]: diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py index 84c2803e..0684a674 100644 --- a/kvmd/plugins/hid/otg/__init__.py +++ b/kvmd/plugins/hid/otg/__init__.py @@ -20,17 +20,12 @@ # ========================================================================== # -import asyncio -import concurrent.futures -import multiprocessing -import multiprocessing.queues -import queue -import functools - from typing import Dict from typing import AsyncGenerator from typing import Any +from .... import aiomulti + from ....yamlconf import Option from ....validators.basic import valid_bool @@ -54,10 +49,10 @@ class Plugin(BaseHid): noop: bool, ) -> None: - self.__changes_queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__state_notifier = aiomulti.AioProcessNotifier() - self.__keyboard_proc = KeyboardProcess(noop=noop, changes_queue=self.__changes_queue, **keyboard) - self.__mouse_proc = MouseProcess(noop=noop, changes_queue=self.__changes_queue, **mouse) + self.__keyboard_proc = KeyboardProcess(noop=noop, state_notifier=self.__state_notifier, **keyboard) + self.__mouse_proc = MouseProcess(noop=noop, state_notifier=self.__state_notifier, **mouse) @classmethod def get_plugin_options(cls) -> Dict: @@ -81,31 +76,30 @@ class Plugin(BaseHid): self.__keyboard_proc.start() self.__mouse_proc.start() - def get_state(self) -> Dict: - keyboard_state = self.__keyboard_proc.get_state() - mouse_state = self.__mouse_proc.get_state() + async def get_state(self) -> Dict: + keyboard_state = await self.__keyboard_proc.get_state() + mouse_state = await self.__mouse_proc.get_state() return { "online": (keyboard_state["online"] and mouse_state["online"]), - "keyboard": {"features": {"leds": True}, **keyboard_state}, + "keyboard": { + "online": keyboard_state["online"], + "leds": { + "caps": keyboard_state["caps"], + "scroll": keyboard_state["scroll"], + "num": keyboard_state["num"], + }, + }, "mouse": mouse_state, } async def poll_state(self) -> AsyncGenerator[Dict, None]: - loop = asyncio.get_running_loop() - wait_for_changes = functools.partial(self.__changes_queue.get, timeout=1) - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - prev_state: Dict = {} - while True: - state = self.get_state() - if state != prev_state: - yield state - prev_state = state - while True: - try: - await loop.run_in_executor(executor, wait_for_changes) - break - except queue.Empty: - pass + prev_state: Dict = {} + while True: + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + await self.__state_notifier.wait() async def reset(self) -> None: self.__keyboard_proc.send_reset_event() diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py index 0fbb63cb..37cb7b07 100644 --- a/kvmd/plugins/hid/otg/device.py +++ b/kvmd/plugins/hid/otg/device.py @@ -30,12 +30,13 @@ import errno import time from typing import Dict -from typing import Any import setproctitle from ....logging import get_logger +from .... import aiomulti + # ===== class BaseEvent: @@ -48,7 +49,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in name: str, read_size: int, initial_state: Dict, - changes_queue: multiprocessing.queues.Queue, + state_notifier: aiomulti.AioProcessNotifier, device_path: str, select_timeout: float, @@ -61,7 +62,6 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__name = name self.__read_size = read_size - self.__changes_queue = changes_queue self.__device_path = device_path self.__select_timeout = select_timeout @@ -71,7 +71,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__fd = -1 self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() - self.__state_shared = multiprocessing.Manager().dict(online=True, **initial_state) # type: ignore + self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, state_notifier) self.__stop_event = multiprocessing.Event() def run(self) -> None: @@ -100,8 +100,8 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__close_device() - def get_state(self) -> Dict: - return dict(self.__state_shared) + async def get_state(self) -> Dict: + return (await self.__state_flags.get()) # ===== @@ -111,6 +111,9 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in def _process_read_report(self, report: bytes) -> None: pass + def _update_state(self, **kwargs: bool) -> None: + self.__state_flags.update(**kwargs) + # ===== def _stop(self) -> None: @@ -134,11 +137,6 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in if close: self.__close_device() - def _update_state(self, key: str, value: Any) -> None: - if self.__state_shared[key] != value: - self.__state_shared[key] = value - self.__changes_queue.put(None) - # ===== def __write_report(self, report: bytes) -> bool: @@ -153,7 +151,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in try: written = os.write(self.__fd, report) if written == len(report): - self._update_state("online", True) + self.__state_flags.update(online=True) return True else: logger.error("HID-%s write() error: written (%s) != report length (%d)", @@ -223,7 +221,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in if self.__fd >= 0: try: if select.select([], [self.__fd], [], self.__select_timeout)[1]: - self._update_state("online", True) + self.__state_flags.update(online=True) return True else: logger.debug("HID-%s is busy/unplugged (write select)", self.__name) @@ -231,7 +229,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in logger.error("Can't select() for write HID-%s: %s: %s", self.__name, type(err).__name__, err) self.__close_device() - self._update_state("online", False) + self.__state_flags.update(online=False) return False def __close_device(self) -> None: diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py index 6c9ddfed..fb964d8a 100644 --- a/kvmd/plugins/hid/otg/keyboard.py +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -68,7 +68,7 @@ class KeyboardProcess(BaseDeviceProcess): super().__init__( name="keyboard", read_size=1, - initial_state={"leds": {"caps": False, "scroll": False, "num": False}}, + initial_state={"caps": False, "scroll": False, "num": False}, **kwargs, ) @@ -98,11 +98,11 @@ class KeyboardProcess(BaseDeviceProcess): def _process_read_report(self, report: bytes) -> None: # https://wiki.osdev.org/USB_Human_Interface_Devices#LED_lamps assert len(report) == 1, report - self._update_state("leds", { - "caps": bool(report[0] & 2), - "scroll": bool(report[0] & 4), - "num": bool(report[0] & 1), - }) + self._update_state( + caps=bool(report[0] & 2), + scroll=bool(report[0] & 4), + num=bool(report[0] & 1), + ) # ===== diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index d5561e2f..b98bd101 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -23,9 +23,9 @@ import os import signal import asyncio -import dataclasses import multiprocessing import multiprocessing.queues +import dataclasses import queue import struct import errno @@ -40,6 +40,7 @@ import setproctitle from ...logging import get_logger from ... import aiotools +from ... import aiomulti from ... import gpio from ... import keymap @@ -141,8 +142,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst common_retries: int, retries_delay: float, noop: bool, - - state_poll: float, ) -> None: multiprocessing.Process.__init__(self, daemon=True) @@ -158,12 +157,18 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__retries_delay = retries_delay self.__noop = noop - self.__state_poll = state_poll - self.__lock = asyncio.Lock() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() - self.__online_shared = multiprocessing.Value("i", 1) + + self.__state_notifier = aiomulti.AioProcessNotifier() + self.__state_flags = aiomulti.AioSharedFlags({ + "online": True, + "caps": False, + "scroll": False, + "num": False, + }, self.__state_notifier) + self.__stop_event = multiprocessing.Event() @classmethod @@ -179,30 +184,35 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst "common_retries": Option(100, type=valid_int_f1), "retries_delay": Option(0.1, type=valid_float_f01), "noop": Option(False, type=valid_bool), - - "state_poll": Option(0.1, type=valid_float_f01), } def start(self) -> None: get_logger(0).info("Starting HID daemon ...") multiprocessing.Process.start(self) - def get_state(self) -> Dict: - online = bool(self.__online_shared.value) + async def get_state(self) -> Dict: + state = await self.__state_flags.get() return { - "online": online, - "keyboard": {"features": {"leds": False}, "online": online}, - "mouse": {"online": online}, + "online": state["online"], + "keyboard": { + "online": state["online"], + "leds": { + "caps": state["caps"], + "scroll": state["scroll"], + "num": state["num"], + }, + }, + "mouse": {"online": state["online"]}, } async def poll_state(self) -> AsyncGenerator[Dict, None]: prev_state: Dict = {} - while self.is_alive(): - state = self.get_state() + while True: + state = await self.get_state() if state != prev_state: yield state prev_state = state - await asyncio.sleep(self.__state_poll) + await self.__state_notifier.wait() @aiotools.atomic async def reset(self) -> None: @@ -275,19 +285,13 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst while not self.__stop_event.is_set(): try: with self.__get_serial() as tty: - passed = 0 while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0): try: - event: _BaseEvent = self.__events_queue.get(timeout=0.05) + event: _BaseEvent = self.__events_queue.get(timeout=0.1) except queue.Empty: - if passed >= 20: # 20 * 0.05 = 1 sec - self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping - passed = 0 - else: - passed += 1 + self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping else: self.__process_command(tty, event.make_command()) - passed = 0 except serial.SerialException as err: if err.errno == errno.ENOENT: @@ -341,16 +345,24 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst logger.error("Got CRC error of request from HID: request=%r", request) elif code == 0x45: # Unknown command logger.error("HID did not recognize the request=%r", request) - self.__online_shared.value = 1 + self.__state_flags.update(online=True) return elif code == 0x24: # Rebooted? logger.error("No previous command state inside HID, seems it was rebooted") - self.__online_shared.value = 1 + self.__state_flags.update(online=True) return elif code == 0x20: # Done if error_occured: logger.info("Success!") - self.__online_shared.value = 1 + self.__state_flags.update(online=True) + return + elif code & 0x80: # Pong with leds + self.__state_flags.update( + online=True, + caps=bool(code & 0b00000001), + scroll=bool(code & 0x00000010), + num=bool(code & 0x00000100), + ) return else: logger.error("Invalid response from HID: request=%r; code=0x%x", request, code) @@ -358,7 +370,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst common_retries -= 1 error_occured = True - self.__online_shared.value = 0 + self.__state_flags.update(online=False) if common_retries and read_retries: logger.error("Retries left: common_retries=%d; read_retries=%d", common_retries, read_retries) |