summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/ezcoo.py
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
committerMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
commitee3e224e396494cd0d69bb6167087a071a20349c (patch)
tree5becd28570e58a03c6e1e231d0db24c264a73f88 /kvmd/plugins/ugpio/ezcoo.py
parent4b75221e9470b4a009955d7677f16adf8e23e302 (diff)
new typing style
Diffstat (limited to 'kvmd/plugins/ugpio/ezcoo.py')
-rw-r--r--kvmd/plugins/ugpio/ezcoo.py15
1 files changed, 6 insertions, 9 deletions
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py
index ac7ee0f3..4af8e087 100644
--- a/kvmd/plugins/ugpio/ezcoo.py
+++ b/kvmd/plugins/ugpio/ezcoo.py
@@ -26,10 +26,7 @@ import functools
import errno
import time
-from typing import Tuple
-from typing import Dict
from typing import Callable
-from typing import Optional
from typing import Any
import serial
@@ -72,14 +69,14 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
self.__protocol = protocol
self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()
- self.__channel_queue: "multiprocessing.Queue[Optional[int]]" = multiprocessing.Queue()
- self.__channel: Optional[int] = -1
+ self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue()
+ self.__channel: (int | None) = -1
- self.__proc: Optional[multiprocessing.Process] = None
+ self.__proc: (multiprocessing.Process | None) = None
self.__stop_event = multiprocessing.Event()
@classmethod
- def get_plugin_options(cls) -> Dict:
+ def get_plugin_options(cls) -> dict:
return {
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
"speed": Option(115200, type=valid_tty_speed),
@@ -164,8 +161,8 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
def __get_serial(self) -> serial.Serial:
return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout)
- def __recv_channel(self, tty: serial.Serial, data: bytes) -> Tuple[Optional[int], bytes]:
- channel: Optional[int] = None
+ def __recv_channel(self, tty: serial.Serial, data: bytes) -> tuple[(int | None), bytes]:
+ channel: (int | None) = None
if tty.in_waiting:
data += tty.read_all()
found = re.findall(b"V[0-9a-fA-F]{2}S", data)