diff options
Diffstat (limited to 'kvmd/plugins/atx')
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 81 |
1 files changed, 44 insertions, 37 deletions
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index e3225cf0..bd49513c 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -24,11 +24,14 @@ import asyncio from typing import Dict from typing import AsyncGenerator +from typing import Optional + +import gpiod from ...logging import get_logger from ... import aiotools -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -37,7 +40,6 @@ from ...validators.basic import valid_float_f01 from ...validators.hw import valid_gpio_pin - from . import AtxIsBusyError from . import BaseAtx @@ -46,7 +48,6 @@ from . import BaseAtx class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, - power_led_pin: int, hdd_led_pin: int, power_led_inverted: bool, @@ -56,17 +57,12 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes reset_switch_pin: int, click_delay: float, long_click_delay: float, - - state_poll: float, ) -> None: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin, False) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) - - self.__power_led_inverted = power_led_inverted - self.__hdd_led_inverted = hdd_led_inverted + self.__power_led_pin = power_led_pin + self.__hdd_led_pin = hdd_led_pin + self.__power_switch_pin = power_switch_pin + self.__reset_switch_pin = reset_switch_pin self.__click_delay = click_delay self.__long_click_delay = long_click_delay @@ -74,9 +70,15 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__notifier = aiotools.AioNotifier() self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) - self.__reader = gpio.BatchReader( - pins=set([self.__power_led_pin, self.__hdd_led_pin]), - interval=state_poll, + self.__chip: Optional[gpiod.Chip] = None + self.__power_switch_line: Optional[gpiod.Line] = None + self.__reset_switch_line: Optional[gpiod.Line] = None + + self.__reader = aiogp.AioPinsReader( + path="/dev/gpiochip0", + consumer="kvmd/atx-gpio/leds", + pins=[power_led_pin, hdd_led_pin], + inverted=[power_led_inverted, hdd_led_inverted], notifier=self.__notifier, ) @@ -92,17 +94,28 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes "reset_switch_pin": Option(-1, type=valid_gpio_pin), "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), } + def sysprep(self) -> None: + assert self.__chip is None + assert self.__power_switch_line is None + assert self.__reset_switch_line is None + + self.__chip = gpiod.Chip("/dev/gpiochip0") + + self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin) + self.__power_switch_line.request("kvmd/atx-gpio/power_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin) + self.__reset_switch_line.request("kvmd/atx-gpio/reset_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + async def get_state(self) -> Dict: return { "enabled": True, "busy": self.__region.is_busy(), "leds": { - "power": (self.__reader.get(self.__power_led_pin) ^ self.__power_led_inverted), - "hdd": (self.__reader.get(self.__hdd_led_pin) ^ self.__hdd_led_inverted), + "power": self.__reader.get(self.__power_led_pin), + "hdd": self.__reader.get(self.__hdd_led_pin), }, } @@ -119,14 +132,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes await self.__reader.poll() async def cleanup(self) -> None: - for (name, pin) in [ - ("power", self.__power_switch_pin), - ("reset", self.__reset_switch_pin), - ]: - try: - gpio.write(pin, False) - except Exception: - get_logger(0).exception("Can't cleanup %s pin %d", name, pin) + if self.__chip: + self.__chip.close() # ===== @@ -149,13 +156,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes # ===== async def click_power(self, wait: bool) -> None: - await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) + await self.__click("power", self.__power_switch_line, self.__click_delay, wait) async def click_power_long(self, wait: bool) -> None: - await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) + await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait) async def click_reset(self, wait: bool) -> None: - await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) + await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait) # ===== @@ -163,22 +170,22 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes return (await self.get_state())["leds"]["power"] @aiotools.atomic - async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: + async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None: if wait: async with self.__region: - await self.__inner_click(name, pin, delay) + await self.__inner_click(name, line, delay) else: await aiotools.run_region_task( - "Can't perform ATX click or operation was not completed", - self.__region, self.__inner_click, name, pin, delay, + f"Can't perform ATX {name} click or operation was not completed", + self.__region, self.__inner_click, name, line, delay, ) @aiotools.atomic - async def __inner_click(self, name: str, pin: int, delay: float) -> None: + async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: try: - gpio.write(pin, True) + line.set_value(1) await asyncio.sleep(delay) finally: - gpio.write(pin, False) + line.set_value(0) await asyncio.sleep(1) get_logger(0).info("Clicked ATX button %r", name) |