diff options
author | Maxim Devaev <[email protected]> | 2023-06-01 17:07:40 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2023-07-10 03:02:28 +0300 |
commit | 2730b1184012f74968c90b35687b679a0a15b747 (patch) | |
tree | b5ed19fdac91d13339c2d54c9c2f1158e40ee26a /kvmd/plugins | |
parent | ce81c872eaf8422beae9e33ffce133957f26a797 (diff) |
ch9329: using bytes instead of list[int]
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/hid/ch9329/__init__.py | 8 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/chip.py | 12 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/keyboard.py | 4 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/mouse.py | 34 |
4 files changed, 30 insertions, 28 deletions
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index 53ac2342..1a46c10e 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -64,7 +64,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__read_timeout = read_timeout self.__reset_required_event = multiprocessing.Event() - self.__cmd_queue: "multiprocessing.Queue[list]" = multiprocessing.Queue() + self.__cmd_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue() self.__notifier = aiomulti.AioProcessNotifier() self.__state_flags = aiomulti.AioSharedFlags({ @@ -163,7 +163,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst def clear_events(self) -> None: tools.clear_queue(self.__cmd_queue) - def __queue_cmd(self, cmd: list[int], clear: bool=False) -> None: + def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None: if not self.__stop_event.is_set(): if clear: # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между @@ -196,7 +196,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst cmd = self.__cmd_queue.get(timeout=0.1) # get_logger(0).info(f"HID : cmd = {cmd}") except queue.Empty: - self.__process_cmd([]) + self.__process_cmd(b"") else: self.__process_cmd(cmd) except Exception: @@ -204,7 +204,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst get_logger(0).exception("Unexpected error in the HID loop") time.sleep(2) - def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches + def __process_cmd(self, cmd: bytes) -> bool: # pylint: disable=too-many-branches try: led_byte = self.__chip.xfer(cmd) except ChipResponseError as err: diff --git a/kvmd/plugins/hid/ch9329/chip.py b/kvmd/plugins/hid/ch9329/chip.py index 8f038042..d883099b 100644 --- a/kvmd/plugins/hid/ch9329/chip.py +++ b/kvmd/plugins/hid/ch9329/chip.py @@ -34,17 +34,17 @@ class Chip: self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) self.__device_path = device_path - def xfer(self, cmd: list[int]) -> int: + def xfer(self, cmd: bytes) -> int: self.__send(cmd) return self.__recv() - def __send(self, cmd: list[int]) -> None: + def __send(self, cmd: bytes) -> None: # RESET = [0x00,0x0F,0x00] # GET_INFO = [0x00,0x01,0x00] if len(cmd) == 0: - cmd = [0x00, 0x01, 0x00] - cmd = [0x57, 0xAB] + cmd - cmd.append(self.__make_checksum(cmd)) + cmd = b"\x00\x01\x00" + cmd = b"\x57\xAB" + cmd + cmd += self.__make_checksum(cmd).to_bytes() self.__tty.write(serial.to_bytes(cmd)) def __recv(self) -> int: @@ -64,5 +64,5 @@ class Chip: # led_byte (info) response return (data[7] if data[3] == 0x81 else -1) - def __make_checksum(self, cmd: (list[int] | bytes)) -> int: + def __make_checksum(self, cmd: bytes) -> int: return (sum(cmd) % 256) diff --git a/kvmd/plugins/hid/ch9329/keyboard.py b/kvmd/plugins/hid/ch9329/keyboard.py index 26eb7e6a..9aa16d9d 100644 --- a/kvmd/plugins/hid/ch9329/keyboard.py +++ b/kvmd/plugins/hid/ch9329/keyboard.py @@ -46,7 +46,7 @@ class Keyboard: async def get_leds(self) -> dict[str, bool]: return (await self.__leds.get()) - def process_key(self, key: str, state: bool) -> list[int]: + def process_key(self, key: str, state: bool) -> bytes: code = KEYMAP[key].usb.code is_modifier = KEYMAP[key].usb.is_modifier if state: @@ -65,4 +65,4 @@ class Keyboard: ] for (index, code) in enumerate(self.__active_keys): cmd[index + 5] = code - return cmd + return bytes(cmd) diff --git a/kvmd/plugins/hid/ch9329/mouse.py b/kvmd/plugins/hid/ch9329/mouse.py index ea21150a..0a0cfbcc 100644 --- a/kvmd/plugins/hid/ch9329/mouse.py +++ b/kvmd/plugins/hid/ch9329/mouse.py @@ -32,6 +32,8 @@ class Mouse: # pylint: disable=too-many-instance-attributes self.__buttons = 0 self.__to_x = (0, 0) self.__to_y = (0, 0) + self.__delta_x = 0 + self.__delta_y = 0 self.__wheel_y = 0 def set_absolute(self, flag: bool) -> None: @@ -40,7 +42,7 @@ class Mouse: # pylint: disable=too-many-instance-attributes def is_absolute(self) -> bool: return self.__absolute - def process_button(self, button: str, state: bool) -> list[int]: + def process_button(self, button: str, state: bool) -> bytes: code = 0x00 match button: case "left": @@ -60,11 +62,11 @@ class Mouse: # pylint: disable=too-many-instance-attributes self.__buttons &= ~code self.__wheel_y = 0 if not self.__absolute: - return self.__make_relative_cmd() + return self.__make_relative_cmd() else: - return self.__make_absolute_cmd() + return self.__make_absolute_cmd() - def process_move(self, to_x: int, to_y: int) -> list[int]: + def process_move(self, to_x: int, to_y: int) -> bytes: self.__to_x = self.__fix_absolute(to_x) self.__to_y = self.__fix_absolute(to_y) self.__wheel_y = 0 @@ -75,37 +77,37 @@ class Mouse: # pylint: disable=too-many-instance-attributes to_fixed = math.ceil(MouseRange.remap(value, 0, MouseRange.MAX) / 8) return (to_fixed >> 8, to_fixed & 0xFF) - def process_wheel(self, delta_x: int, delta_y: int) -> list[int]: + def process_wheel(self, delta_x: int, delta_y: int) -> bytes: _ = delta_x assert -127 <= delta_y <= 127 self.__wheel_y = (1 if delta_y > 0 else 255) if not self.__absolute: - return self.__make_relative_cmd() + return self.__make_relative_cmd() else: - return self.__make_absolute_cmd() - - def process_relative(self, delta_x: int, delta_y: int) -> list[int]: + return self.__make_absolute_cmd() + + def process_relative(self, delta_x: int, delta_y: int) -> bytes: self.__delta_x = self.__fix_relative(delta_x) self.__delta_y = self.__fix_relative(delta_y) self.__wheel_y = 0 return self.__make_relative_cmd() - def __make_absolute_cmd(self) -> list[int]: - return [ + def __make_absolute_cmd(self) -> bytes: + return bytes([ 0, 0x04, 0x07, 0x02, self.__buttons, self.__to_x[1], self.__to_x[0], self.__to_y[1], self.__to_y[0], self.__wheel_y, - ] - - def __make_relative_cmd(self) -> list[int]: - return [ + ]) + + def __make_relative_cmd(self) -> bytes: + return bytes([ 0, 0x05, 0x05, 0x01, self.__buttons, self.__delta_x, self.__delta_y, self.__wheel_y, - ] + ]) def __fix_relative(self, value: int) -> int: assert -127 <= value <= 127 |