From 3cd285c66e9060aa80d399bb1383c6fb84e76a3c Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 28 Jan 2024 09:29:57 +0200 Subject: refactoring of unifi plugin --- kvmd/plugins/ugpio/unifi.py | 344 ++++++++++++++++---------------------------- 1 file changed, 127 insertions(+), 217 deletions(-) diff --git a/kvmd/plugins/ugpio/unifi.py b/kvmd/plugins/ugpio/unifi.py index c4d572e9..67a83e45 100644 --- a/kvmd/plugins/ugpio/unifi.py +++ b/kvmd/plugins/ugpio/unifi.py @@ -21,14 +21,10 @@ import asyncio -import functools -import json from typing import Callable from typing import Any -from multidict import CIMultiDict, CIMultiDictProxy - import aiohttp from ...logging import get_logger @@ -48,17 +44,8 @@ from . import GpioDriverOfflineError # ===== - -def set_poe_mode(pin: str, state: bool, port_override: dict[str, Any]) -> dict[str, Any]: - port_idx = str(port_override["port_idx"]) - if port_idx == pin: - port_override["poe_mode"] = "auto" if state else "off" - return port_override - - -# pylint: disable=too-many-instance-attributes disable=too-many-arguments -class Plugin(BaseUserGpioDriver): - def __init__( +class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments self, instance_name: str, notifier: aiotools.AioNotifier, @@ -68,23 +55,21 @@ class Plugin(BaseUserGpioDriver): user: str, passwd: str, mac: str, - - timeout: float, switch_delay: float, + timeout: float, state_poll: float, ) -> None: super().__init__(instance_name, notifier) self.__url = url + self.__verify = verify self.__user = user self.__passwd = passwd self.__mac = mac - self.__state_poll = state_poll - self.__switch_delay = switch_delay - - self.__initial: dict[str, (bool | None)] = {} + self.__timeout = timeout + self.__state_poll = state_poll self.__state: dict[str, (bool | None)] = {} @@ -93,7 +78,7 @@ class Plugin(BaseUserGpioDriver): self.__update_notifier = aiotools.AioNotifier() - self.__http_session: aiohttp.ClientSession = self.__get_client_session(timeout, verify) + self.__http_session: (aiohttp.ClientSession | None) = None self.__csrf_token: (str | None) = None self.__id: (str | None) = None @@ -102,14 +87,14 @@ class Plugin(BaseUserGpioDriver): @classmethod def get_plugin_options(cls) -> dict[str, Option]: return { - "url": Option("", type=valid_stripped_string_not_empty), - "verify": Option(True, type=valid_bool), - "user": Option(""), - "passwd": Option(""), - "mac": Option("", type=valid_stripped_string_not_empty), - "state_poll": Option(5.0, type=valid_float_f01), - "timeout": Option(5.0, type=valid_float_f01), + "url": Option("", type=valid_stripped_string_not_empty), + "verify": Option(True, type=valid_bool), + "user": Option(""), + "passwd": Option(""), + "mac": Option("", type=valid_stripped_string_not_empty), "switch_delay": Option(1.0, type=valid_float_f01), + "state_poll": Option(5.0, type=valid_float_f01), + "timeout": Option(5.0, type=valid_float_f01), } @classmethod @@ -121,253 +106,178 @@ class Plugin(BaseUserGpioDriver): self.__state[pin] = None def register_output(self, pin: str, initial: (bool | None)) -> None: + _ = initial if pin.isnumeric(): - self.__initial[pin] = initial self.__state[pin] = None - def prepare(self) -> None: - for (pin, state) in self.__initial.items(): - if pin.isnumeric(): - if state is not None: - self.__state[pin] = state - async def run(self) -> None: prev_state: (dict | None) = None while True: - try: - if self.__csrf_token is None: - get_logger().info( - "Logging into UNIFI" - ) - await self.login() - async with self.__http_session.get( - url=f"{self.__api_url}/stat/device/{self.__mac}", - headers=self.__get_headers() - ) as response: - htclient.raise_not_200(response) - self.__handle_headers(response.headers) + if (await self.__ensure_login()): + try: + async with self.__ensure_http_session().get( + url=f"{self.__api_url}/stat/device/{self.__mac}", + headers=self.__make_headers(token=True, post_json=False), + ) as response: - status = (await response.json())["data"][0] - if self.__id is None or self.__id != status["_id"]: - self.__id = status["_id"] + self.__handle_response(response) - port_overrides = dict(map( - lambda port: (str(port["port_idx"]), port), - status["port_overrides"])) + status = (await response.json())["data"][0] + if self.__id is None or self.__id != status["_id"]: + self.__id = status["_id"] - for port_key, port in port_overrides.items(): - self.__port_overrides[port_key] = port + port_overrides = dict(map( + lambda port: (str(port["port_idx"]), port), + status["port_overrides"])) - port_table = dict( - map(lambda port: (str(port["port_idx"]), port), - list(filter(lambda p: p["port_poe"] is True, - status["port_table"])))) + for port_key, port in port_overrides.items(): + self.__port_overrides[port_key] = port - for port_key, port in port_table.items(): - self.__port_table[port_key] = port + port_table = dict( + map(lambda port: (str(port["port_idx"]), port), + list(filter(lambda p: p["port_poe"] is True, + status["port_table"])))) - for pin in self.__state: - if pin is not None: - port = self.__port_table[pin] - self.__state[pin] = port["poe_mode"] == "auto" + for port_key, port in port_table.items(): + self.__port_table[port_key] = port - except aiohttp.ClientResponseError as err: - await self.__handle_client_response_error(err) - except Exception as err: - get_logger().error("Failed UNIFI bulk GET request: %s", - tools.efmt(err)) - self.__state = dict.fromkeys(self.__state, None) + for pin in self.__state: + if pin is not None: + port = self.__port_table[pin] + self.__state[pin] = port["poe_mode"] == "auto" + + except Exception as err: + get_logger().error("Failed UNIFI bulk GET request: %s", tools.efmt(err)) + self.__state = dict.fromkeys(self.__state, None) - if self.__state != prev_state: - self._notifier.notify() - prev_state = self.__state + if self.__state != prev_state: + self._notifier.notify() + prev_state = self.__state await self.__update_notifier.wait(self.__state_poll) async def cleanup(self) -> None: if self.__http_session: await self.__http_session.close() - self.__http_session = aiohttp.ClientSession() + self.__http_session = None self.__csrf_token = None async def read(self, pin: str) -> bool: - if pin.isnumeric() is False: + if not pin.isnumeric(): return False - try: - if self.__csrf_token is None: - get_logger().info( - "Logging into UNIFI" - ) - await self.login() - return await self.__inner_read(pin) - except aiohttp.ClientResponseError as err: - await self.__handle_client_response_error(err) - except Exception: + if not (await self.__ensure_login()): raise GpioDriverOfflineError(self) - return False + return self.__state[pin] is not None and bool(self.__state[pin]) async def write(self, pin: str, state: bool) -> None: + if not (await self.__ensure_login()): + raise GpioDriverOfflineError(self) try: - if self.__csrf_token is None: - get_logger().info( - "Logging into UNIFI" - ) - await self.login() if pin.endswith(":cycle"): await self.__cycle_device(pin, state) else: - await self.__inner_write(pin, state) - except aiohttp.ClientResponseError as err: - await self.__handle_client_response_error(err) + await self.__set_device(pin, state) except Exception as err: - get_logger().error( - "Failed UNIFI PUT request | pin : %s | Error: %s", - pin, - tools.efmt(err) - ) + get_logger().error("Failed UNIFI PUT request | pin: %s | Error: %s", pin, tools.efmt(err)) await asyncio.sleep(self.__switch_delay) # Slowdown self.__update_notifier.notify() # ===== - def __get_client_session(self, timeout: float, verify: bool) -> aiohttp.ClientSession: - kwargs: dict = { - "headers": { - "Accept": "application/json", - "User-Agent": htclient.make_user_agent("KVMD"), - }, - "timeout": aiohttp.ClientTimeout(total=timeout), - "connector": aiohttp.TCPConnector(verify_ssl=verify), - "cookie_jar": aiohttp.CookieJar(), - } - - return aiohttp.ClientSession(**kwargs) - - async def __inner_read(self, pin: str) -> bool: - return self.__state[pin] is not None and bool(self.__state[pin]) - - async def __inner_write(self, pin: str, state: bool) -> None: - await self.__set_device(pin, state) - async def __cycle_device(self, pin: str, state: bool) -> None: - if state is False: + if not state: return get_logger().info("Cycling device %s: port: %s", self.__mac, pin) - async with self.__http_session.post( + async with self.__ensure_http_session().post( url=f"{self.__api_url}/cmd/devmgr", json={ - "cmd": "power-cycle", - "mac": self.__mac, + "cmd": "power-cycle", + "mac": self.__mac, "port_idx": pin.split(":")[0], }, - headers=self.__get_headers() + headers=self.__make_headers(token=True, post_json=True), ) as response: - self.__handle_headers(response.headers) - htclient.raise_not_200(response) + self.__handle_response(response) async def __set_device(self, pin: str, state: bool) -> None: get_logger().info("Setting device %s: port: %s, state: %s", self.__mac, pin, state) - port_overrides = map( - functools.partial(set_poe_mode, pin, state), - self.__port_overrides.values() - ) - - data = { - "port_overrides": list(port_overrides) - } + port_overrides: list[dict[str, Any]] = [] + for po in self.__port_overrides.values(): + if str(po["port_idx"]) == pin: + # Also modifies value in self.__port_overrides + po["poe_mode"] = ("auto" if state else "off") + port_overrides.append(po) - get_logger().info("Posting content %s:\n%s\n", pin, json.dumps(data).encode("utf-8")) - - async with self.__http_session.put( + async with self.__ensure_http_session().put( url=f"{self.__api_url}/rest/device/{self.__id}", - json=data, - headers={ - "Accept": "application/json", - "Content-Type": "application/json;charset=UTF-8", - "X-CSRF-TOKEN": self.__csrf_token, - "User-Agent": htclient.make_user_agent("KVMD"), - } + json={"port_overrides": port_overrides}, + headers=self.__make_headers(token=True, post_json=True), ) as response: - htclient.raise_not_200(response) - - for header in response.headers: - if header.upper() == "X-CSRF-TOKEN": - self.__csrf_token = response.headers[header] - if response.cookies: - self.__http_session.cookie_jar.update_cookies( - response.cookies - ) + self.__handle_response(response) - result = await asyncio.sleep(5, result=state) + await asyncio.sleep(5) - self.__port_table[pin]["poe_enable"] = result - self.__port_table[pin]["poe_mode"] = "auto" if result else "off" - self.__state[pin] = result + self.__port_table[pin]["poe_enable"] = state + self.__port_table[pin]["poe_mode"] = ("auto" if state else "off") + self.__state[pin] = state - def __get_headers(self, - extra: (dict | None) = None - ) -> CIMultiDictProxy[str]: - kwargs: dict = { - "Accept": "application/json", - "X-CSRF-TOKEN": self.__csrf_token, - } - headers: CIMultiDict[str] = CIMultiDict(**kwargs) - if extra is not None: - headers.update(**extra) - return CIMultiDictProxy(headers) - - def __handle_headers(self, response_headers: CIMultiDictProxy[str]) -> None: - for header in response_headers: - if header.upper() == "X-CSRF-TOKEN": - self.__csrf_token = response_headers.get(header) - - async def __handle_client_response_error(self, - err: aiohttp.ClientResponseError - ) -> None: - if err.status == 401: - get_logger().info( - "UNIFI API request unauthorized. Attempting to refresh session" - ) + async def __ensure_login(self) -> bool: + if self.__csrf_token is None: + get_logger().info("Logging into Unifi") try: - await self.login() - except Exception as login_err: - get_logger().error("Failed UNIFI login request: %s", - tools.efmt(login_err)) - - async def login(self) -> None: - try: - response = await self.__http_session.post( - url=f"{self.__url}/api/auth/login", - json={ - "username": self.__user, - "password": self.__passwd - }, - headers={ - "Accept": "application/json", + async with self.__ensure_http_session().post( + url=f"{self.__url}/api/auth/login", + json={ + "username": self.__user, + "password": self.__passwd, + }, + headers=self.__make_headers(token=False, post_json=True), + ) as response: + self.__handle_response(response) + except Exception as err: + get_logger().error("Failed Unifi login request: %s", tools.efmt(err)) + return False + return True + + def __make_headers(self, token: bool, post_json: bool) -> dict[str, str]: + headers: dict[str, str] = {} + if token: + assert self.__csrf_token is not None + headers["X-CSRF-TOKEN"] = self.__csrf_token + if post_json: + headers["Content-Type"] = "application/json;charset=UTF-8" + return headers + + def __handle_response(self, response: aiohttp.ClientResponse) -> None: + assert self.__http_session is not None + if response.status == 401: + get_logger().info("Unifi API request unauthorized, we will retry a login") + self.__csrf_token = None + self.__http_session.cookie_jar.clear() + htclient.raise_not_200(response) + if "X-CSRF-TOKEN" in response.headers: + self.__csrf_token = response.headers["X-CSRF-TOKEN"] + if response.cookies: + self.__http_session.cookie_jar.update_cookies(response.cookies) + + def __ensure_http_session(self) -> aiohttp.ClientSession: + if not self.__http_session: + kwargs: dict = { + "headers": { + "Accept": "application/json", "User-Agent": htclient.make_user_agent("KVMD"), - "Content-Type": "application/json;charset=UTF-8", - } - ) - - htclient.raise_not_200(response) - - for header in response.headers: - if header.upper() == "X-CSRF-TOKEN": - self.__csrf_token = response.headers[header] - - if response.cookies: - self.__http_session.cookie_jar.update_cookies( - response.cookies - ) - except Exception as err: - get_logger().error( - "Failed UNIFI login request: %s", - tools.efmt(err) - ) + }, + "cookie_jar": aiohttp.CookieJar(), + "timeout": aiohttp.ClientTimeout(total=self.__timeout), + } + if not self.__verify: + kwargs["connector"] = aiohttp.TCPConnector(ssl=False) + self.__http_session = aiohttp.ClientSession(**kwargs) + return self.__http_session def __str__(self) -> str: - return f"UNIFI({self._instance_name})" + return f"Unifi({self._instance_name})" __repr__ = __str__ -- cgit v1.2.3