diff options
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/__init__.py | 16 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 6 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/atx.py | 205 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 7 |
5 files changed, 13 insertions, 225 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 7b8f2ef2..fd4ddec7 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -38,6 +38,7 @@ import pygments.formatters from ..plugins import UnknownPluginError from ..plugins.auth import get_auth_service_class from ..plugins.hid import get_hid_class +from ..plugins.atx import get_atx_class from ..yamlconf import ConfigError from ..yamlconf import make_config @@ -117,6 +118,7 @@ def _init_config(config_path: str, sections: List[str], override_options: List[s scheme["kvmd"]["auth"]["external"].update(get_auth_service_class(config.kvmd.auth.external.type).get_plugin_options()) scheme["kvmd"]["hid"].update(get_hid_class(config.kvmd.hid.type).get_plugin_options()) + scheme["kvmd"]["atx"].update(get_atx_class(config.kvmd.atx.type).get_plugin_options()) config = make_config(raw_config, scheme) @@ -183,19 +185,7 @@ def _get_config_scheme(sections: List[str]) -> Dict: }, "atx": { - "enabled": Option(True, type=valid_bool), - - "power_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "hdd_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "power_led_inverted": Option(True, type=valid_bool), - "hdd_led_inverted": Option(True, type=valid_bool), - - "power_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "reset_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "click_delay": Option(0.1, type=valid_float_f01), - "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), + "type": Option("gpio"), }, "msd": { diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index f25e679a..680f080b 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -51,8 +51,10 @@ def main(argv: Optional[List[str]]=None) -> None: *([ ("hid_reset_pin", config.hid.reset_pin, True), ] if config.hid.type == "tty" else []), - ("atx_power_switch_pin", config.atx.power_switch_pin, config.atx.enabled), - ("atx_reset_switch_pin", config.atx.reset_switch_pin, config.atx.enabled), + *([ + ("atx_power_switch_pin", config.atx.power_switch_pin, True), + ("atx_reset_switch_pin", config.atx.reset_switch_pin, True), + ] if config.atx.type == "gpio" else []), ("msd_target_pin", config.msd.target_pin, config.msd.enabled), ("msd_reset_pin", config.msd.reset_pin, config.msd.enabled), ("streamer_cap_pin", config.streamer.cap_pin, True), diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index f9e46b84..bc26e442 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -28,13 +28,13 @@ from ...logging import get_logger from ... import gpio from ...plugins.hid import get_hid_class +from ...plugins.atx import get_atx_class from .. import init from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .atx import Atx from .msd import MassStorageDevice from .streamer import Streamer from .server import Server @@ -63,7 +63,7 @@ def main(argv: Optional[List[str]]=None) -> None: log_reader=LogReader(), hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])), - atx=Atx(**config.atx._unpack()), + atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), msd=MassStorageDevice(**config.msd._unpack()), streamer=Streamer(**config.streamer._unpack()), ).run(**config.server._unpack()) diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py deleted file mode 100644 index 9310c071..00000000 --- a/kvmd/apps/kvmd/atx.py +++ /dev/null @@ -1,205 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import asyncio -import operator - -from typing import Dict -from typing import Callable -from typing import AsyncGenerator -from typing import Any - -from ...logging import get_logger - -from ... import aiotools -from ... import aioregion -from ... import gpio - - -# ===== -class AtxError(Exception): - pass - - -class AtxOperationError(AtxError): - pass - - -class AtxDisabledError(AtxOperationError): - def __init__(self) -> None: - super().__init__("ATX is disabled") - - -class AtxIsBusyError(AtxOperationError, aioregion.RegionIsBusyError): - pass - - -def _atx_working(method: Callable) -> Callable: - async def wrapper(self: "Atx", *args: Any, **kwargs: Any) -> Any: - if not self._enabled: # pylint: disable=protected-access - raise AtxDisabledError() - return (await method(self, *args, **kwargs)) - return wrapper - - -class Atx: # pylint: disable=too-many-instance-attributes - def __init__( # pylint: disable=too-many-arguments - self, - enabled: bool, - - power_led_pin: int, - hdd_led_pin: int, - power_led_inverted: bool, - hdd_led_inverted: bool, - - power_switch_pin: int, - reset_switch_pin: int, - click_delay: float, - long_click_delay: float, - - state_poll: float, - ) -> None: - - self._enabled = enabled - - if self._enabled: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin) - else: - self.__power_led_pin = -1 - self.__hdd_led_pin = -1 - self.__power_switch_pin = -1 - self.__reset_switch_pin = -1 - - self.__power_led_inverted = power_led_inverted - self.__hdd_led_inverted = hdd_led_inverted - - self.__click_delay = click_delay - self.__long_click_delay = long_click_delay - - self.__state_poll = state_poll - - self.__region = aioregion.AioExclusiveRegion(AtxIsBusyError) - - def get_state(self) -> Dict: - if self._enabled: - power_led_state = operator.xor(self.__power_led_inverted, gpio.read(self.__power_led_pin)) - hdd_led_state = operator.xor(self.__hdd_led_inverted, gpio.read(self.__hdd_led_pin)) - else: - power_led_state = hdd_led_state = False - return { - "enabled": self._enabled, - "busy": self.__region.is_busy(), - "leds": { - "power": power_led_state, - "hdd": hdd_led_state, - }, - } - - async def poll_state(self) -> AsyncGenerator[Dict, None]: - prev_state: Dict = {} - while True: - if self._enabled: - state = self.get_state() - if state != prev_state: - yield state - prev_state = state - await asyncio.sleep(self.__state_poll) - else: - await asyncio.sleep(60) - - async def cleanup(self) -> None: - for (name, pin) in [ - ("power", self.__power_switch_pin), - ("reset", self.__reset_switch_pin), - ]: - try: - gpio.write(pin, False) - except Exception: - get_logger(0).exception("Can't cleanup %s pin %d", name, pin) - - # ===== - - @_atx_working - async def power_on(self) -> bool: - if not self.get_state()["leds"]["power"]: - await self.click_power() - return True - return False - - @_atx_working - async def power_off(self) -> bool: - if self.get_state()["leds"]["power"]: - await self.click_power() - return True - return False - - @_atx_working - async def power_off_hard(self) -> bool: - if self.get_state()["leds"]["power"]: - await self.click_power_long() - return True - return False - - @_atx_working - async def power_reset_hard(self) -> bool: - if self.get_state()["leds"]["power"]: - await self.click_reset() - return True - return False - - # ===== - - @_atx_working - async def click_power(self) -> None: - await self.__click("power", self.__power_switch_pin, self.__click_delay) - - @_atx_working - async def click_power_long(self) -> None: - await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay) - - @_atx_working - async def click_reset(self) -> None: - await self.__click("reset", self.__reset_switch_pin, self.__click_delay) - - # ===== - - @aiotools.atomic - async def __click(self, name: str, pin: int, delay: float) -> None: - with aiotools.unregion_only_on_exception(self.__region): - await self.__inner_click(name, pin, delay) - - @aiotools.tasked - @aiotools.muted("Can't perform ATX click or operation was not completed") - async def __inner_click(self, name: str, pin: int, delay: float) -> None: - try: - gpio.write(pin, True) - await asyncio.sleep(delay) - finally: - try: - gpio.write(pin, False) - await asyncio.sleep(1) - finally: - self.__region.exit() - get_logger(0).info("Clicked ATX button %r", name) diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 1c8461ae..21e30f0f 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -46,6 +46,9 @@ from ...aioregion import RegionIsBusyError from ...plugins.hid import BaseHid +from ...plugins.atx import AtxOperationError +from ...plugins.atx import BaseAtx + from ...validators import ValidatorError from ...validators.basic import valid_bool @@ -72,8 +75,6 @@ from ... import __version__ from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .atx import AtxOperationError -from .atx import Atx from .msd import MsdOperationError from .msd import MassStorageDevice from .streamer import Streamer @@ -232,7 +233,7 @@ class Server: # pylint: disable=too-many-instance-attributes log_reader: LogReader, hid: BaseHid, - atx: Atx, + atx: BaseAtx, msd: MassStorageDevice, streamer: Streamer, ) -> None: |