diff options
author | Maxim Devaev <[email protected]> | 2024-09-18 04:37:43 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-09-18 04:37:43 +0300 |
commit | 7a53f1445619fc471c2823e7081de8b6039b938e (patch) | |
tree | 961dd0072cc976504fe4570743d801c79512e9a6 /kvmd/plugins/hid/_mcu | |
parent | 45270a09d7b5076bac96887a1e36d752882e3adf (diff) |
refactoring
Diffstat (limited to 'kvmd/plugins/hid/_mcu')
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 46 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/gpio.py | 22 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/proto.py | 18 |
3 files changed, 43 insertions, 43 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index 01411621..53665fb2 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -91,7 +91,7 @@ class _TempRequestError(_RequestError): # ===== class BasePhyConnection: - def send(self, request: bytes) -> bytes: + def send(self, req: bytes) -> bytes: raise NotImplementedError @@ -374,7 +374,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__set_state_online(False) return False - def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches + def __process_request(self, conn: BasePhyConnection, req: bytes) -> bool: # pylint: disable=too-many-branches logger = get_logger() error_messages: list[str] = [] live_log_errors = False @@ -384,47 +384,47 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- error_retval = False while self.__gpio.is_powered() and common_retries and read_retries: - response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request)) + resp = (RESPONSE_LEGACY_OK if self.__noop else conn.send(req)) try: - if len(response) < 4: + if len(resp) < 4: read_retries -= 1 - raise _TempRequestError(f"No response from HID: request={request!r}") + raise _TempRequestError(f"No response from HID: request={req!r}") - if not check_response(response): - request = REQUEST_REPEAT + if not check_response(resp): + req = REQUEST_REPEAT raise _TempRequestError("Invalid response CRC; requesting response again ...") - code = response[1] + code = resp[1] if code == 0x48: # Request timeout # pylint: disable=no-else-raise - raise _TempRequestError(f"Got request timeout from HID: request={request!r}") + raise _TempRequestError(f"Got request timeout from HID: request={req!r}") elif code == 0x40: # CRC Error - raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}") + raise _TempRequestError(f"Got CRC error of request from HID: request={req!r}") elif code == 0x45: # Unknown command - raise _PermRequestError(f"HID did not recognize the request={request!r}") + raise _PermRequestError(f"HID did not recognize the request={req!r}") elif code == 0x24: # Rebooted? raise _PermRequestError("No previous command state inside HID, seems it was rebooted") elif code == 0x20: # Legacy done self.__set_state_online(True) return True elif code & 0x80: # Pong/Done with state - self.__set_state_pong(response) + self.__set_state_pong(resp) return True - raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}") + raise _TempRequestError(f"Invalid response from HID: request={req!r}, response=0x{resp!r}") - except _RequestError as err: + except _RequestError as ex: common_retries -= 1 if live_log_errors: - logger.error(err.msg) + logger.error(ex.msg) else: - error_messages.append(err.msg) + error_messages.append(ex.msg) if len(error_messages) > self.__errors_threshold: for msg in error_messages: logger.error(msg) error_messages = [] live_log_errors = True - if isinstance(err, _PermRequestError): + if isinstance(ex, _PermRequestError): error_retval = True break @@ -440,7 +440,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- for msg in error_messages: logger.error(msg) if not (common_retries and read_retries): - logger.error("Can't process HID request due many errors: %r", request) + logger.error("Can't process HID request due many errors: %r", req) return error_retval def __set_state_online(self, online: bool) -> None: @@ -449,11 +449,11 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def __set_state_busy(self, busy: bool) -> None: self.__state_flags.update(busy=int(busy)) - def __set_state_pong(self, response: bytes) -> None: - status = response[1] << 16 - if len(response) > 4: - status |= (response[2] << 8) | response[3] - reset_required = (1 if response[1] & 0b01000000 else 0) + def __set_state_pong(self, resp: bytes) -> None: + status = resp[1] << 16 + if len(resp) > 4: + status |= (resp[2] << 8) | resp[3] + reset_required = (1 if resp[1] & 0b01000000 else 0) self.__state_flags.update(online=1, busy=reset_required, status=status) if reset_required: if self.__reset_self: diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py index 0afbcc69..ce1d678b 100644 --- a/kvmd/plugins/hid/_mcu/gpio.py +++ b/kvmd/plugins/hid/_mcu/gpio.py @@ -47,12 +47,12 @@ class Gpio: # pylint: disable=too-many-instance-attributes self.__reset_inverted = reset_inverted self.__reset_delay = reset_delay - self.__line_request: (gpiod.LineRequest | None) = None + self.__line_req: (gpiod.LineRequest | None) = None self.__last_power: (bool | None) = None def __enter__(self) -> None: if self.__power_detect_pin >= 0 or self.__reset_pin >= 0: - assert self.__line_request is None + assert self.__line_req is None config: dict[int, gpiod.LineSettings] = {} if self.__power_detect_pin >= 0: config[self.__power_detect_pin] = gpiod.LineSettings( @@ -65,7 +65,7 @@ class Gpio: # pylint: disable=too-many-instance-attributes output_value=gpiod.line.Value(self.__reset_inverted), ) assert len(config) > 0 - self.__line_request = gpiod.request_lines( + self.__line_req = gpiod.request_lines( self.__device_path, consumer="kvmd::hid", config=config, @@ -78,18 +78,18 @@ class Gpio: # pylint: disable=too-many-instance-attributes _tb: types.TracebackType, ) -> None: - if self.__line_request: + if self.__line_req: try: - self.__line_request.release() + self.__line_req.release() except Exception: pass self.__last_power = None - self.__line_request = None + self.__line_req = None def is_powered(self) -> bool: if self.__power_detect_pin >= 0: - assert self.__line_request - power = bool(self.__line_request.get_value(self.__power_detect_pin).value) + assert self.__line_req + power = bool(self.__line_req.get_value(self.__power_detect_pin).value) if power != self.__last_power: get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power) self.__last_power = power @@ -98,11 +98,11 @@ class Gpio: # pylint: disable=too-many-instance-attributes def reset(self) -> None: if self.__reset_pin >= 0: - assert self.__line_request + assert self.__line_req try: - self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted)) + self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted)) time.sleep(self.__reset_delay) finally: - self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted)) + self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted)) time.sleep(1) get_logger(0).info("Reset HID performed") diff --git a/kvmd/plugins/hid/_mcu/proto.py b/kvmd/plugins/hid/_mcu/proto.py index 76cd5d09..deaf5c15 100644 --- a/kvmd/plugins/hid/_mcu/proto.py +++ b/kvmd/plugins/hid/_mcu/proto.py @@ -184,17 +184,17 @@ class MouseWheelEvent(BaseEvent): # ===== -def check_response(response: bytes) -> bool: - assert len(response) in (4, 8), response - return (bitbang.make_crc16(response[:-2]) == struct.unpack(">H", response[-2:])[0]) +def check_response(resp: bytes) -> bool: + assert len(resp) in (4, 8), resp + return (bitbang.make_crc16(resp[:-2]) == struct.unpack(">H", resp[-2:])[0]) -def _make_request(command: bytes) -> bytes: - assert len(command) == 5, command - request = b"\x33" + command - request += struct.pack(">H", bitbang.make_crc16(request)) - assert len(request) == 8, request - return request +def _make_request(cmd: bytes) -> bytes: + assert len(cmd) == 5, cmd + req = b"\x33" + cmd + req += struct.pack(">H", bitbang.make_crc16(req)) + assert len(req) == 8, req + return req # ===== |