summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-05-16 05:57:08 +0300
committerDevaev Maxim <[email protected]>2021-05-16 05:57:08 +0300
commit8db0ab20e0fda9796e7b3607a551e90317dfba71 (patch)
tree12db6f56df3b0b495b2d54eb16d15f2db2574bad
parent1fc8434f0a6c7c8730c8ee5a4752fb075e190fcd (diff)
tesmart rewrite
-rw-r--r--kvmd/aiotools.py13
-rw-r--r--kvmd/apps/kvmd/ugpio.py2
-rw-r--r--kvmd/apps/vnc/rfb/stream.py16
-rw-r--r--kvmd/apps/vnc/server.py3
-rw-r--r--kvmd/plugins/ugpio/__init__.py2
-rw-r--r--kvmd/plugins/ugpio/ezcoo.py2
-rw-r--r--kvmd/plugins/ugpio/gpio.py2
-rw-r--r--kvmd/plugins/ugpio/hidrelay.py2
-rw-r--r--kvmd/plugins/ugpio/ipmi.py2
-rw-r--r--kvmd/plugins/ugpio/otgbind.py2
-rw-r--r--kvmd/plugins/ugpio/tesmart.py148
11 files changed, 110 insertions, 84 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index e18ce935..e14060e1 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -89,6 +89,19 @@ async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.
# =====
+async def close_writer(writer: asyncio.StreamWriter) -> bool:
+ closing = writer.is_closing()
+ if not closing:
+ writer.transport.abort() # type: ignore
+ writer.close()
+ try:
+ await writer.wait_closed()
+ except Exception:
+ pass
+ return (not closing)
+
+
+# =====
class AioNotifier:
def __init__(self) -> None:
self.__queue: "asyncio.Queue[None]" = asyncio.Queue()
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index d77a81c6..ca04b31f 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -295,7 +295,7 @@ class UserGpio:
async def cleanup(self) -> None:
for driver in self.__drivers.values():
try:
- driver.cleanup()
+ await driver.cleanup()
except Exception:
get_logger().exception("Can't cleanup driver %s", driver)
diff --git a/kvmd/apps/vnc/rfb/stream.py b/kvmd/apps/vnc/rfb/stream.py
index 3f2d38b8..235a32e4 100644
--- a/kvmd/apps/vnc/rfb/stream.py
+++ b/kvmd/apps/vnc/rfb/stream.py
@@ -27,6 +27,8 @@ import struct
from typing import Tuple
from typing import Any
+from .... import aiotools
+
from .errors import RfbConnectionError
@@ -35,18 +37,6 @@ def rfb_format_remote(writer: asyncio.StreamWriter) -> str:
return "[%s]:%d" % (writer.transport.get_extra_info("peername")[:2])
-async def rfb_close_writer(writer: asyncio.StreamWriter) -> bool:
- closing = writer.is_closing()
- if not closing:
- writer.transport.abort() # type: ignore
- writer.close()
- try:
- await writer.wait_closed()
- except Exception:
- pass
- return (not closing)
-
-
class RfbClientStream:
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
self.__reader = reader
@@ -145,4 +135,4 @@ class RfbClientStream:
self.__writer = ssl_writer
async def _close(self) -> None:
- await rfb_close_writer(self.__writer)
+ await aiotools.close_writer(self.__writer)
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index ca391292..57c111ef 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -55,7 +55,6 @@ from ... import aiotools
from .rfb import RfbClient
from .rfb.stream import rfb_format_remote
-from .rfb.stream import rfb_close_writer
from .rfb.errors import RfbError
from .vncauth import VncAuthKvmdCredentials
@@ -487,7 +486,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
except Exception:
logger.exception("[entry] %s: Unhandled exception in client task", remote)
finally:
- if (await rfb_close_writer(writer)):
+ if (await aiotools.close_writer(writer)):
logger.info("[entry] %s: Connection is closed in an emergency", remote)
self.__handle_client = handle_client
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py
index 2a172c63..e2f5943e 100644
--- a/kvmd/plugins/ugpio/__init__.py
+++ b/kvmd/plugins/ugpio/__init__.py
@@ -86,7 +86,7 @@ class BaseUserGpioDriver(BasePlugin):
async def run(self) -> None:
raise NotImplementedError
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
raise NotImplementedError
async def read(self, pin: int) -> bool:
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py
index 3aa92d07..b1f33258 100644
--- a/kvmd/plugins/ugpio/ezcoo.py
+++ b/kvmd/plugins/ugpio/ezcoo.py
@@ -105,7 +105,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
self.__channel = channel
await self._notifier.notify()
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
if self.__proc is not None:
if self.__proc.is_alive():
get_logger(0).info("Stopping %s daemon ...", self)
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 646809ae..28f0d5cb 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -88,7 +88,7 @@ class Plugin(BaseUserGpioDriver):
assert self.__reader
await self.__reader.poll()
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
if self.__chip:
try:
self.__chip.close()
diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py
index 119c4ee5..7e18c72a 100644
--- a/kvmd/plugins/ugpio/hidrelay.py
+++ b/kvmd/plugins/ugpio/hidrelay.py
@@ -108,7 +108,7 @@ class Plugin(BaseUserGpioDriver):
prev_raw = raw
await asyncio.sleep(self.__state_poll)
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
self.__reset_pins()
self.__close_device()
self.__stop = True
diff --git a/kvmd/plugins/ugpio/ipmi.py b/kvmd/plugins/ugpio/ipmi.py
index cdb317de..9ff33da4 100644
--- a/kvmd/plugins/ugpio/ipmi.py
+++ b/kvmd/plugins/ugpio/ipmi.py
@@ -131,7 +131,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
prev = new
await asyncio.sleep(self.__state_poll)
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
pass
async def read(self, pin: int) -> bool:
diff --git a/kvmd/plugins/ugpio/otgbind.py b/kvmd/plugins/ugpio/otgbind.py
index dbf3b421..5d80e516 100644
--- a/kvmd/plugins/ugpio/otgbind.py
+++ b/kvmd/plugins/ugpio/otgbind.py
@@ -97,7 +97,7 @@ class Plugin(BaseUserGpioDriver):
except Exception:
logger.exception("Unexpected OTG-bind watcher error")
- def cleanup(self) -> None:
+ async def cleanup(self) -> None:
pass
async def read(self, pin: int) -> bool:
diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py
index 66d0b60d..e1b6a8e9 100644
--- a/kvmd/plugins/ugpio/tesmart.py
+++ b/kvmd/plugins/ugpio/tesmart.py
@@ -20,35 +20,26 @@
# ========================================================================== #
-import re
-import functools
-import errno
-import time
+import asyncio
from typing import Tuple
from typing import Dict
from typing import Optional
-import serial
-import socket
-import binascii
-
from ...logging import get_logger
+from ... import tools
from ... import aiotools
-from ... import aiomulti
-from ... import aioproc
from ...yamlconf import Option
-from ...validators.basic import valid_number
+from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01
-from ...validators.os import valid_abs_path
-from ...validators.hw import valid_tty_speed
from ...validators.net import valid_ip_or_host
from ...validators.net import valid_port
from . import BaseUserGpioDriver
+from . import GpioDriverOfflineError
# =====
@@ -58,79 +49,112 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
instance_name: str,
notifier: aiotools.AioNotifier,
- tesmart_host: str,
- tesmart_port: int,
- max_ports: int,
-
+ host: str,
+ port: int,
+ timeout: float,
+ send_delay: float,
+ state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
- self.__tesmart_host = tesmart_host
- self.__tesmart_port = tesmart_port
- self.__max_ports = max_ports
- self.__switch_state: Dict[int, bool] = {}
- self.__tes_socket: socket
+ self.__host = host
+ self.__port = port
+ self.__timeout = timeout
+ self.__send_delay = send_delay
+ self.__state_poll = state_poll
+
+ self.__reader: Optional[asyncio.StreamReader] = None
+ self.__writer: Optional[asyncio.StreamWriter] = None
+
+ self.__active: int = -1
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "tesmart_host": Option("192.168.1.10", type=valid_ip_or_host),
- "tesmart_port": Option(5000, type=valid_port),
- "max_ports": Option(8, type=functools.partial(valid_number, min=4, max=16)),
+ "host": Option("", type=valid_ip_or_host),
+ "port": Option(5000, type=valid_port),
+ "timeout": Option(5.0, type=valid_float_f01),
+ "send_delay": Option(1.0, type=valid_float_f0),
+ "state_poll": Option(5.0, type=valid_float_f01),
}
def register_input(self, pin: int, debounce: float) -> None:
- _ = pin
+ if not (0 < pin < 16):
+ raise RuntimeError(f"Unsupported port number: {pin}")
_ = debounce
- def register_output(self, port: int, initial: Optional[bool]) -> None:
- if port <= self.__max_ports:
- self.__switch_state[port] = initial
+ def register_output(self, pin: int, initial: Optional[bool]) -> None:
+ if not (0 < pin < 16):
+ raise RuntimeError(f"Unsupported port number: {pin}")
+ _ = initial
def prepare(self) -> None:
- self.__tes_socket = socket.create_connection((self.__tesmart_host,self.__tesmart_port))
- self.__update_state()
-
- def __update_state(self) -> None:
- for port in self.__switch_state:
- self.__switch_state[port] = False
- selport = self.__get_selected_port()
- if selport in self.__switch_state:
- self.__switch_state[selport] = True
-
- def __get_selected_port(self) -> int:
- retint = self.__send_tesmart_command("1000")
- return retint+1
-
- def __send_tesmart_command(self,tes_cmd: str) -> int:
- full_cmd="AABB03"+tes_cmd+"EE"
- binstr = binascii.unhexlify(full_cmd)
- self.__tes_socket.sendall(binstr)
- retstr=self.__tes_socket.recv(6)
- return int(bytearray(retstr)[4])
-
- async def run(self) -> None:
pass
- def cleanup(self) -> None:
- pass
+ async def run(self) -> None:
+ prev_active = -2
+ while True:
+ try:
+ self.__active = await self.__send_command(b"\x10\x00")
+ except Exception:
+ pass
+ if self.__active != prev_active:
+ await self._notifier.notify()
+ prev_active = self.__active
+ await asyncio.sleep(self.__state_poll)
+
+ async def cleanup(self) -> None:
+ await self.__close_device()
async def read(self, pin: int) -> bool:
- if pin in self.__switch_state:
- return self.__switch_state[pin]
- return False
+ return (self.__active == pin)
async def write(self, pin: int, state: bool) -> None:
- if state == False:
- return
- part_cmd="01"+format(pin,"#04x")[2:4]
- writeret = self.__send_tesmart_command(part_cmd)
- self.__update_state()
+ if state:
+ await self.__send_command(b"\x01%.2x" % (pin - 1))
+ await asyncio.sleep(self.__send_delay) # Slowdown
+
+ # =====
+
+ async def __send_command(self, cmd: bytes) -> int:
+ assert len(cmd) == 2
+ (reader, writer) = await self.__ensure_device()
+ try:
+ writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
+ await writer.drain()
+ return (await reader.readexactly(6))[4]
+ except Exception as err:
+ get_logger(0).error("Can't send command to Tesmart KVM [%s]:%d: %s",
+ self.__host, self.__port, tools.efmt(err))
+ await self.__close_device()
+ raise GpioDriverOfflineError(self)
+
+ async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
+ if self.__reader is None or self.__writer is None:
+ try:
+ (reader, writer) = await asyncio.open_connection(self.__host, self.__port)
+ sock = writer.get_extra_info("socket")
+ sock.settimeout(self.__timeout)
+ except Exception as err:
+ get_logger(0).error("Can't connect to Tesmart KVM [%s]:%d: %s",
+ self.__host, self.__port, tools.efmt(err))
+ raise GpioDriverOfflineError(self)
+ else:
+ self.__reader = reader
+ self.__writer = writer
+ return (self.__reader, self.__writer)
+
+ async def __close_device(self) -> None:
+ if self.__writer:
+ await aiotools.close_writer(self.__writer)
+ self.__reader = None
+ self.__writer = None
+ self.__active = -1
# =====
def __str__(self) -> str:
- return f"tesmart({self._instance_name})"
+ return f"Tesmart({self._instance_name})"
__repr__ = __str__