diff options
author | David Shay <[email protected]> | 2021-05-14 13:17:46 -0400 |
---|---|---|
committer | GitHub <[email protected]> | 2021-05-14 20:17:46 +0300 |
commit | 1fc8434f0a6c7c8730c8ee5a4752fb075e190fcd (patch) | |
tree | 9eb220c50aeb2a9ffdd41f016de2c78d6c60e6d7 /kvmd/plugins | |
parent | 9965259f294d27384c9a187a4fcf41a9185926e2 (diff) |
Plugin support for TESMART switch - tesmart.py (#47)
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/ugpio/tesmart.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/kvmd/plugins/ugpio/tesmart.py b/kvmd/plugins/ugpio/tesmart.py new file mode 100644 index 00000000..66d0b60d --- /dev/null +++ b/kvmd/plugins/ugpio/tesmart.py @@ -0,0 +1,136 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import re +import functools +import errno +import time + +from typing import Tuple +from typing import Dict +from typing import Optional + +import serial +import socket +import binascii + +from ...logging import get_logger + +from ... import aiotools +from ... import aiomulti +from ... import aioproc + +from ...yamlconf import Option + +from ...validators.basic import valid_number +from ...validators.basic import valid_float_f01 +from ...validators.os import valid_abs_path +from ...validators.hw import valid_tty_speed +from ...validators.net import valid_ip_or_host +from ...validators.net import valid_port + +from . import BaseUserGpioDriver + + +# ===== +class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes + def __init__( + self, + instance_name: str, + notifier: aiotools.AioNotifier, + + tesmart_host: str, + tesmart_port: int, + max_ports: int, + + ) -> None: + + super().__init__(instance_name, notifier) + + self.__tesmart_host = tesmart_host + self.__tesmart_port = tesmart_port + self.__max_ports = max_ports + self.__switch_state: Dict[int, bool] = {} + self.__tes_socket: socket + + @classmethod + def get_plugin_options(cls) -> Dict: + return { + "tesmart_host": Option("192.168.1.10", type=valid_ip_or_host), + "tesmart_port": Option(5000, type=valid_port), + "max_ports": Option(8, type=functools.partial(valid_number, min=4, max=16)), + } + + def register_input(self, pin: int, debounce: float) -> None: + _ = pin + _ = debounce + + def register_output(self, port: int, initial: Optional[bool]) -> None: + if port <= self.__max_ports: + self.__switch_state[port] = initial + + def prepare(self) -> None: + self.__tes_socket = socket.create_connection((self.__tesmart_host,self.__tesmart_port)) + self.__update_state() + + def __update_state(self) -> None: + for port in self.__switch_state: + self.__switch_state[port] = False + selport = self.__get_selected_port() + if selport in self.__switch_state: + self.__switch_state[selport] = True + + def __get_selected_port(self) -> int: + retint = self.__send_tesmart_command("1000") + return retint+1 + + def __send_tesmart_command(self,tes_cmd: str) -> int: + full_cmd="AABB03"+tes_cmd+"EE" + binstr = binascii.unhexlify(full_cmd) + self.__tes_socket.sendall(binstr) + retstr=self.__tes_socket.recv(6) + return int(bytearray(retstr)[4]) + + async def run(self) -> None: + pass + + def cleanup(self) -> None: + pass + + async def read(self, pin: int) -> bool: + if pin in self.__switch_state: + return self.__switch_state[pin] + return False + + async def write(self, pin: int, state: bool) -> None: + if state == False: + return + part_cmd="01"+format(pin,"#04x")[2:4] + writeret = self.__send_tesmart_command(part_cmd) + self.__update_state() + + # ===== + + def __str__(self) -> str: + return f"tesmart({self._instance_name})" + + __repr__ = __str__ |