diff options
Diffstat (limited to 'kvmd/plugins/hid/bt/__init__.py')
-rw-r--r-- | kvmd/plugins/hid/bt/__init__.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/kvmd/plugins/hid/bt/__init__.py b/kvmd/plugins/hid/bt/__init__.py new file mode 100644 index 00000000..6c8dc46b --- /dev/null +++ b/kvmd/plugins/hid/bt/__init__.py @@ -0,0 +1,208 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import multiprocessing +import time + +from typing import Tuple +from typing import Dict +from typing import Iterable +from typing import AsyncGenerator +from typing import Optional + +from ....logging import get_logger + +from ....yamlconf import Option + +from ....validators.basic import valid_bool +from ....validators.basic import valid_stripped_string_not_empty +from ....validators.basic import valid_int_f1 +from ....validators.basic import valid_float_f01 + +from .... import aiotools +from .... import aiomulti +from .... import aioproc + +from .. import BaseHid + +from ..otg.events import ResetEvent +from ..otg.events import make_keyboard_event +from ..otg.events import MouseButtonEvent +from ..otg.events import MouseRelativeEvent +from ..otg.events import MouseWheelEvent + +from .sdp import make_sdp_record +from .bluez import BluezIface +from .server import BtServer + + +# ===== +class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes + # https://github.com/SySS-Research/bluetooth-keyboard-emulator + # https://github.com/nutki/bt-keyboard-switcher + # https://gist.github.com/whitelynx/9f9bd4cb266b3924c64dfdff14bce2e8 + # https://archlinuxarm.org/forum/viewtopic.php?f=67&t=14244 + + def __init__( # pylint: disable=too-many-arguments,too-many-locals,super-init-not-called + self, + manufacturer: str, + product: str, + description: str, + + iface: str, + alias: str, + + pairing_required: bool, + auth_required: bool, + control_public: bool, + unpair_on_close: bool, + + max_clients: int, + socket_timeout: float, + select_timeout: float, + ) -> None: + + self.__proc: Optional[multiprocessing.Process] = None + self.__stop_event = multiprocessing.Event() + + self.__notifier = aiomulti.AioProcessNotifier() + + self.__server = BtServer( + iface=BluezIface( + iface=iface, + alias=alias, + sdp_record=make_sdp_record(manufacturer, product, description), + pairing_required=pairing_required, + auth_required=auth_required, + ), + control_public=control_public, + unpair_on_close=unpair_on_close, + max_clients=max_clients, + socket_timeout=socket_timeout, + select_timeout=select_timeout, + notifier=self.__notifier, + stop_event=self.__stop_event, + ) + + @classmethod + def get_plugin_options(cls) -> Dict: + return { + "manufacturer": Option("Pi-KVM"), + "product": Option("HID Device"), + "description": Option("Bluetooth Keyboard & Mouse"), + + "iface": Option("hci0", type=valid_stripped_string_not_empty), + "alias": Option("Pi-KVM HID"), + + "pairing_required": Option(True, type=valid_bool), + "auth_required": Option(False, type=valid_bool), + "control_public": Option(True, type=valid_bool), + "unpair_on_close": Option(True, type=valid_bool), + + "max_clients": Option(1, type=valid_int_f1), + "socket_timeout": Option(5.0, type=valid_float_f01), + "select_timeout": Option(1.0, type=valid_float_f01), + } + + def sysprep(self) -> None: + get_logger(0).info("Starting HID daemon ...") + self.__proc = multiprocessing.Process(target=self.__server_worker, daemon=True) + self.__proc.start() + + async def get_state(self) -> Dict: + state = await self.__server.get_state() + return { + "online": state["online"], + "keyboard": { + "online": state["online"], + "leds": { + "caps": state["caps"], + "scroll": state["scroll"], + "num": state["num"], + }, + }, + "mouse": { + "online": state["online"], + "absolute": False, + }, + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + prev_state: Dict = {} + while True: + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + await self.__notifier.wait() + + async def reset(self) -> None: + self.clear_events() + self.__server.queue_event(ResetEvent()) + + @aiotools.atomic + async def cleanup(self) -> None: + if self.__proc is not None: + if self.__proc.is_alive(): + get_logger(0).info("Stopping HID daemon ...") + self.__stop_event.set() + if self.__proc.exitcode is not None: + self.__proc.join() + + # ===== + + def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None: + for (key, state) in keys: + self.__server.queue_event(make_keyboard_event(key, state)) + + def send_mouse_button_event(self, button: str, state: bool) -> None: + self.__server.queue_event(MouseButtonEvent(button, state)) + + def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + _ = to_x # No absolute events + _ = to_y + + def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: + self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y)) + + def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + self.__server.queue_event(MouseWheelEvent(delta_x, delta_y)) + + def clear_events(self) -> None: + self.__server.clear_events() + + # ===== + + def __server_worker(self) -> None: # pylint: disable=too-many-branches + logger = get_logger(0) + + logger.info("Started HID pid=%d", os.getpid()) + aioproc.ignore_sigint() + aioproc.rename_process("hid") + + while not self.__stop_event.is_set(): + try: + self.__server.run() + except Exception: + logger.exception("Unexpected HID error") + time.sleep(5) |