summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/unifi.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/unifi.py')
-rw-r--r--kvmd/plugins/ugpio/unifi.py344
1 files changed, 127 insertions, 217 deletions
diff --git a/kvmd/plugins/ugpio/unifi.py b/kvmd/plugins/ugpio/unifi.py
index c4d572e9..67a83e45 100644
--- a/kvmd/plugins/ugpio/unifi.py
+++ b/kvmd/plugins/ugpio/unifi.py
@@ -21,14 +21,10 @@
import asyncio
-import functools
-import json
from typing import Callable
from typing import Any
-from multidict import CIMultiDict, CIMultiDictProxy
-
import aiohttp
from ...logging import get_logger
@@ -48,17 +44,8 @@ from . import GpioDriverOfflineError
# =====
-
-def set_poe_mode(pin: str, state: bool, port_override: dict[str, Any]) -> dict[str, Any]:
- port_idx = str(port_override["port_idx"])
- if port_idx == pin:
- port_override["poe_mode"] = "auto" if state else "off"
- return port_override
-
-
-# pylint: disable=too-many-instance-attributes disable=too-many-arguments
-class Plugin(BaseUserGpioDriver):
- def __init__(
+class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes
+ def __init__( # pylint: disable=too-many-arguments
self,
instance_name: str,
notifier: aiotools.AioNotifier,
@@ -68,23 +55,21 @@ class Plugin(BaseUserGpioDriver):
user: str,
passwd: str,
mac: str,
-
- timeout: float,
switch_delay: float,
+ timeout: float,
state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
self.__url = url
+ self.__verify = verify
self.__user = user
self.__passwd = passwd
self.__mac = mac
- self.__state_poll = state_poll
-
self.__switch_delay = switch_delay
-
- self.__initial: dict[str, (bool | None)] = {}
+ self.__timeout = timeout
+ self.__state_poll = state_poll
self.__state: dict[str, (bool | None)] = {}
@@ -93,7 +78,7 @@ class Plugin(BaseUserGpioDriver):
self.__update_notifier = aiotools.AioNotifier()
- self.__http_session: aiohttp.ClientSession = self.__get_client_session(timeout, verify)
+ self.__http_session: (aiohttp.ClientSession | None) = None
self.__csrf_token: (str | None) = None
self.__id: (str | None) = None
@@ -102,14 +87,14 @@ class Plugin(BaseUserGpioDriver):
@classmethod
def get_plugin_options(cls) -> dict[str, Option]:
return {
- "url": Option("", type=valid_stripped_string_not_empty),
- "verify": Option(True, type=valid_bool),
- "user": Option(""),
- "passwd": Option(""),
- "mac": Option("", type=valid_stripped_string_not_empty),
- "state_poll": Option(5.0, type=valid_float_f01),
- "timeout": Option(5.0, type=valid_float_f01),
+ "url": Option("", type=valid_stripped_string_not_empty),
+ "verify": Option(True, type=valid_bool),
+ "user": Option(""),
+ "passwd": Option(""),
+ "mac": Option("", type=valid_stripped_string_not_empty),
"switch_delay": Option(1.0, type=valid_float_f01),
+ "state_poll": Option(5.0, type=valid_float_f01),
+ "timeout": Option(5.0, type=valid_float_f01),
}
@classmethod
@@ -121,253 +106,178 @@ class Plugin(BaseUserGpioDriver):
self.__state[pin] = None
def register_output(self, pin: str, initial: (bool | None)) -> None:
+ _ = initial
if pin.isnumeric():
- self.__initial[pin] = initial
self.__state[pin] = None
- def prepare(self) -> None:
- for (pin, state) in self.__initial.items():
- if pin.isnumeric():
- if state is not None:
- self.__state[pin] = state
-
async def run(self) -> None:
prev_state: (dict | None) = None
while True:
- try:
- if self.__csrf_token is None:
- get_logger().info(
- "Logging into UNIFI"
- )
- await self.login()
- async with self.__http_session.get(
- url=f"{self.__api_url}/stat/device/{self.__mac}",
- headers=self.__get_headers()
- ) as response:
- htclient.raise_not_200(response)
- self.__handle_headers(response.headers)
+ if (await self.__ensure_login()):
+ try:
+ async with self.__ensure_http_session().get(
+ url=f"{self.__api_url}/stat/device/{self.__mac}",
+ headers=self.__make_headers(token=True, post_json=False),
+ ) as response:
- status = (await response.json())["data"][0]
- if self.__id is None or self.__id != status["_id"]:
- self.__id = status["_id"]
+ self.__handle_response(response)
- port_overrides = dict(map(
- lambda port: (str(port["port_idx"]), port),
- status["port_overrides"]))
+ status = (await response.json())["data"][0]
+ if self.__id is None or self.__id != status["_id"]:
+ self.__id = status["_id"]
- for port_key, port in port_overrides.items():
- self.__port_overrides[port_key] = port
+ port_overrides = dict(map(
+ lambda port: (str(port["port_idx"]), port),
+ status["port_overrides"]))
- port_table = dict(
- map(lambda port: (str(port["port_idx"]), port),
- list(filter(lambda p: p["port_poe"] is True,
- status["port_table"]))))
+ for port_key, port in port_overrides.items():
+ self.__port_overrides[port_key] = port
- for port_key, port in port_table.items():
- self.__port_table[port_key] = port
+ port_table = dict(
+ map(lambda port: (str(port["port_idx"]), port),
+ list(filter(lambda p: p["port_poe"] is True,
+ status["port_table"]))))
- for pin in self.__state:
- if pin is not None:
- port = self.__port_table[pin]
- self.__state[pin] = port["poe_mode"] == "auto"
+ for port_key, port in port_table.items():
+ self.__port_table[port_key] = port
- except aiohttp.ClientResponseError as err:
- await self.__handle_client_response_error(err)
- except Exception as err:
- get_logger().error("Failed UNIFI bulk GET request: %s",
- tools.efmt(err))
- self.__state = dict.fromkeys(self.__state, None)
+ for pin in self.__state:
+ if pin is not None:
+ port = self.__port_table[pin]
+ self.__state[pin] = port["poe_mode"] == "auto"
+
+ except Exception as err:
+ get_logger().error("Failed UNIFI bulk GET request: %s", tools.efmt(err))
+ self.__state = dict.fromkeys(self.__state, None)
- if self.__state != prev_state:
- self._notifier.notify()
- prev_state = self.__state
+ if self.__state != prev_state:
+ self._notifier.notify()
+ prev_state = self.__state
await self.__update_notifier.wait(self.__state_poll)
async def cleanup(self) -> None:
if self.__http_session:
await self.__http_session.close()
- self.__http_session = aiohttp.ClientSession()
+ self.__http_session = None
self.__csrf_token = None
async def read(self, pin: str) -> bool:
- if pin.isnumeric() is False:
+ if not pin.isnumeric():
return False
- try:
- if self.__csrf_token is None:
- get_logger().info(
- "Logging into UNIFI"
- )
- await self.login()
- return await self.__inner_read(pin)
- except aiohttp.ClientResponseError as err:
- await self.__handle_client_response_error(err)
- except Exception:
+ if not (await self.__ensure_login()):
raise GpioDriverOfflineError(self)
- return False
+ return self.__state[pin] is not None and bool(self.__state[pin])
async def write(self, pin: str, state: bool) -> None:
+ if not (await self.__ensure_login()):
+ raise GpioDriverOfflineError(self)
try:
- if self.__csrf_token is None:
- get_logger().info(
- "Logging into UNIFI"
- )
- await self.login()
if pin.endswith(":cycle"):
await self.__cycle_device(pin, state)
else:
- await self.__inner_write(pin, state)
- except aiohttp.ClientResponseError as err:
- await self.__handle_client_response_error(err)
+ await self.__set_device(pin, state)
except Exception as err:
- get_logger().error(
- "Failed UNIFI PUT request | pin : %s | Error: %s",
- pin,
- tools.efmt(err)
- )
+ get_logger().error("Failed UNIFI PUT request | pin: %s | Error: %s", pin, tools.efmt(err))
await asyncio.sleep(self.__switch_delay) # Slowdown
self.__update_notifier.notify()
# =====
- def __get_client_session(self, timeout: float, verify: bool) -> aiohttp.ClientSession:
- kwargs: dict = {
- "headers": {
- "Accept": "application/json",
- "User-Agent": htclient.make_user_agent("KVMD"),
- },
- "timeout": aiohttp.ClientTimeout(total=timeout),
- "connector": aiohttp.TCPConnector(verify_ssl=verify),
- "cookie_jar": aiohttp.CookieJar(),
- }
-
- return aiohttp.ClientSession(**kwargs)
-
- async def __inner_read(self, pin: str) -> bool:
- return self.__state[pin] is not None and bool(self.__state[pin])
-
- async def __inner_write(self, pin: str, state: bool) -> None:
- await self.__set_device(pin, state)
-
async def __cycle_device(self, pin: str, state: bool) -> None:
- if state is False:
+ if not state:
return
get_logger().info("Cycling device %s: port: %s", self.__mac, pin)
- async with self.__http_session.post(
+ async with self.__ensure_http_session().post(
url=f"{self.__api_url}/cmd/devmgr",
json={
- "cmd": "power-cycle",
- "mac": self.__mac,
+ "cmd": "power-cycle",
+ "mac": self.__mac,
"port_idx": pin.split(":")[0],
},
- headers=self.__get_headers()
+ headers=self.__make_headers(token=True, post_json=True),
) as response:
- self.__handle_headers(response.headers)
- htclient.raise_not_200(response)
+ self.__handle_response(response)
async def __set_device(self, pin: str, state: bool) -> None:
get_logger().info("Setting device %s: port: %s, state: %s", self.__mac, pin, state)
- port_overrides = map(
- functools.partial(set_poe_mode, pin, state),
- self.__port_overrides.values()
- )
-
- data = {
- "port_overrides": list(port_overrides)
- }
+ port_overrides: list[dict[str, Any]] = []
+ for po in self.__port_overrides.values():
+ if str(po["port_idx"]) == pin:
+ # Also modifies value in self.__port_overrides
+ po["poe_mode"] = ("auto" if state else "off")
+ port_overrides.append(po)
- get_logger().info("Posting content %s:\n%s\n", pin, json.dumps(data).encode("utf-8"))
-
- async with self.__http_session.put(
+ async with self.__ensure_http_session().put(
url=f"{self.__api_url}/rest/device/{self.__id}",
- json=data,
- headers={
- "Accept": "application/json",
- "Content-Type": "application/json;charset=UTF-8",
- "X-CSRF-TOKEN": self.__csrf_token,
- "User-Agent": htclient.make_user_agent("KVMD"),
- }
+ json={"port_overrides": port_overrides},
+ headers=self.__make_headers(token=True, post_json=True),
) as response:
- htclient.raise_not_200(response)
-
- for header in response.headers:
- if header.upper() == "X-CSRF-TOKEN":
- self.__csrf_token = response.headers[header]
- if response.cookies:
- self.__http_session.cookie_jar.update_cookies(
- response.cookies
- )
+ self.__handle_response(response)
- result = await asyncio.sleep(5, result=state)
+ await asyncio.sleep(5)
- self.__port_table[pin]["poe_enable"] = result
- self.__port_table[pin]["poe_mode"] = "auto" if result else "off"
- self.__state[pin] = result
+ self.__port_table[pin]["poe_enable"] = state
+ self.__port_table[pin]["poe_mode"] = ("auto" if state else "off")
+ self.__state[pin] = state
- def __get_headers(self,
- extra: (dict | None) = None
- ) -> CIMultiDictProxy[str]:
- kwargs: dict = {
- "Accept": "application/json",
- "X-CSRF-TOKEN": self.__csrf_token,
- }
- headers: CIMultiDict[str] = CIMultiDict(**kwargs)
- if extra is not None:
- headers.update(**extra)
- return CIMultiDictProxy(headers)
-
- def __handle_headers(self, response_headers: CIMultiDictProxy[str]) -> None:
- for header in response_headers:
- if header.upper() == "X-CSRF-TOKEN":
- self.__csrf_token = response_headers.get(header)
-
- async def __handle_client_response_error(self,
- err: aiohttp.ClientResponseError
- ) -> None:
- if err.status == 401:
- get_logger().info(
- "UNIFI API request unauthorized. Attempting to refresh session"
- )
+ async def __ensure_login(self) -> bool:
+ if self.__csrf_token is None:
+ get_logger().info("Logging into Unifi")
try:
- await self.login()
- except Exception as login_err:
- get_logger().error("Failed UNIFI login request: %s",
- tools.efmt(login_err))
-
- async def login(self) -> None:
- try:
- response = await self.__http_session.post(
- url=f"{self.__url}/api/auth/login",
- json={
- "username": self.__user,
- "password": self.__passwd
- },
- headers={
- "Accept": "application/json",
+ async with self.__ensure_http_session().post(
+ url=f"{self.__url}/api/auth/login",
+ json={
+ "username": self.__user,
+ "password": self.__passwd,
+ },
+ headers=self.__make_headers(token=False, post_json=True),
+ ) as response:
+ self.__handle_response(response)
+ except Exception as err:
+ get_logger().error("Failed Unifi login request: %s", tools.efmt(err))
+ return False
+ return True
+
+ def __make_headers(self, token: bool, post_json: bool) -> dict[str, str]:
+ headers: dict[str, str] = {}
+ if token:
+ assert self.__csrf_token is not None
+ headers["X-CSRF-TOKEN"] = self.__csrf_token
+ if post_json:
+ headers["Content-Type"] = "application/json;charset=UTF-8"
+ return headers
+
+ def __handle_response(self, response: aiohttp.ClientResponse) -> None:
+ assert self.__http_session is not None
+ if response.status == 401:
+ get_logger().info("Unifi API request unauthorized, we will retry a login")
+ self.__csrf_token = None
+ self.__http_session.cookie_jar.clear()
+ htclient.raise_not_200(response)
+ if "X-CSRF-TOKEN" in response.headers:
+ self.__csrf_token = response.headers["X-CSRF-TOKEN"]
+ if response.cookies:
+ self.__http_session.cookie_jar.update_cookies(response.cookies)
+
+ def __ensure_http_session(self) -> aiohttp.ClientSession:
+ if not self.__http_session:
+ kwargs: dict = {
+ "headers": {
+ "Accept": "application/json",
"User-Agent": htclient.make_user_agent("KVMD"),
- "Content-Type": "application/json;charset=UTF-8",
- }
- )
-
- htclient.raise_not_200(response)
-
- for header in response.headers:
- if header.upper() == "X-CSRF-TOKEN":
- self.__csrf_token = response.headers[header]
-
- if response.cookies:
- self.__http_session.cookie_jar.update_cookies(
- response.cookies
- )
- except Exception as err:
- get_logger().error(
- "Failed UNIFI login request: %s",
- tools.efmt(err)
- )
+ },
+ "cookie_jar": aiohttp.CookieJar(),
+ "timeout": aiohttp.ClientTimeout(total=self.__timeout),
+ }
+ if not self.__verify:
+ kwargs["connector"] = aiohttp.TCPConnector(ssl=False)
+ self.__http_session = aiohttp.ClientSession(**kwargs)
+ return self.__http_session
def __str__(self) -> str:
- return f"UNIFI({self._instance_name})"
+ return f"Unifi({self._instance_name})"
__repr__ = __str__