diff options
author | Devaev Maxim <[email protected]> | 2020-11-19 23:28:23 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-11-20 00:15:18 +0300 |
commit | a77db72355c760eb38deb4c46a83fb7d3f2ed008 (patch) | |
tree | 208cd060c234fbfd77ef3c1879ed906950660bed /kvmd/plugins/hid/_mcu/__init__.py | |
parent | 188de715153100806bc4a95f3888f6f03f1ede2f (diff) |
multihid firmware
Diffstat (limited to 'kvmd/plugins/hid/_mcu/__init__.py')
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 97 |
1 files changed, 66 insertions, 31 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index b46c876d..678b4887 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -137,6 +137,19 @@ class _MouseMoveEvent(_BaseEvent): @dataclasses.dataclass(frozen=True) +class _MouseRelativeEvent(_BaseEvent): + delta_x: int + delta_y: int + + def __post_init__(self) -> None: + assert -127 <= self.delta_x <= 127 + assert -127 <= self.delta_y <= 127 + + def make_command(self) -> bytes: + return struct.pack(">Bbbxx", 0x15, self.delta_x, self.delta_y) + + [email protected](frozen=True) class _MouseWheelEvent(_BaseEvent): delta_x: int delta_y: int @@ -196,12 +209,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__notifier = aiomulti.AioProcessNotifier() self.__state_flags = aiomulti.AioSharedFlags({ - "keyboard_online": True, - "mouse_online": True, - "caps": False, - "scroll": False, - "num": False, - }, self.__notifier) + "online": 0, + "status": 0, + }, self.__notifier, type=int) self.__stop_event = multiprocessing.Event() @@ -226,19 +236,51 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- async def get_state(self) -> Dict: state = await self.__state_flags.get() + online = bool(state["online"]) + pong = (state["status"] >> 16) & 0xFF + outputs = (state["status"] >> 8) & 0xFF + features = state["status"] & 0xFF + + absolute = True + if online and (outputs & 0b00111000) in [0b00010000, 0b00011000]: + absolute = False + + keyboard_outputs: Dict = {"available": {}, "active": ""} + mouse_outputs: Dict = {"available": {}, "active": ""} + if outputs & 0b10000000: # Dynamic + if features & 0b00000001: # USB + keyboard_outputs["available"]["usb"] = {"name": "USB"} + mouse_outputs["available"]["usb_abs"] = {"name": "USB", "absolute": True} + mouse_outputs["available"]["usb_rel"] = {"name": "USB Relative", "absolute": False} + if features & 0b00000010: # PS/2 + keyboard_outputs["available"]["ps2"] = {"name": "PS/2"} + mouse_outputs["available"]["ps2"] = {"name": "PS/2"} + + keyboard_outputs["active"] = { + 0b00000001: "usb", + 0b00000011: "ps2", + }.get(outputs & 0b00000111, "") + mouse_outputs["active"] = { + 0b00001000: "usb_abs", + 0b00010000: "usb_rel", + 0b00011000: "ps2", + }.get(outputs & 0b00111000, "") + return { - "online": (state["keyboard_online"] and state["mouse_online"]), + "online": online, "keyboard": { - "online": state["keyboard_online"], + "online": (online and not (pong & 0b00001000)), "leds": { - "caps": state["caps"], - "scroll": state["scroll"], - "num": state["num"], + "caps": bool(pong & 0b00000001), + "scroll": bool(pong & 0b00000010), + "num": bool(pong & 0b00000100), }, + "outputs": keyboard_outputs, }, "mouse": { - "online": state["mouse_online"], - "absolute": True, + "online": (online and not (pong & 0b00010000)), + "absolute": absolute, + "outputs": mouse_outputs, }, } @@ -287,8 +329,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__queue_event(_MouseMoveEvent(to_x, to_y)) def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: - _ = delta_x # No relative events yet - _ = delta_y + self.__queue_event(_MouseRelativeEvent(delta_x, delta_y)) def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__queue_event(_MouseWheelEvent(delta_x, delta_y)) @@ -350,8 +391,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- read_retries -= 1 raise _TempRequestError(f"No response from HID: request={request!r}") - assert len(response) == 4, response - if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]: + assert len(response) in (4, 8), response + if self.__make_crc16(response[:-2]) != struct.unpack(">H", response[-2:])[0]: request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer raise _TempRequestError("Invalid response CRC; requesting response again ...") @@ -368,7 +409,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__set_state_online(True) return True elif code & 0x80: # Pong/Done with state - self.__set_state_code(code) + self.__set_state_pong(response) return True raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}") @@ -401,19 +442,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- return error_retval def __set_state_online(self, online: bool) -> None: - self.__state_flags.update( - keyboard_online=online, - mouse_online=online, - ) - - def __set_state_code(self, code: int) -> None: - self.__state_flags.update( - keyboard_online=(not (code & 0b00001000)), - mouse_online=(not (code & 0b00010000)), - caps=bool(code & 0b00000001), - scroll=bool(code & 0b00000010), - num=bool(code & 0b00000100), - ) + self.__state_flags.update(online=int(online)) + + def __set_state_pong(self, data: bytes) -> None: + status = data[1] << 16 + if len(data) > 4: + status |= (data[2] << 8) | data[3] + self.__state_flags.update(online=1, status=status) def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes: if not self.__noop: |