summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/tesmart.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/tesmart.py')
-rw-r--r--kvmd/plugins/ugpio/tesmart.py148
1 files changed, 86 insertions, 62 deletions
diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py
index 66d0b60d..e1b6a8e9 100644
--- a/kvmd/plugins/ugpio/tesmart.py
+++ b/kvmd/plugins/ugpio/tesmart.py
@@ -20,35 +20,26 @@
# ========================================================================== #
-import re
-import functools
-import errno
-import time
+import asyncio
from typing import Tuple
from typing import Dict
from typing import Optional
-import serial
-import socket
-import binascii
-
from ...logging import get_logger
+from ... import tools
from ... import aiotools
-from ... import aiomulti
-from ... import aioproc
from ...yamlconf import Option
-from ...validators.basic import valid_number
+from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01
-from ...validators.os import valid_abs_path
-from ...validators.hw import valid_tty_speed
from ...validators.net import valid_ip_or_host
from ...validators.net import valid_port
from . import BaseUserGpioDriver
+from . import GpioDriverOfflineError
# =====
@@ -58,79 +49,112 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
instance_name: str,
notifier: aiotools.AioNotifier,
- tesmart_host: str,
- tesmart_port: int,
- max_ports: int,
-
+ host: str,
+ port: int,
+ timeout: float,
+ send_delay: float,
+ state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
- self.__tesmart_host = tesmart_host
- self.__tesmart_port = tesmart_port
- self.__max_ports = max_ports
- self.__switch_state: Dict[int, bool] = {}
- self.__tes_socket: socket
+ self.__host = host
+ self.__port = port
+ self.__timeout = timeout
+ self.__send_delay = send_delay
+ self.__state_poll = state_poll
+
+ self.__reader: Optional[asyncio.StreamReader] = None
+ self.__writer: Optional[asyncio.StreamWriter] = None
+
+ self.__active: int = -1
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "tesmart_host": Option("192.168.1.10", type=valid_ip_or_host),
- "tesmart_port": Option(5000, type=valid_port),
- "max_ports": Option(8, type=functools.partial(valid_number, min=4, max=16)),
+ "host": Option("", type=valid_ip_or_host),
+ "port": Option(5000, type=valid_port),
+ "timeout": Option(5.0, type=valid_float_f01),
+ "send_delay": Option(1.0, type=valid_float_f0),
+ "state_poll": Option(5.0, type=valid_float_f01),
}
def register_input(self, pin: int, debounce: float) -> None:
- _ = pin
+ if not (0 < pin < 16):
+ raise RuntimeError(f"Unsupported port number: {pin}")
_ = debounce
- def register_output(self, port: int, initial: Optional[bool]) -> None:
- if port <= self.__max_ports:
- self.__switch_state[port] = initial
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ if not (0 < pin < 16):
+ raise RuntimeError(f"Unsupported port number: {pin}")
+ _ = initial
def prepare(self) -> None:
- self.__tes_socket = socket.create_connection((self.__tesmart_host,self.__tesmart_port))
- self.__update_state()
-
- def __update_state(self) -> None:
- for port in self.__switch_state:
- self.__switch_state[port] = False
- selport = self.__get_selected_port()
- if selport in self.__switch_state:
- self.__switch_state[selport] = True
-
- def __get_selected_port(self) -> int:
- retint = self.__send_tesmart_command("1000")
- return retint+1
-
- def __send_tesmart_command(self,tes_cmd: str) -> int:
- full_cmd="AABB03"+tes_cmd+"EE"
- binstr = binascii.unhexlify(full_cmd)
- self.__tes_socket.sendall(binstr)
- retstr=self.__tes_socket.recv(6)
- return int(bytearray(retstr)[4])
-
- async def run(self) -> None:
pass
- def cleanup(self) -> None:
- pass
+ async def run(self) -> None:
+ prev_active = -2
+ while True:
+ try:
+ self.__active = await self.__send_command(b"\x10\x00")
+ except Exception:
+ pass
+ if self.__active != prev_active:
+ await self._notifier.notify()
+ prev_active = self.__active
+ await asyncio.sleep(self.__state_poll)
+
+ async def cleanup(self) -> None:
+ await self.__close_device()
async def read(self, pin: int) -> bool:
- if pin in self.__switch_state:
- return self.__switch_state[pin]
- return False
+ return (self.__active == pin)
async def write(self, pin: int, state: bool) -> None:
- if state == False:
- return
- part_cmd="01"+format(pin,"#04x")[2:4]
- writeret = self.__send_tesmart_command(part_cmd)
- self.__update_state()
+ if state:
+ await self.__send_command(b"\x01%.2x" % (pin - 1))
+ await asyncio.sleep(self.__send_delay) # Slowdown
+
+ # =====
+
+ async def __send_command(self, cmd: bytes) -> int:
+ assert len(cmd) == 2
+ (reader, writer) = await self.__ensure_device()
+ try:
+ writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
+ await writer.drain()
+ return (await reader.readexactly(6))[4]
+ except Exception as err:
+ get_logger(0).error("Can't send command to Tesmart KVM [%s]:%d: %s",
+ self.__host, self.__port, tools.efmt(err))
+ await self.__close_device()
+ raise GpioDriverOfflineError(self)
+
+ async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+ if self.__reader is None or self.__writer is None:
+ try:
+ (reader, writer) = await asyncio.open_connection(self.__host, self.__port)
+ sock = writer.get_extra_info("socket")
+ sock.settimeout(self.__timeout)
+ except Exception as err:
+ get_logger(0).error("Can't connect to Tesmart KVM [%s]:%d: %s",
+ self.__host, self.__port, tools.efmt(err))
+ raise GpioDriverOfflineError(self)
+ else:
+ self.__reader = reader
+ self.__writer = writer
+ return (self.__reader, self.__writer)
+
+ async def __close_device(self) -> None:
+ if self.__writer:
+ await aiotools.close_writer(self.__writer)
+ self.__reader = None
+ self.__writer = None
+ self.__active = -1
# =====
def __str__(self) -> str:
- return f"tesmart({self._instance_name})"
+ return f"Tesmart({self._instance_name})"
__repr__ = __str__