summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-07-21 01:44:44 +0300
committerMaxim Devaev <[email protected]>2022-07-21 01:44:44 +0300
commit508a6e9b5892cb1d551d86143c2340fa53f4bf47 (patch)
tree1dd5a0ff9da6bf19606c5b008a989cfad2b0704e
parent42c85021f75c9d3f9ada188faa2ba53d5ccac605 (diff)
refactoring
-rw-r--r--PKGBUILD1
-rw-r--r--kvmd/plugins/ugpio/tesmart.py59
-rw-r--r--testenv/requirements.txt1
3 files changed, 30 insertions, 31 deletions
diff --git a/PKGBUILD b/PKGBUILD
index 6adc0690..b84b07da 100644
--- a/PKGBUILD
+++ b/PKGBUILD
@@ -44,6 +44,7 @@ depends=(
python-passlib
python-periphery
python-pyserial
+ python-pyserial-asyncio
python-spidev
python-setproctitle
python-psutil
diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py
index b96588f6..288f2133 100644
--- a/kvmd/plugins/ugpio/tesmart.py
+++ b/kvmd/plugins/ugpio/tesmart.py
@@ -21,20 +21,15 @@
import asyncio
-
-# At present this requires building a package from AUR:
-# https://aur.archlinux.org/packages/python-pyserial-asyncio
-# https://wiki.archlinux.org/title/Arch_User_Repository#Installing_and_upgrading_packages
-import serial_asyncio
-
import functools
-from typing import Tuple
from typing import Dict
from typing import Callable
from typing import Optional
from typing import Any
+import serial_asyncio
+
from ...logging import get_logger
from ... import tools
@@ -61,11 +56,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
instance_name: str,
notifier: aiotools.AioNotifier,
- mode: int,
host: str,
port: int,
+
device_path: str,
speed: int,
+
timeout: float,
switch_delay: float,
state_poll: float,
@@ -73,11 +69,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
super().__init__(instance_name, notifier)
- self.__mode = mode
self.__host = host
self.__port = port
+
self.__device_path = device_path
self.__speed = speed
+
self.__timeout = timeout
self.__switch_delay = switch_delay
self.__state_poll = state_poll
@@ -90,11 +87,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "mode": Option(1, type=functools.partial(valid_number, min=1, max=2)),
- "host": Option("", type=valid_ip_or_host),
+ "host": Option("", type=valid_ip_or_host, if_empty=""),
"port": Option(5000, type=valid_port),
- "device": Option("", type=valid_abs_path, unpack_as="device_path"),
+
+ "device": Option("", type=valid_abs_path, only_if="!host", unpack_as="device_path"),
"speed": Option(9600, type=valid_tty_speed),
+
"timeout": Option(5.0, type=valid_float_f01),
"switch_delay": Option(1.0, type=valid_float_f0),
"state_poll": Option(10.0, type=valid_float_f01),
@@ -125,7 +123,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
async def write(self, pin: str, state: bool) -> None:
# Switch input source command uses 1-based numbering (0x01->PC1...0x10->PC16)
- channel = int(pin)+1
+ channel = int(pin) + 1
assert 1 <= channel <= 16
if state:
await self.__send_command("{:c}{:c}".format(1, channel).encode())
@@ -136,49 +134,48 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
async def __send_command(self, cmd: bytes) -> int:
assert len(cmd) == 2
- (reader, writer) = await self.__ensure_device()
+ await self.__ensure_device()
+ assert self.__reader is not None
+ assert self.__writer is not None
try:
- writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
- await asyncio.wait_for(writer.drain(), timeout=self.__timeout)
- return (await asyncio.wait_for(reader.readexactly(6), timeout=self.__timeout))[4]
+ self.__writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
+ await asyncio.wait_for(self.__writer.drain(), timeout=self.__timeout)
+ return (await asyncio.wait_for(self.__reader.readexactly(6), timeout=self.__timeout))[4]
except Exception as err:
get_logger(0).error("Can't send command to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
await self.__close_device()
raise GpioDriverOfflineError(self)
- async def __ensure_device_tcpip(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+ async def __ensure_device(self) -> None:
+ if self.__reader is None or self.__writer is None:
+ if self.__host:
+ await self.__ensure_device_net()
+ else:
+ await self.__ensure_device_serial()
+
+ async def __ensure_device_net(self) -> None:
try:
- (reader, writer) = await asyncio.wait_for(
+ (self.__reader, self.__writer) = await asyncio.wait_for(
asyncio.open_connection(self.__host, self.__port),
timeout=self.__timeout,
)
- return (reader, writer)
except Exception as err:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
raise GpioDriverOfflineError(self)
- async def __ensure_device_serial(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+ async def __ensure_device_serial(self) -> None:
try:
- (reader, writer) = await asyncio.wait_for(
+ (self.__reader, self.__writer) = await asyncio.wait_for(
serial_asyncio.open_serial_connection(url=self.__device_path, baudrate=self.__speed),
timeout=self.__timeout,
)
- return (reader, writer)
except Exception as err:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__device_path, self.__speed, tools.efmt(err))
raise GpioDriverOfflineError(self)
- async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
- if self.__reader is None or self.__writer is None:
- if self.__mode == 1:
- (self.__reader, self.__writer) = await self.__ensure_devicee_tcpip()
- elif self.__mode == 2:
- (self.__reader, self.__writer) = await self.__ensure_device_serial()
- return (self.__reader, self.__writer)
-
async def __close_device(self) -> None:
if self.__writer:
await aiotools.close_writer(self.__writer)
diff --git a/testenv/requirements.txt b/testenv/requirements.txt
index 69085a65..b35a712a 100644
--- a/testenv/requirements.txt
+++ b/testenv/requirements.txt
@@ -1,3 +1,4 @@
+pyserial-asyncio
pyghmi
spidev
pyrad