summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid/_mcu
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/hid/_mcu')
-rw-r--r--kvmd/plugins/hid/_mcu/__init__.py46
-rw-r--r--kvmd/plugins/hid/_mcu/gpio.py22
-rw-r--r--kvmd/plugins/hid/_mcu/proto.py18
3 files changed, 43 insertions, 43 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py
index 01411621..53665fb2 100644
--- a/kvmd/plugins/hid/_mcu/__init__.py
+++ b/kvmd/plugins/hid/_mcu/__init__.py
@@ -91,7 +91,7 @@ class _TempRequestError(_RequestError):
# =====
class BasePhyConnection:
- def send(self, request: bytes) -> bytes:
+ def send(self, req: bytes) -> bytes:
raise NotImplementedError
@@ -374,7 +374,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__set_state_online(False)
return False
- def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
+ def __process_request(self, conn: BasePhyConnection, req: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
error_messages: list[str] = []
live_log_errors = False
@@ -384,47 +384,47 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
error_retval = False
while self.__gpio.is_powered() and common_retries and read_retries:
- response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request))
+ resp = (RESPONSE_LEGACY_OK if self.__noop else conn.send(req))
try:
- if len(response) < 4:
+ if len(resp) < 4:
read_retries -= 1
- raise _TempRequestError(f"No response from HID: request={request!r}")
+ raise _TempRequestError(f"No response from HID: request={req!r}")
- if not check_response(response):
- request = REQUEST_REPEAT
+ if not check_response(resp):
+ req = REQUEST_REPEAT
raise _TempRequestError("Invalid response CRC; requesting response again ...")
- code = response[1]
+ code = resp[1]
if code == 0x48: # Request timeout # pylint: disable=no-else-raise
- raise _TempRequestError(f"Got request timeout from HID: request={request!r}")
+ raise _TempRequestError(f"Got request timeout from HID: request={req!r}")
elif code == 0x40: # CRC Error
- raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
+ raise _TempRequestError(f"Got CRC error of request from HID: request={req!r}")
elif code == 0x45: # Unknown command
- raise _PermRequestError(f"HID did not recognize the request={request!r}")
+ raise _PermRequestError(f"HID did not recognize the request={req!r}")
elif code == 0x24: # Rebooted?
raise _PermRequestError("No previous command state inside HID, seems it was rebooted")
elif code == 0x20: # Legacy done
self.__set_state_online(True)
return True
elif code & 0x80: # Pong/Done with state
- self.__set_state_pong(response)
+ self.__set_state_pong(resp)
return True
- raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}")
+ raise _TempRequestError(f"Invalid response from HID: request={req!r}, response=0x{resp!r}")
- except _RequestError as err:
+ except _RequestError as ex:
common_retries -= 1
if live_log_errors:
- logger.error(err.msg)
+ logger.error(ex.msg)
else:
- error_messages.append(err.msg)
+ error_messages.append(ex.msg)
if len(error_messages) > self.__errors_threshold:
for msg in error_messages:
logger.error(msg)
error_messages = []
live_log_errors = True
- if isinstance(err, _PermRequestError):
+ if isinstance(ex, _PermRequestError):
error_retval = True
break
@@ -440,7 +440,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
- logger.error("Can't process HID request due many errors: %r", request)
+ logger.error("Can't process HID request due many errors: %r", req)
return error_retval
def __set_state_online(self, online: bool) -> None:
@@ -449,11 +449,11 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __set_state_busy(self, busy: bool) -> None:
self.__state_flags.update(busy=int(busy))
- def __set_state_pong(self, response: bytes) -> None:
- status = response[1] << 16
- if len(response) > 4:
- status |= (response[2] << 8) | response[3]
- reset_required = (1 if response[1] & 0b01000000 else 0)
+ def __set_state_pong(self, resp: bytes) -> None:
+ status = resp[1] << 16
+ if len(resp) > 4:
+ status |= (resp[2] << 8) | resp[3]
+ reset_required = (1 if resp[1] & 0b01000000 else 0)
self.__state_flags.update(online=1, busy=reset_required, status=status)
if reset_required:
if self.__reset_self:
diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py
index 0afbcc69..ce1d678b 100644
--- a/kvmd/plugins/hid/_mcu/gpio.py
+++ b/kvmd/plugins/hid/_mcu/gpio.py
@@ -47,12 +47,12 @@ class Gpio: # pylint: disable=too-many-instance-attributes
self.__reset_inverted = reset_inverted
self.__reset_delay = reset_delay
- self.__line_request: (gpiod.LineRequest | None) = None
+ self.__line_req: (gpiod.LineRequest | None) = None
self.__last_power: (bool | None) = None
def __enter__(self) -> None:
if self.__power_detect_pin >= 0 or self.__reset_pin >= 0:
- assert self.__line_request is None
+ assert self.__line_req is None
config: dict[int, gpiod.LineSettings] = {}
if self.__power_detect_pin >= 0:
config[self.__power_detect_pin] = gpiod.LineSettings(
@@ -65,7 +65,7 @@ class Gpio: # pylint: disable=too-many-instance-attributes
output_value=gpiod.line.Value(self.__reset_inverted),
)
assert len(config) > 0
- self.__line_request = gpiod.request_lines(
+ self.__line_req = gpiod.request_lines(
self.__device_path,
consumer="kvmd::hid",
config=config,
@@ -78,18 +78,18 @@ class Gpio: # pylint: disable=too-many-instance-attributes
_tb: types.TracebackType,
) -> None:
- if self.__line_request:
+ if self.__line_req:
try:
- self.__line_request.release()
+ self.__line_req.release()
except Exception:
pass
self.__last_power = None
- self.__line_request = None
+ self.__line_req = None
def is_powered(self) -> bool:
if self.__power_detect_pin >= 0:
- assert self.__line_request
- power = bool(self.__line_request.get_value(self.__power_detect_pin).value)
+ assert self.__line_req
+ power = bool(self.__line_req.get_value(self.__power_detect_pin).value)
if power != self.__last_power:
get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power)
self.__last_power = power
@@ -98,11 +98,11 @@ class Gpio: # pylint: disable=too-many-instance-attributes
def reset(self) -> None:
if self.__reset_pin >= 0:
- assert self.__line_request
+ assert self.__line_req
try:
- self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted))
+ self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted))
time.sleep(self.__reset_delay)
finally:
- self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted))
+ self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted))
time.sleep(1)
get_logger(0).info("Reset HID performed")
diff --git a/kvmd/plugins/hid/_mcu/proto.py b/kvmd/plugins/hid/_mcu/proto.py
index 76cd5d09..deaf5c15 100644
--- a/kvmd/plugins/hid/_mcu/proto.py
+++ b/kvmd/plugins/hid/_mcu/proto.py
@@ -184,17 +184,17 @@ class MouseWheelEvent(BaseEvent):
# =====
-def check_response(response: bytes) -> bool:
- assert len(response) in (4, 8), response
- return (bitbang.make_crc16(response[:-2]) == struct.unpack(">H", response[-2:])[0])
+def check_response(resp: bytes) -> bool:
+ assert len(resp) in (4, 8), resp
+ return (bitbang.make_crc16(resp[:-2]) == struct.unpack(">H", resp[-2:])[0])
-def _make_request(command: bytes) -> bytes:
- assert len(command) == 5, command
- request = b"\x33" + command
- request += struct.pack(">H", bitbang.make_crc16(request))
- assert len(request) == 8, request
- return request
+def _make_request(cmd: bytes) -> bytes:
+ assert len(cmd) == 5, cmd
+ req = b"\x33" + cmd
+ req += struct.pack(">H", bitbang.make_crc16(req))
+ assert len(req) == 8, req
+ return req
# =====