diff options
author | Devaev Maxim <[email protected]> | 2019-09-11 20:54:27 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-09-11 20:54:27 +0300 |
commit | ca2eabc01f7294bd6fe35c734580ada91a2230a1 (patch) | |
tree | b65aa5e2e61d0708a0eebafff298bfae70cc2d26 /kvmd | |
parent | 2535892723e275a3e31aa318edc788869c643fd0 (diff) |
atx plugin
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/__init__.py | 16 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 6 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 7 | ||||
-rw-r--r-- | kvmd/plugins/atx/__init__.py | 82 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py (renamed from kvmd/apps/kvmd/atx.py) | 98 | ||||
-rw-r--r-- | kvmd/plugins/atx/none.py | 78 |
7 files changed, 210 insertions, 81 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 7b8f2ef2..fd4ddec7 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -38,6 +38,7 @@ import pygments.formatters from ..plugins import UnknownPluginError from ..plugins.auth import get_auth_service_class from ..plugins.hid import get_hid_class +from ..plugins.atx import get_atx_class from ..yamlconf import ConfigError from ..yamlconf import make_config @@ -117,6 +118,7 @@ def _init_config(config_path: str, sections: List[str], override_options: List[s scheme["kvmd"]["auth"]["external"].update(get_auth_service_class(config.kvmd.auth.external.type).get_plugin_options()) scheme["kvmd"]["hid"].update(get_hid_class(config.kvmd.hid.type).get_plugin_options()) + scheme["kvmd"]["atx"].update(get_atx_class(config.kvmd.atx.type).get_plugin_options()) config = make_config(raw_config, scheme) @@ -183,19 +185,7 @@ def _get_config_scheme(sections: List[str]) -> Dict: }, "atx": { - "enabled": Option(True, type=valid_bool), - - "power_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "hdd_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "power_led_inverted": Option(True, type=valid_bool), - "hdd_led_inverted": Option(True, type=valid_bool), - - "power_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "reset_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "click_delay": Option(0.1, type=valid_float_f01), - "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), + "type": Option("gpio"), }, "msd": { diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index f25e679a..680f080b 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -51,8 +51,10 @@ def main(argv: Optional[List[str]]=None) -> None: *([ ("hid_reset_pin", config.hid.reset_pin, True), ] if config.hid.type == "tty" else []), - ("atx_power_switch_pin", config.atx.power_switch_pin, config.atx.enabled), - ("atx_reset_switch_pin", config.atx.reset_switch_pin, config.atx.enabled), + *([ + ("atx_power_switch_pin", config.atx.power_switch_pin, True), + ("atx_reset_switch_pin", config.atx.reset_switch_pin, True), + ] if config.atx.type == "gpio" else []), ("msd_target_pin", config.msd.target_pin, config.msd.enabled), ("msd_reset_pin", config.msd.reset_pin, config.msd.enabled), ("streamer_cap_pin", config.streamer.cap_pin, True), diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index f9e46b84..bc26e442 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -28,13 +28,13 @@ from ...logging import get_logger from ... import gpio from ...plugins.hid import get_hid_class +from ...plugins.atx import get_atx_class from .. import init from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .atx import Atx from .msd import MassStorageDevice from .streamer import Streamer from .server import Server @@ -63,7 +63,7 @@ def main(argv: Optional[List[str]]=None) -> None: log_reader=LogReader(), hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])), - atx=Atx(**config.atx._unpack()), + atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), msd=MassStorageDevice(**config.msd._unpack()), streamer=Streamer(**config.streamer._unpack()), ).run(**config.server._unpack()) diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 1c8461ae..21e30f0f 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -46,6 +46,9 @@ from ...aioregion import RegionIsBusyError from ...plugins.hid import BaseHid +from ...plugins.atx import AtxOperationError +from ...plugins.atx import BaseAtx + from ...validators import ValidatorError from ...validators.basic import valid_bool @@ -72,8 +75,6 @@ from ... import __version__ from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .atx import AtxOperationError -from .atx import Atx from .msd import MsdOperationError from .msd import MassStorageDevice from .streamer import Streamer @@ -232,7 +233,7 @@ class Server: # pylint: disable=too-many-instance-attributes log_reader: LogReader, hid: BaseHid, - atx: Atx, + atx: BaseAtx, msd: MassStorageDevice, streamer: Streamer, ) -> None: diff --git a/kvmd/plugins/atx/__init__.py b/kvmd/plugins/atx/__init__.py new file mode 100644 index 00000000..77091c07 --- /dev/null +++ b/kvmd/plugins/atx/__init__.py @@ -0,0 +1,82 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Dict +from typing import AsyncGenerator +from typing import Type + +from ... import aioregion + +from .. import BasePlugin +from .. import get_plugin_class + + +# ===== +class AtxError(Exception): + pass + + +class AtxOperationError(AtxError): + pass + + +class AtxIsBusyError(AtxOperationError, aioregion.RegionIsBusyError): + pass + + +# ===== +class BaseAtx(BasePlugin): + def get_state(self) -> Dict: + raise NotImplementedError + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + yield {} + raise NotImplementedError + + async def power_on(self) -> bool: + raise NotImplementedError + + async def power_off(self) -> bool: + raise NotImplementedError + + async def power_off_hard(self) -> bool: + raise NotImplementedError + + async def power_reset_hard(self) -> bool: + raise NotImplementedError + + async def click_power(self) -> None: + raise NotImplementedError + + async def click_power_long(self) -> None: + raise NotImplementedError + + async def click_reset(self) -> None: + raise NotImplementedError + + async def cleanup(self) -> None: + pass + + +# ===== +def get_atx_class(name: str) -> Type[BaseAtx]: + return get_plugin_class("atx", (name or "none")) # type: ignore diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/plugins/atx/gpio.py index 9310c071..a7089a55 100644 --- a/kvmd/apps/kvmd/atx.py +++ b/kvmd/plugins/atx/gpio.py @@ -24,9 +24,7 @@ import asyncio import operator from typing import Dict -from typing import Callable from typing import AsyncGenerator -from typing import Any from ...logging import get_logger @@ -34,37 +32,22 @@ from ... import aiotools from ... import aioregion from ... import gpio +from ...yamlconf import Option -# ===== -class AtxError(Exception): - pass - - -class AtxOperationError(AtxError): - pass - +from ...validators.basic import valid_bool +from ...validators.basic import valid_float_f01 -class AtxDisabledError(AtxOperationError): - def __init__(self) -> None: - super().__init__("ATX is disabled") +from ...validators.hw import valid_gpio_pin -class AtxIsBusyError(AtxOperationError, aioregion.RegionIsBusyError): - pass +from . import AtxIsBusyError +from . import BaseAtx -def _atx_working(method: Callable) -> Callable: - async def wrapper(self: "Atx", *args: Any, **kwargs: Any) -> Any: - if not self._enabled: # pylint: disable=protected-access - raise AtxDisabledError() - return (await method(self, *args, **kwargs)) - return wrapper - - -class Atx: # pylint: disable=too-many-instance-attributes - def __init__( # pylint: disable=too-many-arguments +# ===== +class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, - enabled: bool, power_led_pin: int, hdd_led_pin: int, @@ -79,18 +62,10 @@ class Atx: # pylint: disable=too-many-instance-attributes state_poll: float, ) -> None: - self._enabled = enabled - - if self._enabled: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin) - else: - self.__power_led_pin = -1 - self.__hdd_led_pin = -1 - self.__power_switch_pin = -1 - self.__reset_switch_pin = -1 + self.__power_led_pin = gpio.set_input(power_led_pin) + self.__hdd_led_pin = gpio.set_input(hdd_led_pin) + self.__power_switch_pin = gpio.set_output(power_switch_pin) + self.__reset_switch_pin = gpio.set_output(reset_switch_pin) self.__power_led_inverted = power_led_inverted self.__hdd_led_inverted = hdd_led_inverted @@ -102,32 +77,40 @@ class Atx: # pylint: disable=too-many-instance-attributes self.__region = aioregion.AioExclusiveRegion(AtxIsBusyError) + @classmethod + def get_plugin_options(cls) -> Dict[str, Option]: + return { + "power_led_pin": Option(-1, type=valid_gpio_pin), + "hdd_led_pin": Option(-1, type=valid_gpio_pin), + "power_led_inverted": Option(True, type=valid_bool), + "hdd_led_inverted": Option(True, type=valid_bool), + + "power_switch_pin": Option(-1, type=valid_gpio_pin), + "reset_switch_pin": Option(-1, type=valid_gpio_pin), + "click_delay": Option(0.1, type=valid_float_f01), + "long_click_delay": Option(5.5, type=valid_float_f01), + + "state_poll": Option(0.1, type=valid_float_f01), + } + def get_state(self) -> Dict: - if self._enabled: - power_led_state = operator.xor(self.__power_led_inverted, gpio.read(self.__power_led_pin)) - hdd_led_state = operator.xor(self.__hdd_led_inverted, gpio.read(self.__hdd_led_pin)) - else: - power_led_state = hdd_led_state = False return { - "enabled": self._enabled, + "enabled": True, "busy": self.__region.is_busy(), "leds": { - "power": power_led_state, - "hdd": hdd_led_state, + "power": operator.xor(self.__power_led_inverted, gpio.read(self.__power_led_pin)), + "hdd": operator.xor(self.__hdd_led_inverted, gpio.read(self.__hdd_led_pin)), }, } async def poll_state(self) -> AsyncGenerator[Dict, None]: prev_state: Dict = {} while True: - if self._enabled: - state = self.get_state() - if state != prev_state: - yield state - prev_state = state - await asyncio.sleep(self.__state_poll) - else: - await asyncio.sleep(60) + state = self.get_state() + if state != prev_state: + yield state + prev_state = state + await asyncio.sleep(self.__state_poll) async def cleanup(self) -> None: for (name, pin) in [ @@ -141,28 +124,24 @@ class Atx: # pylint: disable=too-many-instance-attributes # ===== - @_atx_working async def power_on(self) -> bool: if not self.get_state()["leds"]["power"]: await self.click_power() return True return False - @_atx_working async def power_off(self) -> bool: if self.get_state()["leds"]["power"]: await self.click_power() return True return False - @_atx_working async def power_off_hard(self) -> bool: if self.get_state()["leds"]["power"]: await self.click_power_long() return True return False - @_atx_working async def power_reset_hard(self) -> bool: if self.get_state()["leds"]["power"]: await self.click_reset() @@ -171,15 +150,12 @@ class Atx: # pylint: disable=too-many-instance-attributes # ===== - @_atx_working async def click_power(self) -> None: await self.__click("power", self.__power_switch_pin, self.__click_delay) - @_atx_working async def click_power_long(self) -> None: await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay) - @_atx_working async def click_reset(self) -> None: await self.__click("reset", self.__reset_switch_pin, self.__click_delay) diff --git a/kvmd/plugins/atx/none.py b/kvmd/plugins/atx/none.py new file mode 100644 index 00000000..09fad2f8 --- /dev/null +++ b/kvmd/plugins/atx/none.py @@ -0,0 +1,78 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio + +from typing import Dict +from typing import AsyncGenerator + +from . import AtxOperationError +from . import BaseAtx + + +# ===== +class AtxDisabledError(AtxOperationError): + def __init__(self) -> None: + super().__init__("ATX is disabled") + + +# ===== +class Plugin(BaseAtx): + def get_state(self) -> Dict: + return { + "enabled": False, + "busy": False, + "leds": { + "power": False, + "hdd": False, + }, + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + while True: + yield self.get_state() + await asyncio.sleep(60) + + # ===== + + async def power_on(self) -> bool: + raise AtxDisabledError() + + async def power_off(self) -> bool: + raise AtxDisabledError() + + async def power_off_hard(self) -> bool: + raise AtxDisabledError() + + async def power_reset_hard(self) -> bool: + raise AtxDisabledError() + + # ===== + + async def click_power(self) -> None: + raise AtxDisabledError() + + async def click_power_long(self) -> None: + raise AtxDisabledError() + + async def click_reset(self) -> None: + raise AtxDisabledError() |