summaryrefslogtreecommitdiff
path: root/kvmd/clients/streamer.py
blob: 493ac080bfbfd4128d4758b95ad94ba9e1b89ce5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# ========================================================================== #
#                                                                            #
#    KVMD - The main Pi-KVM daemon.                                          #
#                                                                            #
#    Copyright (C) 2020  Maxim Devaev <[email protected]>                    #
#                                                                            #
#    This program is free software: you can redistribute it and/or modify    #
#    it under the terms of the GNU General Public License as published by    #
#    the Free Software Foundation, either version 3 of the License, or       #
#    (at your option) any later version.                                     #
#                                                                            #
#    This program is distributed in the hope that it will be useful,         #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of          #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
#    GNU General Public License for more details.                            #
#                                                                            #
#    You should have received a copy of the GNU General Public License       #
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.  #
#                                                                            #
# ========================================================================== #


from typing import Tuple
from typing import Dict
from typing import AsyncGenerator

import aiohttp

from .. import htclient


# =====
class StreamerError(Exception):
    pass


# =====
class StreamerClient:
    def __init__(
        self,
        host: str,
        port: int,
        unix_path: str,
        timeout: float,
        user_agent: str,
    ) -> None:

        assert port or unix_path
        self.__host = host
        self.__port = port
        self.__unix_path = unix_path
        self.__timeout = timeout
        self.__user_agent = user_agent

    async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
        try:
            async with self.__make_http_session(infinite=True) as session:
                async with session.get(
                    url=self.__make_url("stream"),
                    params={"extra_headers": "1"},
                ) as response:
                    htclient.raise_not_200(response)
                    reader = aiohttp.MultipartReader.from_response(response)

                    while True:
                        frame = await reader.next()  # pylint: disable=not-callable
                        if not isinstance(frame, aiohttp.BodyPartReader):
                            raise RuntimeError("Expected body part")

                        if hasattr(frame, "_content"):
                            if frame._content.is_eof():  # pylint: disable=protected-access
                                break
                        data = bytes(await frame.read())
                        if not data:
                            break

                        yield (
                            (frame.headers["X-UStreamer-Online"] == "true"),
                            int(frame.headers["X-UStreamer-Width"]),
                            int(frame.headers["X-UStreamer-Height"]),
                            data,
                        )
        except Exception as err:  # Тут бывают и ассерты, и KeyError, и прочая херня из-за корявых исключений в MultipartReader
            raise StreamerError(f"{type(err).__name__}: {err}")
        raise StreamerError("Reached EOF")

#    async def get_snapshot(self) -> Tuple[bool, bytes]:
#        async with self.__make_http_session(infinite=False) as session:
#            async with session.get(self.__make_url("snapshot")) as response:
#                htclient.raise_not_200(response)
#                return (
#                    (response.headers["X-UStreamer-Online"] == "true"),
#                    bytes(await response.read()),
#                )

    def __make_http_session(self, infinite: bool) -> aiohttp.ClientSession:
        kwargs: Dict = {"headers": {"User-Agent": self.__user_agent}}
        if infinite:
            kwargs["timeout"] = aiohttp.ClientTimeout(
                connect=self.__timeout,
                sock_read=self.__timeout,
            )
        else:
            kwargs["timeout"] = aiohttp.ClientTimeout(total=self.__timeout)
        if self.__unix_path:
            kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path)
        return aiohttp.ClientSession(**kwargs)

    def __make_url(self, handle: str) -> str:
        assert not handle.startswith("/"), handle
        return f"http://{self.__host}:{self.__port}/{handle}"