summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid/serial.py
blob: f7f3fc11e9930216a9c7e1de4ddb1b72399c1025 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# ========================================================================== #
#                                                                            #
#    KVMD - The main PiKVM daemon.                                           #
#                                                                            #
#    Copyright (C) 2018-2021  Maxim Devaev <[email protected]>               #
#                                                                            #
#    This program is free software: you can redistribute it and/or modify    #
#    it under the terms of the GNU General Public License as published by    #
#    the Free Software Foundation, either version 3 of the License, or       #
#    (at your option) any later version.                                     #
#                                                                            #
#    This program is distributed in the hope that it will be useful,         #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of          #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
#    GNU General Public License for more details.                            #
#                                                                            #
#    You should have received a copy of the GNU General Public License       #
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.  #
#                                                                            #
# ========================================================================== #


import os
import contextlib

from typing import Dict
from typing import Generator
from typing import Any

import serial

from ...yamlconf import Option

from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed

from ._mcu import BasePhyConnection
from ._mcu import BasePhy
from ._mcu import BaseMcuHid


# =====
class _SerialPhyConnection(BasePhyConnection):
    def __init__(self, tty: serial.Serial) -> None:
        self.__tty = tty

    def send(self, request: bytes) -> bytes:
        assert len(request) == 8
        assert request[0] == 0x33
        if self.__tty.in_waiting:
            self.__tty.read_all()
        assert self.__tty.write(request) == 8
        data = self.__tty.read(4)
        if len(data) == 4:
            if data[0] == 0x34:  # New response protocol
                data += self.__tty.read(4)
                if len(data) != 8:
                    return b""
            return data
        return b""


class _SerialPhy(BasePhy):
    def __init__(
        self,
        device_path: str,
        speed: int,
        read_timeout: float,
    ) -> None:

        self.__device_path = device_path
        self.__speed = speed
        self.__read_timeout = read_timeout

    def has_device(self) -> bool:
        return os.path.exists(self.__device_path)

    @contextlib.contextmanager
    def connected(self) -> Generator[_SerialPhyConnection, None, None]:  # type: ignore
        with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty:
            yield _SerialPhyConnection(tty)


# =====
class Plugin(BaseMcuHid):
    def __init__(self, **kwargs: Any) -> None:
        phy_kwargs: Dict = {
            (option.unpack_as or key): kwargs.pop(option.unpack_as or key)
            for (key, option) in self.__get_phy_options().items()
        }
        super().__init__(phy=_SerialPhy(**phy_kwargs), **kwargs)

    @classmethod
    def get_plugin_options(cls) -> Dict:
        return {
            **cls.__get_phy_options(),
            **BaseMcuHid.get_plugin_options(),
        }

    @classmethod
    def __get_phy_options(cls) -> Dict:
        return {
            "device":       Option("",     type=valid_abs_path, unpack_as="device_path"),
            "speed":        Option(115200, type=valid_tty_speed),
            "read_timeout": Option(2.0,    type=valid_float_f01),
        }