diff options
author | Devaev Maxim <[email protected]> | 2020-10-28 21:19:19 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-11-11 22:24:25 +0300 |
commit | dc0340583ecd2f76939edfc85cf69417743c088d (patch) | |
tree | 74dc5f112363b8fb9400f1e79f0fd36b56ec9284 /kvmd/plugins | |
parent | c27b8909dc425b4c06a12264de77877419a13497 (diff) |
splitting serial
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/hid/serial/__init__.py (renamed from kvmd/plugins/hid/serial.py) | 77 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial/gpio.py | 71 |
2 files changed, 89 insertions, 59 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial/__init__.py index a45cd89a..2b1c0af6 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial/__init__.py @@ -33,33 +33,31 @@ from typing import List from typing import Dict from typing import Iterable from typing import AsyncGenerator -from typing import Optional -import gpiod import serial -from ...logging import get_logger +from ....logging import get_logger -from ...keyboard.mappings import KEYMAP +from ....keyboard.mappings import KEYMAP -from ... import env -from ... import tools -from ... import aiotools -from ... import aiomulti -from ... import aioproc -from ... import aiogp +from .... import tools +from .... import aiotools +from .... import aiomulti +from .... import aioproc -from ...yamlconf import Option +from ....yamlconf import Option -from ...validators.basic import valid_bool -from ...validators.basic import valid_int_f0 -from ...validators.basic import valid_int_f1 -from ...validators.basic import valid_float_f01 -from ...validators.os import valid_abs_path -from ...validators.hw import valid_tty_speed -from ...validators.hw import valid_gpio_pin_optional +from ....validators.basic import valid_bool +from ....validators.basic import valid_int_f0 +from ....validators.basic import valid_int_f1 +from ....validators.basic import valid_float_f01 +from ....validators.os import valid_abs_path +from ....validators.hw import valid_tty_speed +from ....validators.hw import valid_gpio_pin_optional -from . import BaseHid +from .. import BaseHid + +from .gpio import Gpio # ===== @@ -156,45 +154,6 @@ class _MouseWheelEvent(_BaseEvent): return struct.pack(">Bxbxx", 0x14, self.delta_y) -class _Gpio: - def __init__(self, reset_pin: int, reset_delay: float) -> None: - self.__reset_pin = reset_pin - self.__reset_delay = reset_delay - - self.__chip: Optional[gpiod.Chip] = None - self.__reset_line: Optional[gpiod.Line] = None - self.__reset_wip = False - - def open(self) -> None: - if self.__reset_pin >= 0: - assert self.__chip is None - assert self.__reset_line is None - self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH) - self.__reset_line = self.__chip.get_line(self.__reset_pin) - self.__reset_line.request("kvmd::hid-serial::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) - - def close(self) -> None: - if self.__chip: - try: - self.__chip.close() - except Exception: - pass - - @aiotools.atomic - async def reset(self) -> None: - if self.__reset_pin >= 0: - assert self.__reset_line - if not self.__reset_wip: - self.__reset_wip = True - try: - await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) - finally: - self.__reset_wip = False - get_logger(0).info("Reset HID performed") - else: - get_logger(0).info("Another reset HID in progress") - - # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called @@ -223,7 +182,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__errors_threshold = errors_threshold self.__noop = noop - self.__gpio = _Gpio(reset_pin, reset_delay) + self.__gpio = Gpio(reset_pin, reset_delay) self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue() diff --git a/kvmd/plugins/hid/serial/gpio.py b/kvmd/plugins/hid/serial/gpio.py new file mode 100644 index 00000000..a3e4018b --- /dev/null +++ b/kvmd/plugins/hid/serial/gpio.py @@ -0,0 +1,71 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Optional + +import gpiod + +from ....logging import get_logger + +from .... import env +from .... import aiotools +from .... import aiogp + + +# ===== +class Gpio: + def __init__(self, reset_pin: int, reset_delay: float) -> None: + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__reset_line: Optional[gpiod.Line] = None + self.__reset_wip = False + + def open(self) -> None: + if self.__reset_pin >= 0: + assert self.__chip is None + assert self.__reset_line is None + self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH) + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd::hid-serial::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + @aiotools.atomic + async def reset(self) -> None: + if self.__reset_pin >= 0: + assert self.__reset_line + if not self.__reset_wip: + self.__reset_wip = True + try: + await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) + finally: + self.__reset_wip = False + get_logger(0).info("Reset HID performed") + else: + get_logger(0).info("Another reset HID in progress") |