summaryrefslogtreecommitdiff
path: root/kvmd/clients
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
committerMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
commitee3e224e396494cd0d69bb6167087a071a20349c (patch)
tree5becd28570e58a03c6e1e231d0db24c264a73f88 /kvmd/clients
parent4b75221e9470b4a009955d7677f16adf8e23e302 (diff)
new typing style
Diffstat (limited to 'kvmd/clients')
-rw-r--r--kvmd/clients/kvmd.py27
-rw-r--r--kvmd/clients/streamer.py11
2 files changed, 16 insertions, 22 deletions
diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py
index f8b0d811..f98c1064 100644
--- a/kvmd/clients/kvmd.py
+++ b/kvmd/clients/kvmd.py
@@ -24,13 +24,8 @@ import asyncio
import contextlib
import types
-from typing import Tuple
-from typing import Dict
-from typing import Set
from typing import Callable
-from typing import Type
from typing import AsyncGenerator
-from typing import Optional
import aiohttp
@@ -65,13 +60,13 @@ class _AuthApiPart(_BaseApiPart):
class _StreamerApiPart(_BaseApiPart):
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
session = self._ensure_http_session()
async with session.get(self._make_url("streamer")) as response:
htclient.raise_not_200(response)
return (await response.json())["result"]
- async def set_params(self, quality: Optional[int]=None, desired_fps: Optional[int]=None) -> None:
+ async def set_params(self, quality: (int | None)=None, desired_fps: (int | None)=None) -> None:
session = self._ensure_http_session()
async with session.post(
url=self._make_url("streamer/set_params"),
@@ -88,7 +83,7 @@ class _StreamerApiPart(_BaseApiPart):
class _HidApiPart(_BaseApiPart):
- async def get_keymaps(self) -> Tuple[str, Set[str]]:
+ async def get_keymaps(self) -> tuple[str, set[str]]:
session = self._ensure_http_session()
async with session.get(self._make_url("hid/keymaps")) as response:
htclient.raise_not_200(response)
@@ -106,7 +101,7 @@ class _HidApiPart(_BaseApiPart):
class _AtxApiPart(_BaseApiPart):
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
session = self._ensure_http_session()
async with session.get(self._make_url("atx")) as response:
htclient.raise_not_200(response)
@@ -132,14 +127,14 @@ class KvmdClientWs:
def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
self.__ws = ws
- self.__writer_queue: "asyncio.Queue[Tuple[str, Dict]]" = asyncio.Queue()
+ self.__writer_queue: "asyncio.Queue[tuple[str, dict]]" = asyncio.Queue()
self.__communicated = False
- async def communicate(self) -> AsyncGenerator[Tuple[str, Dict], None]: # pylint: disable=too-many-branches
+ async def communicate(self) -> AsyncGenerator[tuple[str, dict], None]: # pylint: disable=too-many-branches
assert not self.__communicated
self.__communicated = True
- receive_task: Optional[asyncio.Task] = None
- writer_task: Optional[asyncio.Task] = None
+ receive_task: (asyncio.Task | None) = None
+ writer_task: (asyncio.Task | None) = None
try:
while True:
if receive_task is None:
@@ -199,7 +194,7 @@ class KvmdClientSession:
self.__make_http_session = make_http_session
self.__make_url = make_url
- self.__http_session: Optional[aiohttp.ClientSession] = None
+ self.__http_session: (aiohttp.ClientSession | None) = None
args = (self.__ensure_http_session, make_url)
@@ -229,7 +224,7 @@ class KvmdClientSession:
async def __aexit__(
self,
- _exc_type: Type[BaseException],
+ _exc_type: type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
@@ -256,7 +251,7 @@ class KvmdClient:
)
def __make_http_session(self, user: str, passwd: str) -> aiohttp.ClientSession:
- kwargs: Dict = {
+ kwargs: dict = {
"headers": {
"X-KVMD-User": user,
"X-KVMD-Passwd": passwd,
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index dd4167f9..f26fed1e 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -22,7 +22,6 @@
import types
-from typing import Dict
from typing import AsyncGenerator
import aiohttp
@@ -61,7 +60,7 @@ class BaseStreamerClient:
def get_format(self) -> int:
raise NotImplementedError()
- async def read_stream(self) -> AsyncGenerator[Dict, None]:
+ async def read_stream(self) -> AsyncGenerator[dict, None]:
if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError()
yield
@@ -84,7 +83,7 @@ class HttpStreamerClient(BaseStreamerClient):
def get_format(self) -> int:
return StreamFormats.JPEG
- async def read_stream(self) -> AsyncGenerator[Dict, None]:
+ async def read_stream(self) -> AsyncGenerator[dict, None]:
try:
async with self.__make_http_session() as session:
async with session.get(
@@ -118,7 +117,7 @@ class HttpStreamerClient(BaseStreamerClient):
raise StreamerTempError("Reached EOF")
def __make_http_session(self) -> aiohttp.ClientSession:
- kwargs: Dict = {
+ kwargs: dict = {
"headers": {"User-Agent": self.__user_agent},
"connector": aiohttp.UnixConnector(path=self.__unix_path),
"timeout": aiohttp.ClientTimeout(
@@ -162,7 +161,7 @@ class MemsinkStreamerClient(BaseStreamerClient):
self.__name = name
self.__fmt = fmt
- self.__kwargs: Dict = {
+ self.__kwargs: dict = {
"obj": obj,
"lock_timeout": lock_timeout,
"wait_timeout": wait_timeout,
@@ -172,7 +171,7 @@ class MemsinkStreamerClient(BaseStreamerClient):
def get_format(self) -> int:
return self.__fmt
- async def read_stream(self) -> AsyncGenerator[Dict, None]:
+ async def read_stream(self) -> AsyncGenerator[dict, None]:
if ustreamer is None:
raise StreamerPermError("Missing ustreamer library")
try: