summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiomulti.py102
-rw-r--r--kvmd/apps/kvmd/api/hid.py2
-rw-r--r--kvmd/apps/kvmd/server.py2
-rw-r--r--kvmd/plugins/hid/__init__.py2
-rw-r--r--kvmd/plugins/hid/otg/__init__.py52
-rw-r--r--kvmd/plugins/hid/otg/device.py26
-rw-r--r--kvmd/plugins/hid/otg/keyboard.py12
-rw-r--r--kvmd/plugins/hid/serial.py68
8 files changed, 186 insertions, 80 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
new file mode 100644
index 00000000..18d112f1
--- /dev/null
+++ b/kvmd/aiomulti.py
@@ -0,0 +1,102 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import multiprocessing
+import multiprocessing.queues
+import multiprocessing.sharedctypes
+import queue
+
+from typing import Dict
+
+from . import aiotools
+
+
+# =====
+class AioProcessNotifier:
+ def __init__(self) -> None:
+ self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+ self.__pid = os.getpid()
+
+ def notify(self) -> None:
+ assert os.getpid() != self.__pid, "Child only"
+ self.__queue.put(None)
+
+ async def wait(self) -> None:
+ assert os.getpid() == self.__pid, "Parent only"
+ while not (await aiotools.run_async(self.__inner_wait)):
+ pass
+
+ def __inner_wait(self) -> bool:
+ try:
+ self.__queue.get(timeout=0.1)
+ while not self.__queue.empty():
+ self.__queue.get()
+ return True
+ except queue.Empty:
+ return False
+
+
+class AioSharedFlags:
+ def __init__(
+ self,
+ initial: Dict[str, bool],
+ notifier: AioProcessNotifier,
+ ) -> None:
+
+ self.__local_flags = dict(initial) # To fast comparsion
+ self.__notifier = notifier
+
+ self.__shared_flags: Dict[str, multiprocessing.sharedctypes.RawValue] = {
+ key: multiprocessing.RawValue("i", int(value)) # type: ignore
+ for (key, value) in initial.items()
+ }
+ self.__lock = multiprocessing.Lock()
+ self.__pid = os.getpid()
+
+ def update(self, **kwargs: bool) -> None:
+ assert os.getpid() != self.__pid, "Child only"
+ changed = False
+ try:
+ for (key, value) in kwargs.items():
+ value = bool(value)
+ if self.__local_flags[key] != value:
+ if not changed:
+ self.__lock.acquire()
+ self.__shared_flags[key].value = int(value)
+ self.__local_flags[key] = value
+ changed = True
+ finally:
+ if changed:
+ self.__lock.release()
+ self.__notifier.notify()
+
+ async def get(self) -> Dict[str, bool]:
+ return (await aiotools.run_async(self.__inner_get))
+
+ def __inner_get(self) -> Dict[str, bool]:
+ assert os.getpid() == self.__pid, "Parent only"
+ with self.__lock:
+ return {
+ key: bool(shared.value)
+ for (key, shared) in self.__shared_flags.items()
+ }
diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py
index 24c48798..fad22718 100644
--- a/kvmd/apps/kvmd/api/hid.py
+++ b/kvmd/apps/kvmd/api/hid.py
@@ -47,7 +47,7 @@ class HidApi:
@exposed_http("GET", "/hid")
async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return make_json_response(self.__hid.get_state())
+ return make_json_response(await self.__hid.get_state())
@exposed_http("POST", "/hid/reset")
async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 67a0fa95..1b77d345 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -228,7 +228,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
await asyncio.gather(*[
self.__broadcast_event(_Events.INFO_STATE, (await self.__make_info())),
self.__broadcast_event(_Events.WOL_STATE, self.__wol.get_state()),
- self.__broadcast_event(_Events.HID_STATE, self.__hid.get_state()),
+ self.__broadcast_event(_Events.HID_STATE, (await self.__hid.get_state())),
self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()),
self.__broadcast_event(_Events.MSD_STATE, (await self.__msd.get_state())),
self.__broadcast_event(_Events.STREAMER_STATE, (await self.__streamer.get_state())),
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 4e0ce5ea..bc391b03 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -33,7 +33,7 @@ class BaseHid(BasePlugin):
def start(self) -> None:
pass
- def get_state(self) -> Dict:
+ async def get_state(self) -> Dict:
raise NotImplementedError
async def poll_state(self) -> AsyncGenerator[Dict, None]:
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py
index 84c2803e..0684a674 100644
--- a/kvmd/plugins/hid/otg/__init__.py
+++ b/kvmd/plugins/hid/otg/__init__.py
@@ -20,17 +20,12 @@
# ========================================================================== #
-import asyncio
-import concurrent.futures
-import multiprocessing
-import multiprocessing.queues
-import queue
-import functools
-
from typing import Dict
from typing import AsyncGenerator
from typing import Any
+from .... import aiomulti
+
from ....yamlconf import Option
from ....validators.basic import valid_bool
@@ -54,10 +49,10 @@ class Plugin(BaseHid):
noop: bool,
) -> None:
- self.__changes_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+ self.__state_notifier = aiomulti.AioProcessNotifier()
- self.__keyboard_proc = KeyboardProcess(noop=noop, changes_queue=self.__changes_queue, **keyboard)
- self.__mouse_proc = MouseProcess(noop=noop, changes_queue=self.__changes_queue, **mouse)
+ self.__keyboard_proc = KeyboardProcess(noop=noop, state_notifier=self.__state_notifier, **keyboard)
+ self.__mouse_proc = MouseProcess(noop=noop, state_notifier=self.__state_notifier, **mouse)
@classmethod
def get_plugin_options(cls) -> Dict:
@@ -81,31 +76,30 @@ class Plugin(BaseHid):
self.__keyboard_proc.start()
self.__mouse_proc.start()
- def get_state(self) -> Dict:
- keyboard_state = self.__keyboard_proc.get_state()
- mouse_state = self.__mouse_proc.get_state()
+ async def get_state(self) -> Dict:
+ keyboard_state = await self.__keyboard_proc.get_state()
+ mouse_state = await self.__mouse_proc.get_state()
return {
"online": (keyboard_state["online"] and mouse_state["online"]),
- "keyboard": {"features": {"leds": True}, **keyboard_state},
+ "keyboard": {
+ "online": keyboard_state["online"],
+ "leds": {
+ "caps": keyboard_state["caps"],
+ "scroll": keyboard_state["scroll"],
+ "num": keyboard_state["num"],
+ },
+ },
"mouse": mouse_state,
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
- loop = asyncio.get_running_loop()
- wait_for_changes = functools.partial(self.__changes_queue.get, timeout=1)
- with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
- prev_state: Dict = {}
- while True:
- state = self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- while True:
- try:
- await loop.run_in_executor(executor, wait_for_changes)
- break
- except queue.Empty:
- pass
+ prev_state: Dict = {}
+ while True:
+ state = await self.get_state()
+ if state != prev_state:
+ yield state
+ prev_state = state
+ await self.__state_notifier.wait()
async def reset(self) -> None:
self.__keyboard_proc.send_reset_event()
diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py
index 0fbb63cb..37cb7b07 100644
--- a/kvmd/plugins/hid/otg/device.py
+++ b/kvmd/plugins/hid/otg/device.py
@@ -30,12 +30,13 @@ import errno
import time
from typing import Dict
-from typing import Any
import setproctitle
from ....logging import get_logger
+from .... import aiomulti
+
# =====
class BaseEvent:
@@ -48,7 +49,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
name: str,
read_size: int,
initial_state: Dict,
- changes_queue: multiprocessing.queues.Queue,
+ state_notifier: aiomulti.AioProcessNotifier,
device_path: str,
select_timeout: float,
@@ -61,7 +62,6 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
self.__name = name
self.__read_size = read_size
- self.__changes_queue = changes_queue
self.__device_path = device_path
self.__select_timeout = select_timeout
@@ -71,7 +71,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
self.__fd = -1
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
- self.__state_shared = multiprocessing.Manager().dict(online=True, **initial_state) # type: ignore
+ self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, state_notifier)
self.__stop_event = multiprocessing.Event()
def run(self) -> None:
@@ -100,8 +100,8 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
self.__close_device()
- def get_state(self) -> Dict:
- return dict(self.__state_shared)
+ async def get_state(self) -> Dict:
+ return (await self.__state_flags.get())
# =====
@@ -111,6 +111,9 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
def _process_read_report(self, report: bytes) -> None:
pass
+ def _update_state(self, **kwargs: bool) -> None:
+ self.__state_flags.update(**kwargs)
+
# =====
def _stop(self) -> None:
@@ -134,11 +137,6 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
if close:
self.__close_device()
- def _update_state(self, key: str, value: Any) -> None:
- if self.__state_shared[key] != value:
- self.__state_shared[key] = value
- self.__changes_queue.put(None)
-
# =====
def __write_report(self, report: bytes) -> bool:
@@ -153,7 +151,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
try:
written = os.write(self.__fd, report)
if written == len(report):
- self._update_state("online", True)
+ self.__state_flags.update(online=True)
return True
else:
logger.error("HID-%s write() error: written (%s) != report length (%d)",
@@ -223,7 +221,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
if self.__fd >= 0:
try:
if select.select([], [self.__fd], [], self.__select_timeout)[1]:
- self._update_state("online", True)
+ self.__state_flags.update(online=True)
return True
else:
logger.debug("HID-%s is busy/unplugged (write select)", self.__name)
@@ -231,7 +229,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
logger.error("Can't select() for write HID-%s: %s: %s", self.__name, type(err).__name__, err)
self.__close_device()
- self._update_state("online", False)
+ self.__state_flags.update(online=False)
return False
def __close_device(self) -> None:
diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py
index 6c9ddfed..fb964d8a 100644
--- a/kvmd/plugins/hid/otg/keyboard.py
+++ b/kvmd/plugins/hid/otg/keyboard.py
@@ -68,7 +68,7 @@ class KeyboardProcess(BaseDeviceProcess):
super().__init__(
name="keyboard",
read_size=1,
- initial_state={"leds": {"caps": False, "scroll": False, "num": False}},
+ initial_state={"caps": False, "scroll": False, "num": False},
**kwargs,
)
@@ -98,11 +98,11 @@ class KeyboardProcess(BaseDeviceProcess):
def _process_read_report(self, report: bytes) -> None:
# https://wiki.osdev.org/USB_Human_Interface_Devices#LED_lamps
assert len(report) == 1, report
- self._update_state("leds", {
- "caps": bool(report[0] & 2),
- "scroll": bool(report[0] & 4),
- "num": bool(report[0] & 1),
- })
+ self._update_state(
+ caps=bool(report[0] & 2),
+ scroll=bool(report[0] & 4),
+ num=bool(report[0] & 1),
+ )
# =====
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index d5561e2f..b98bd101 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -23,9 +23,9 @@
import os
import signal
import asyncio
-import dataclasses
import multiprocessing
import multiprocessing.queues
+import dataclasses
import queue
import struct
import errno
@@ -40,6 +40,7 @@ import setproctitle
from ...logging import get_logger
from ... import aiotools
+from ... import aiomulti
from ... import gpio
from ... import keymap
@@ -141,8 +142,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
common_retries: int,
retries_delay: float,
noop: bool,
-
- state_poll: float,
) -> None:
multiprocessing.Process.__init__(self, daemon=True)
@@ -158,12 +157,18 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__retries_delay = retries_delay
self.__noop = noop
- self.__state_poll = state_poll
-
self.__lock = asyncio.Lock()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
- self.__online_shared = multiprocessing.Value("i", 1)
+
+ self.__state_notifier = aiomulti.AioProcessNotifier()
+ self.__state_flags = aiomulti.AioSharedFlags({
+ "online": True,
+ "caps": False,
+ "scroll": False,
+ "num": False,
+ }, self.__state_notifier)
+
self.__stop_event = multiprocessing.Event()
@classmethod
@@ -179,30 +184,35 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
"common_retries": Option(100, type=valid_int_f1),
"retries_delay": Option(0.1, type=valid_float_f01),
"noop": Option(False, type=valid_bool),
-
- "state_poll": Option(0.1, type=valid_float_f01),
}
def start(self) -> None:
get_logger(0).info("Starting HID daemon ...")
multiprocessing.Process.start(self)
- def get_state(self) -> Dict:
- online = bool(self.__online_shared.value)
+ async def get_state(self) -> Dict:
+ state = await self.__state_flags.get()
return {
- "online": online,
- "keyboard": {"features": {"leds": False}, "online": online},
- "mouse": {"online": online},
+ "online": state["online"],
+ "keyboard": {
+ "online": state["online"],
+ "leds": {
+ "caps": state["caps"],
+ "scroll": state["scroll"],
+ "num": state["num"],
+ },
+ },
+ "mouse": {"online": state["online"]},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
- while self.is_alive():
- state = self.get_state()
+ while True:
+ state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
- await asyncio.sleep(self.__state_poll)
+ await self.__state_notifier.wait()
@aiotools.atomic
async def reset(self) -> None:
@@ -275,19 +285,13 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
while not self.__stop_event.is_set():
try:
with self.__get_serial() as tty:
- passed = 0
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
- event: _BaseEvent = self.__events_queue.get(timeout=0.05)
+ event: _BaseEvent = self.__events_queue.get(timeout=0.1)
except queue.Empty:
- if passed >= 20: # 20 * 0.05 = 1 sec
- self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
- passed = 0
- else:
- passed += 1
+ self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
else:
self.__process_command(tty, event.make_command())
- passed = 0
except serial.SerialException as err:
if err.errno == errno.ENOENT:
@@ -341,16 +345,24 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
logger.error("Got CRC error of request from HID: request=%r", request)
elif code == 0x45: # Unknown command
logger.error("HID did not recognize the request=%r", request)
- self.__online_shared.value = 1
+ self.__state_flags.update(online=True)
return
elif code == 0x24: # Rebooted?
logger.error("No previous command state inside HID, seems it was rebooted")
- self.__online_shared.value = 1
+ self.__state_flags.update(online=True)
return
elif code == 0x20: # Done
if error_occured:
logger.info("Success!")
- self.__online_shared.value = 1
+ self.__state_flags.update(online=True)
+ return
+ elif code & 0x80: # Pong with leds
+ self.__state_flags.update(
+ online=True,
+ caps=bool(code & 0b00000001),
+ scroll=bool(code & 0x00000010),
+ num=bool(code & 0x00000100),
+ )
return
else:
logger.error("Invalid response from HID: request=%r; code=0x%x", request, code)
@@ -358,7 +370,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
common_retries -= 1
error_occured = True
- self.__online_shared.value = 0
+ self.__state_flags.update(online=False)
if common_retries and read_retries:
logger.error("Retries left: common_retries=%d; read_retries=%d", common_retries, read_retries)