summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid/_mcu/__init__.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-11-19 23:28:23 +0300
committerDevaev Maxim <[email protected]>2020-11-20 00:15:18 +0300
commita77db72355c760eb38deb4c46a83fb7d3f2ed008 (patch)
tree208cd060c234fbfd77ef3c1879ed906950660bed /kvmd/plugins/hid/_mcu/__init__.py
parent188de715153100806bc4a95f3888f6f03f1ede2f (diff)
multihid firmware
Diffstat (limited to 'kvmd/plugins/hid/_mcu/__init__.py')
-rw-r--r--kvmd/plugins/hid/_mcu/__init__.py97
1 files changed, 66 insertions, 31 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py
index b46c876d..678b4887 100644
--- a/kvmd/plugins/hid/_mcu/__init__.py
+++ b/kvmd/plugins/hid/_mcu/__init__.py
@@ -137,6 +137,19 @@ class _MouseMoveEvent(_BaseEvent):
@dataclasses.dataclass(frozen=True)
+class _MouseRelativeEvent(_BaseEvent):
+ delta_x: int
+ delta_y: int
+
+ def __post_init__(self) -> None:
+ assert -127 <= self.delta_x <= 127
+ assert -127 <= self.delta_y <= 127
+
+ def make_command(self) -> bytes:
+ return struct.pack(">Bbbxx", 0x15, self.delta_x, self.delta_y)
+
+
[email protected](frozen=True)
class _MouseWheelEvent(_BaseEvent):
delta_x: int
delta_y: int
@@ -196,12 +209,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
- "keyboard_online": True,
- "mouse_online": True,
- "caps": False,
- "scroll": False,
- "num": False,
- }, self.__notifier)
+ "online": 0,
+ "status": 0,
+ }, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event()
@@ -226,19 +236,51 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
+ online = bool(state["online"])
+ pong = (state["status"] >> 16) & 0xFF
+ outputs = (state["status"] >> 8) & 0xFF
+ features = state["status"] & 0xFF
+
+ absolute = True
+ if online and (outputs & 0b00111000) in [0b00010000, 0b00011000]:
+ absolute = False
+
+ keyboard_outputs: Dict = {"available": {}, "active": ""}
+ mouse_outputs: Dict = {"available": {}, "active": ""}
+ if outputs & 0b10000000: # Dynamic
+ if features & 0b00000001: # USB
+ keyboard_outputs["available"]["usb"] = {"name": "USB"}
+ mouse_outputs["available"]["usb_abs"] = {"name": "USB", "absolute": True}
+ mouse_outputs["available"]["usb_rel"] = {"name": "USB Relative", "absolute": False}
+ if features & 0b00000010: # PS/2
+ keyboard_outputs["available"]["ps2"] = {"name": "PS/2"}
+ mouse_outputs["available"]["ps2"] = {"name": "PS/2"}
+
+ keyboard_outputs["active"] = {
+ 0b00000001: "usb",
+ 0b00000011: "ps2",
+ }.get(outputs & 0b00000111, "")
+ mouse_outputs["active"] = {
+ 0b00001000: "usb_abs",
+ 0b00010000: "usb_rel",
+ 0b00011000: "ps2",
+ }.get(outputs & 0b00111000, "")
+
return {
- "online": (state["keyboard_online"] and state["mouse_online"]),
+ "online": online,
"keyboard": {
- "online": state["keyboard_online"],
+ "online": (online and not (pong & 0b00001000)),
"leds": {
- "caps": state["caps"],
- "scroll": state["scroll"],
- "num": state["num"],
+ "caps": bool(pong & 0b00000001),
+ "scroll": bool(pong & 0b00000010),
+ "num": bool(pong & 0b00000100),
},
+ "outputs": keyboard_outputs,
},
"mouse": {
- "online": state["mouse_online"],
- "absolute": True,
+ "online": (online and not (pong & 0b00010000)),
+ "absolute": absolute,
+ "outputs": mouse_outputs,
},
}
@@ -287,8 +329,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__queue_event(_MouseMoveEvent(to_x, to_y))
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
- _ = delta_x # No relative events yet
- _ = delta_y
+ self.__queue_event(_MouseRelativeEvent(delta_x, delta_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
@@ -350,8 +391,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
- assert len(response) == 4, response
- if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]:
+ assert len(response) in (4, 8), response
+ if self.__make_crc16(response[:-2]) != struct.unpack(">H", response[-2:])[0]:
request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer
raise _TempRequestError("Invalid response CRC; requesting response again ...")
@@ -368,7 +409,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__set_state_online(True)
return True
elif code & 0x80: # Pong/Done with state
- self.__set_state_code(code)
+ self.__set_state_pong(response)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
@@ -401,19 +442,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
return error_retval
def __set_state_online(self, online: bool) -> None:
- self.__state_flags.update(
- keyboard_online=online,
- mouse_online=online,
- )
-
- def __set_state_code(self, code: int) -> None:
- self.__state_flags.update(
- keyboard_online=(not (code & 0b00001000)),
- mouse_online=(not (code & 0b00010000)),
- caps=bool(code & 0b00000001),
- scroll=bool(code & 0b00000010),
- num=bool(code & 0b00000100),
- )
+ self.__state_flags.update(online=int(online))
+
+ def __set_state_pong(self, data: bytes) -> None:
+ status = data[1] << 16
+ if len(data) > 4:
+ status |= (data[2] << 8) | data[3]
+ self.__state_flags.update(online=1, status=status)
def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes:
if not self.__noop: