diff options
Diffstat (limited to 'kvmd/apps/vnc/kvmd.py')
-rw-r--r-- | kvmd/apps/vnc/kvmd.py | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/kvmd/apps/vnc/kvmd.py b/kvmd/apps/vnc/kvmd.py new file mode 100644 index 00000000..d921ad43 --- /dev/null +++ b/kvmd/apps/vnc/kvmd.py @@ -0,0 +1,110 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import contextlib + +from typing import Dict + +import aiohttp + +from ...logging import get_logger + +from ... import __version__ + + +# ===== +class KvmdClient: + def __init__( + self, + host: str, + port: int, + unix_path: str, + timeout: float, + ) -> None: + + assert port or unix_path + self.__host = host + self.__port = port + self.__unix_path = unix_path + self.__timeout = timeout + + # ===== + + async def authorize(self, user: str, passwd: str) -> bool: + try: + async with self.__make_session(user, passwd) as session: + async with session.get( + url=f"http://{self.__host}:{self.__port}/auth/check", + timeout=self.__timeout, + ) as response: + response.raise_for_status() + if response.status == 200: + return True + raise RuntimeError(f"Invalid OK response: {response.status} {await response.text()}") + except aiohttp.ClientResponseError as err: + if err.status in [401, 403]: + return False + get_logger(0).exception("Can't check user access") + except Exception: + get_logger(0).exception("Can't check user access") + return False + + @contextlib.asynccontextmanager + async def ws(self, user: str, passwd: str) -> aiohttp.ClientWebSocketResponse: # pylint: disable=invalid-name + async with self.__make_session(user, passwd) as session: + async with session.ws_connect( + url=f"http://{self.__host}:{self.__port}/ws", + timeout=self.__timeout, + ) as ws: + yield ws + + async def set_streamer_params(self, user: str, passwd: str, quality: int=-1, desired_fps: int=-1) -> None: + params = { + key: value + for (key, value) in [ + ("quality", quality), + ("desired_fps", desired_fps), + ] + if value >= 0 + } + if params: + async with self.__make_session(user, passwd) as session: + async with session.post( + url=f"http://{self.__host}:{self.__port}/streamer/set_params", + timeout=self.__timeout, + params=params, + ) as response: + response.raise_for_status() + + # ===== + + def __make_session(self, user: str, passwd: str) -> aiohttp.ClientSession: + kwargs: Dict = { + "headers": { + "X-KVMD-User": user, + "X-KVMD-Passwd": passwd, + "User-Agent": f"KVMD-VNC/{__version__}", + }, + } + if self.__unix_path: + kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path) + return aiohttp.ClientSession(**kwargs) |