diff options
author | Maxim Devaev <[email protected]> | 2023-03-07 19:50:45 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2023-03-07 19:50:45 +0200 |
commit | 002031baf15d328dad764a620e1c83c01e7a55a6 (patch) | |
tree | b6a48caf3f7027c5d19e0b55018f78e4f4211372 | |
parent | d3eba4564431d5f0db9a69572f5406b5c035aadb (diff) |
rewritten anelpwr plugin
-rw-r--r-- | kvmd/plugins/ugpio/anelpwr.py | 154 |
1 files changed, 102 insertions, 52 deletions
diff --git a/kvmd/plugins/ugpio/anelpwr.py b/kvmd/plugins/ugpio/anelpwr.py index 76b1d009..e07321df 100644 --- a/kvmd/plugins/ugpio/anelpwr.py +++ b/kvmd/plugins/ugpio/anelpwr.py @@ -19,102 +19,152 @@ # # # ========================================================================== # -import requests -import aiohttp + +import asyncio import functools from typing import Callable from typing import Any +import aiohttp + from ...logging import get_logger from ... import tools from ... import aiotools +from ... import htclient from ...yamlconf import Option -from ...validators.hw import valid_number from ...validators.basic import valid_stripped_string_not_empty +from ...validators.basic import valid_bool +from ...validators.basic import valid_number +from ...validators.basic import valid_float_f01 -from . import UserGpioModes from . import BaseUserGpioDriver from . import GpioDriverOfflineError # ===== class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes - def __init__( # pylint: disable=super-init-not-called + def __init__( self, instance_name: str, notifier: aiotools.AioNotifier, - host: str, - port: str, + + url: str, + verify: bool, user: str, - password: str, - + passwd: str, + state_poll: float, + timeout: float, ) -> None: super().__init__(instance_name, notifier) - self.__host = host - self.__port = port + self.__url = url + self.__verify = verify self.__user = user - self.__password = password + self.__passwd = passwd + self.__state_poll = state_poll + self.__timeout = timeout + + self.__initial: dict[str, (bool | None)] = {} + + self.__state: dict[str, (bool | None)] = {} + self.__update_notifier = aiotools.AioNotifier() + + self.__http_session: (aiohttp.ClientSession | None) = None @classmethod def get_plugin_options(cls) -> dict[str, Option]: return { - "host": Option([], type=valid_stripped_string_not_empty), - "port": Option([], type=valid_number), - "user": Option([], type=valid_stripped_string_not_empty), - "password": Option([], type=valid_stripped_string_not_empty), + "url": Option("", type=valid_stripped_string_not_empty), + "verify": Option(True, type=valid_bool), + "user": Option(""), + "passwd": Option(""), + "state_poll": Option(5.0, type=valid_float_f01), + "timeout": Option(5.0, type=valid_float_f01), } @classmethod - def get_modes(cls) -> set[str]: - return set(UserGpioModes.ALL) - - @classmethod def get_pin_validator(cls) -> Callable[[Any], Any]: - return functools.partial(valid_number, min=1,max=8) + return functools.partial(valid_number, min=0, max=7, name="ANELPWR channel") + + def register_input(self, pin: str, debounce: float) -> None: + _ = debounce + self.__state[pin] = None + + def register_output(self, pin: str, initial: (bool | None)) -> None: + self.__initial[pin] = initial + self.__state[pin] = None + + def prepare(self) -> None: + async def inner_prepare() -> None: + await asyncio.gather(*[ + self.write(pin, state) + for (pin, state) in self.__initial.items() + if state is not None + ], return_exceptions=True) + aiotools.run_sync(inner_prepare()) + + async def run(self) -> None: + prev_state: (dict | None) = None + while True: + session = self.__ensure_http_session() + try: + async with session.get(f"{self.__url}/strg.cfg") as response: + htclient.raise_not_200(response) + parts = (await response.text()).split(";") + for pin in self.__state: + self.__state[pin] = (parts[1 + int(pin) * 5] == "1") + except Exception as err: + get_logger().error("Failed ANELPWR bulk GET request: %s", tools.efmt(err)) + self.__state = dict.fromkeys(self.__state, None) + if self.__state != prev_state: + self._notifier.notify() + prev_state = self.__state + await self.__update_notifier.wait(self.__state_poll) + + async def cleanup(self) -> None: + if self.__http_session: + await self.__http_session.close() + self.__http_session = None async def read(self, pin: str) -> bool: - try: - #status = 0 - #body = "" - async with aiohttp.ClientSession() as session: - url = f"http://{self.__host}:{self.__port}/strg.cfg" - async with session.get(url,auth=aiohttp.BasicAuth(self.__user,self.__password)) as resp: - body = await resp.text() - if ( resp.status != 200 ): - get_logger(0).error(f"http get returned {resp.status} form {self.__host}") - raise GpioDriverOfflineError(self) - - return '1' == body.split(';')[1 + (int(pin) - 1) * 5] - - except Exception as e: - get_logger(0).error(e) + if self.__state[pin] is None: raise GpioDriverOfflineError(self) + return self.__state[pin] # type: ignore async def write(self, pin: str, state: bool) -> None: - _ = pin - if state: - onoff = f"F{int(pin)-1}=1" - else: - onoff = f"F{int(pin)-1}=0" + session = self.__ensure_http_session() try: - async with aiohttp.ClientSession() as session: - url = f'http://{self.__host}:{self.__port}/ctrl.htm' - headers={'Content-Type':'text/plain'} - auth=aiohttp.BasicAuth(self.__user,self.__password) - async with session.post(url,auth=auth,headers=headers,data=onoff) as resp: - await resp.text() - if 200 != resp.status: - raise GpioDriverOfflineError(self) - - except Exception as e: - get_logger(0).error(e) + async with session.post( + url=f"{self.__url}//ctrl.htm", + data=f"F{pin}={int(state)}", + headers={"Content-Type": "text/plain"}, + ) as response: + htclient.raise_not_200(response) + except Exception as err: + get_logger().error("Failed ANELPWR POST request to pin %s: %s", pin, tools.efmt(err)) raise GpioDriverOfflineError(self) + else: + self.__update_notifier.notify() + + def __ensure_http_session(self) -> aiohttp.ClientSession: + if not self.__http_session: + kwargs: dict = { + "headers": { + "User-Agent": htclient.make_user_agent("KVMD"), + }, + "timeout": aiohttp.ClientTimeout(total=self.__timeout), + } + if self.__user: + kwargs["auth"] = aiohttp.BasicAuth(self.__user, self.__passwd) + if not self.__verify: + kwargs["connector"] = aiohttp.TCPConnector(ssl=False) + self.__http_session = aiohttp.ClientSession(**kwargs) + return self.__http_session def __str__(self) -> str: return f"ANELPWR({self._instance_name})" |