summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Atkins <[email protected]>2024-01-27 21:39:47 -0600
committerGitHub <[email protected]>2024-01-28 05:39:47 +0200
commit4f08769f7e4a7a0598d86cbe162fe25e568aa565 (patch)
treebef5486a48cb9e761c644aa1f037c42d8451944d
parent5b1101e42fa07a466cc878745b985e7317e8da98 (diff)
feat: add Unifi GPIO driver (#155)
-rw-r--r--kvmd/plugins/ugpio/unifi.py373
1 files changed, 373 insertions, 0 deletions
diff --git a/kvmd/plugins/ugpio/unifi.py b/kvmd/plugins/ugpio/unifi.py
new file mode 100644
index 00000000..c4d572e9
--- /dev/null
+++ b/kvmd/plugins/ugpio/unifi.py
@@ -0,0 +1,373 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2018-2023 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import functools
+import json
+
+from typing import Callable
+from typing import Any
+
+from multidict import CIMultiDict, CIMultiDictProxy
+
+import aiohttp
+
+from ...logging import get_logger
+
+from ... import tools
+from ... import aiotools
+from ... import htclient
+
+from ...yamlconf import Option
+
+from ...validators.basic import valid_stripped_string_not_empty
+from ...validators.basic import valid_bool
+from ...validators.basic import valid_float_f01
+
+from . import BaseUserGpioDriver
+from . import GpioDriverOfflineError
+
+
+# =====
+
+def set_poe_mode(pin: str, state: bool, port_override: dict[str, Any]) -> dict[str, Any]:
+ port_idx = str(port_override["port_idx"])
+ if port_idx == pin:
+ port_override["poe_mode"] = "auto" if state else "off"
+ return port_override
+
+
+# pylint: disable=too-many-instance-attributes disable=too-many-arguments
+class Plugin(BaseUserGpioDriver):
+ def __init__(
+ self,
+ instance_name: str,
+ notifier: aiotools.AioNotifier,
+
+ url: str,
+ verify: bool,
+ user: str,
+ passwd: str,
+ mac: str,
+
+ timeout: float,
+ switch_delay: float,
+ state_poll: float,
+ ) -> None:
+
+ super().__init__(instance_name, notifier)
+
+ self.__url = url
+ self.__user = user
+ self.__passwd = passwd
+ self.__mac = mac
+ self.__state_poll = state_poll
+
+ self.__switch_delay = switch_delay
+
+ self.__initial: dict[str, (bool | None)] = {}
+
+ self.__state: dict[str, (bool | None)] = {}
+
+ self.__port_table: dict[str, dict[str, Any]] = {}
+ self.__port_overrides: dict[str, dict[str, Any]] = {}
+
+ self.__update_notifier = aiotools.AioNotifier()
+
+ self.__http_session: aiohttp.ClientSession = self.__get_client_session(timeout, verify)
+
+ self.__csrf_token: (str | None) = None
+ self.__id: (str | None) = None
+ self.__api_url: str = f"{self.__url}/proxy/network/api/s/default"
+
+ @classmethod
+ def get_plugin_options(cls) -> dict[str, Option]:
+ return {
+ "url": Option("", type=valid_stripped_string_not_empty),
+ "verify": Option(True, type=valid_bool),
+ "user": Option(""),
+ "passwd": Option(""),
+ "mac": Option("", type=valid_stripped_string_not_empty),
+ "state_poll": Option(5.0, type=valid_float_f01),
+ "timeout": Option(5.0, type=valid_float_f01),
+ "switch_delay": Option(1.0, type=valid_float_f01),
+ }
+
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], Any]:
+ return valid_stripped_string_not_empty
+
+ def register_input(self, pin: str, debounce: float) -> None:
+ _ = debounce
+ self.__state[pin] = None
+
+ def register_output(self, pin: str, initial: (bool | None)) -> None:
+ if pin.isnumeric():
+ self.__initial[pin] = initial
+ self.__state[pin] = None
+
+ def prepare(self) -> None:
+ for (pin, state) in self.__initial.items():
+ if pin.isnumeric():
+ if state is not None:
+ self.__state[pin] = state
+
+ async def run(self) -> None:
+ prev_state: (dict | None) = None
+ while True:
+ try:
+ if self.__csrf_token is None:
+ get_logger().info(
+ "Logging into UNIFI"
+ )
+ await self.login()
+ async with self.__http_session.get(
+ url=f"{self.__api_url}/stat/device/{self.__mac}",
+ headers=self.__get_headers()
+ ) as response:
+ htclient.raise_not_200(response)
+ self.__handle_headers(response.headers)
+
+ status = (await response.json())["data"][0]
+ if self.__id is None or self.__id != status["_id"]:
+ self.__id = status["_id"]
+
+ port_overrides = dict(map(
+ lambda port: (str(port["port_idx"]), port),
+ status["port_overrides"]))
+
+ for port_key, port in port_overrides.items():
+ self.__port_overrides[port_key] = port
+
+ port_table = dict(
+ map(lambda port: (str(port["port_idx"]), port),
+ list(filter(lambda p: p["port_poe"] is True,
+ status["port_table"]))))
+
+ for port_key, port in port_table.items():
+ self.__port_table[port_key] = port
+
+ for pin in self.__state:
+ if pin is not None:
+ port = self.__port_table[pin]
+ self.__state[pin] = port["poe_mode"] == "auto"
+
+ except aiohttp.ClientResponseError as err:
+ await self.__handle_client_response_error(err)
+ except Exception as err:
+ get_logger().error("Failed UNIFI bulk GET request: %s",
+ tools.efmt(err))
+ self.__state = dict.fromkeys(self.__state, None)
+
+ if self.__state != prev_state:
+ self._notifier.notify()
+ prev_state = self.__state
+
+ await self.__update_notifier.wait(self.__state_poll)
+
+ async def cleanup(self) -> None:
+ if self.__http_session:
+ await self.__http_session.close()
+ self.__http_session = aiohttp.ClientSession()
+ self.__csrf_token = None
+
+ async def read(self, pin: str) -> bool:
+ if pin.isnumeric() is False:
+ return False
+ try:
+ if self.__csrf_token is None:
+ get_logger().info(
+ "Logging into UNIFI"
+ )
+ await self.login()
+ return await self.__inner_read(pin)
+ except aiohttp.ClientResponseError as err:
+ await self.__handle_client_response_error(err)
+ except Exception:
+ raise GpioDriverOfflineError(self)
+ return False
+
+ async def write(self, pin: str, state: bool) -> None:
+ try:
+ if self.__csrf_token is None:
+ get_logger().info(
+ "Logging into UNIFI"
+ )
+ await self.login()
+ if pin.endswith(":cycle"):
+ await self.__cycle_device(pin, state)
+ else:
+ await self.__inner_write(pin, state)
+ except aiohttp.ClientResponseError as err:
+ await self.__handle_client_response_error(err)
+ except Exception as err:
+ get_logger().error(
+ "Failed UNIFI PUT request | pin : %s | Error: %s",
+ pin,
+ tools.efmt(err)
+ )
+ await asyncio.sleep(self.__switch_delay) # Slowdown
+ self.__update_notifier.notify()
+
+ # =====
+
+ def __get_client_session(self, timeout: float, verify: bool) -> aiohttp.ClientSession:
+ kwargs: dict = {
+ "headers": {
+ "Accept": "application/json",
+ "User-Agent": htclient.make_user_agent("KVMD"),
+ },
+ "timeout": aiohttp.ClientTimeout(total=timeout),
+ "connector": aiohttp.TCPConnector(verify_ssl=verify),
+ "cookie_jar": aiohttp.CookieJar(),
+ }
+
+ return aiohttp.ClientSession(**kwargs)
+
+ async def __inner_read(self, pin: str) -> bool:
+ return self.__state[pin] is not None and bool(self.__state[pin])
+
+ async def __inner_write(self, pin: str, state: bool) -> None:
+ await self.__set_device(pin, state)
+
+ async def __cycle_device(self, pin: str, state: bool) -> None:
+ if state is False:
+ return
+ get_logger().info("Cycling device %s: port: %s", self.__mac, pin)
+ async with self.__http_session.post(
+ url=f"{self.__api_url}/cmd/devmgr",
+ json={
+ "cmd": "power-cycle",
+ "mac": self.__mac,
+ "port_idx": pin.split(":")[0],
+ },
+ headers=self.__get_headers()
+ ) as response:
+ self.__handle_headers(response.headers)
+ htclient.raise_not_200(response)
+
+ async def __set_device(self, pin: str, state: bool) -> None:
+ get_logger().info("Setting device %s: port: %s, state: %s", self.__mac, pin, state)
+
+ port_overrides = map(
+ functools.partial(set_poe_mode, pin, state),
+ self.__port_overrides.values()
+ )
+
+ data = {
+ "port_overrides": list(port_overrides)
+ }
+
+ get_logger().info("Posting content %s:\n%s\n", pin, json.dumps(data).encode("utf-8"))
+
+ async with self.__http_session.put(
+ url=f"{self.__api_url}/rest/device/{self.__id}",
+ json=data,
+ headers={
+ "Accept": "application/json",
+ "Content-Type": "application/json;charset=UTF-8",
+ "X-CSRF-TOKEN": self.__csrf_token,
+ "User-Agent": htclient.make_user_agent("KVMD"),
+ }
+ ) as response:
+ htclient.raise_not_200(response)
+
+ for header in response.headers:
+ if header.upper() == "X-CSRF-TOKEN":
+ self.__csrf_token = response.headers[header]
+
+ if response.cookies:
+ self.__http_session.cookie_jar.update_cookies(
+ response.cookies
+ )
+
+ result = await asyncio.sleep(5, result=state)
+
+ self.__port_table[pin]["poe_enable"] = result
+ self.__port_table[pin]["poe_mode"] = "auto" if result else "off"
+ self.__state[pin] = result
+
+ def __get_headers(self,
+ extra: (dict | None) = None
+ ) -> CIMultiDictProxy[str]:
+ kwargs: dict = {
+ "Accept": "application/json",
+ "X-CSRF-TOKEN": self.__csrf_token,
+ }
+ headers: CIMultiDict[str] = CIMultiDict(**kwargs)
+ if extra is not None:
+ headers.update(**extra)
+ return CIMultiDictProxy(headers)
+
+ def __handle_headers(self, response_headers: CIMultiDictProxy[str]) -> None:
+ for header in response_headers:
+ if header.upper() == "X-CSRF-TOKEN":
+ self.__csrf_token = response_headers.get(header)
+
+ async def __handle_client_response_error(self,
+ err: aiohttp.ClientResponseError
+ ) -> None:
+ if err.status == 401:
+ get_logger().info(
+ "UNIFI API request unauthorized. Attempting to refresh session"
+ )
+ try:
+ await self.login()
+ except Exception as login_err:
+ get_logger().error("Failed UNIFI login request: %s",
+ tools.efmt(login_err))
+
+ async def login(self) -> None:
+ try:
+ response = await self.__http_session.post(
+ url=f"{self.__url}/api/auth/login",
+ json={
+ "username": self.__user,
+ "password": self.__passwd
+ },
+ headers={
+ "Accept": "application/json",
+ "User-Agent": htclient.make_user_agent("KVMD"),
+ "Content-Type": "application/json;charset=UTF-8",
+ }
+ )
+
+ htclient.raise_not_200(response)
+
+ for header in response.headers:
+ if header.upper() == "X-CSRF-TOKEN":
+ self.__csrf_token = response.headers[header]
+
+ if response.cookies:
+ self.__http_session.cookie_jar.update_cookies(
+ response.cookies
+ )
+ except Exception as err:
+ get_logger().error(
+ "Failed UNIFI login request: %s",
+ tools.efmt(err)
+ )
+
+ def __str__(self) -> str:
+ return f"UNIFI({self._instance_name})"
+
+ __repr__ = __str__