diff options
-rw-r--r-- | PKGBUILD | 1 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/tesmart.py | 59 | ||||
-rw-r--r-- | testenv/requirements.txt | 1 |
3 files changed, 30 insertions, 31 deletions
@@ -44,6 +44,7 @@ depends=( python-passlib python-periphery python-pyserial + python-pyserial-asyncio python-spidev python-setproctitle python-psutil diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py index b96588f6..288f2133 100644 --- a/kvmd/plugins/ugpio/tesmart.py +++ b/kvmd/plugins/ugpio/tesmart.py @@ -21,20 +21,15 @@ import asyncio - -# At present this requires building a package from AUR: -# https://aur.archlinux.org/packages/python-pyserial-asyncio -# https://wiki.archlinux.org/title/Arch_User_Repository#Installing_and_upgrading_packages -import serial_asyncio - import functools -from typing import Tuple from typing import Dict from typing import Callable from typing import Optional from typing import Any +import serial_asyncio + from ...logging import get_logger from ... import tools @@ -61,11 +56,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute instance_name: str, notifier: aiotools.AioNotifier, - mode: int, host: str, port: int, + device_path: str, speed: int, + timeout: float, switch_delay: float, state_poll: float, @@ -73,11 +69,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute super().__init__(instance_name, notifier) - self.__mode = mode self.__host = host self.__port = port + self.__device_path = device_path self.__speed = speed + self.__timeout = timeout self.__switch_delay = switch_delay self.__state_poll = state_poll @@ -90,11 +87,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute @classmethod def get_plugin_options(cls) -> Dict: return { - "mode": Option(1, type=functools.partial(valid_number, min=1, max=2)), - "host": Option("", type=valid_ip_or_host), + "host": Option("", type=valid_ip_or_host, if_empty=""), "port": Option(5000, type=valid_port), - "device": Option("", type=valid_abs_path, unpack_as="device_path"), + + "device": Option("", type=valid_abs_path, only_if="!host", unpack_as="device_path"), "speed": Option(9600, type=valid_tty_speed), + "timeout": Option(5.0, type=valid_float_f01), "switch_delay": Option(1.0, type=valid_float_f0), "state_poll": Option(10.0, type=valid_float_f01), @@ -125,7 +123,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute async def write(self, pin: str, state: bool) -> None: # Switch input source command uses 1-based numbering (0x01->PC1...0x10->PC16) - channel = int(pin)+1 + channel = int(pin) + 1 assert 1 <= channel <= 16 if state: await self.__send_command("{:c}{:c}".format(1, channel).encode()) @@ -136,49 +134,48 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute async def __send_command(self, cmd: bytes) -> int: assert len(cmd) == 2 - (reader, writer) = await self.__ensure_device() + await self.__ensure_device() + assert self.__reader is not None + assert self.__writer is not None try: - writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd)) - await asyncio.wait_for(writer.drain(), timeout=self.__timeout) - return (await asyncio.wait_for(reader.readexactly(6), timeout=self.__timeout))[4] + self.__writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd)) + await asyncio.wait_for(self.__writer.drain(), timeout=self.__timeout) + return (await asyncio.wait_for(self.__reader.readexactly(6), timeout=self.__timeout))[4] except Exception as err: get_logger(0).error("Can't send command to TESmart KVM [%s]:%d: %s", self.__host, self.__port, tools.efmt(err)) await self.__close_device() raise GpioDriverOfflineError(self) - async def __ensure_device_tcpip(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + async def __ensure_device(self) -> None: + if self.__reader is None or self.__writer is None: + if self.__host: + await self.__ensure_device_net() + else: + await self.__ensure_device_serial() + + async def __ensure_device_net(self) -> None: try: - (reader, writer) = await asyncio.wait_for( + (self.__reader, self.__writer) = await asyncio.wait_for( asyncio.open_connection(self.__host, self.__port), timeout=self.__timeout, ) - return (reader, writer) except Exception as err: get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s", self.__host, self.__port, tools.efmt(err)) raise GpioDriverOfflineError(self) - async def __ensure_device_serial(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + async def __ensure_device_serial(self) -> None: try: - (reader, writer) = await asyncio.wait_for( + (self.__reader, self.__writer) = await asyncio.wait_for( serial_asyncio.open_serial_connection(url=self.__device_path, baudrate=self.__speed), timeout=self.__timeout, ) - return (reader, writer) except Exception as err: get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s", self.__device_path, self.__speed, tools.efmt(err)) raise GpioDriverOfflineError(self) - async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: - if self.__reader is None or self.__writer is None: - if self.__mode == 1: - (self.__reader, self.__writer) = await self.__ensure_devicee_tcpip() - elif self.__mode == 2: - (self.__reader, self.__writer) = await self.__ensure_device_serial() - return (self.__reader, self.__writer) - async def __close_device(self) -> None: if self.__writer: await aiotools.close_writer(self.__writer) diff --git a/testenv/requirements.txt b/testenv/requirements.txt index 69085a65..b35a712a 100644 --- a/testenv/requirements.txt +++ b/testenv/requirements.txt @@ -1,3 +1,4 @@ +pyserial-asyncio pyghmi spidev pyrad |