diff options
Diffstat (limited to 'kvmd/plugins/atx/gpio.py')
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 56 |
1 files changed, 31 insertions, 25 deletions
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index 58ed8a81..476dacb0 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -20,6 +20,8 @@ # ========================================================================== # +import asyncio + from typing import AsyncGenerator import gpiod @@ -74,13 +76,11 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__notifier = aiotools.AioNotifier() self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) - self.__chip: (gpiod.Chip | None) = None - self.__power_switch_line: (gpiod.Line | None) = None - self.__reset_switch_line: (gpiod.Line | None) = None + self.__line_request: (gpiod.LineRequest | None) = None self.__reader = aiogp.AioReader( path=self.__device_path, - consumer="kvmd::atx::leds", + consumer="kvmd::atx", pins={ power_led_pin: aiogp.AioReaderPinParams(power_led_inverted, power_led_debounce), hdd_led_pin: aiogp.AioReaderPinParams(hdd_led_inverted, hdd_led_debounce), @@ -108,17 +108,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes } def sysprep(self) -> None: - assert self.__chip is None - assert self.__power_switch_line is None - assert self.__reset_switch_line is None - - self.__chip = gpiod.Chip(self.__device_path) - - self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin) - self.__power_switch_line.request("kvmd::atx::power_switch", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) - - self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin) - self.__reset_switch_line.request("kvmd::atx::reset_switch", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) + assert self.__line_request is None + self.__line_request = gpiod.request_lines( + self.__device_path, + consumer="kvmd::atx", + config={ + (self.__power_switch_pin, self.__reset_switch_pin): gpiod.LineSettings( + direction=gpiod.line.Direction.OUTPUT, + output_value=gpiod.line.Value(False), + ), + }, + ) async def get_state(self) -> dict: return { @@ -143,9 +143,9 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes await self.__reader.poll() async def cleanup(self) -> None: - if self.__chip: + if self.__line_request: try: - self.__chip.close() + self.__line_request.release() except Exception: pass @@ -170,13 +170,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes # ===== async def click_power(self, wait: bool) -> None: - await self.__click("power", self.__power_switch_line, self.__click_delay, wait) + await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) async def click_power_long(self, wait: bool) -> None: - await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait) + await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) async def click_reset(self, wait: bool) -> None: - await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait) + await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) # ===== @@ -184,17 +184,23 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes return (await self.get_state())["leds"]["power"] @aiotools.atomic_fg - async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None: + async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: if wait: async with self.__region: - await self.__inner_click(name, line, delay) + await self.__inner_click(name, pin, delay) else: await aiotools.run_region_task( f"Can't perform ATX {name} click or operation was not completed", - self.__region, self.__inner_click, name, line, delay, + self.__region, self.__inner_click, name, pin, delay, ) @aiotools.atomic_fg - async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: - await aiogp.pulse(line, delay, 1) + async def __inner_click(self, name: str, pin: int, delay: float) -> None: + assert self.__line_request + try: + self.__line_request.set_value(pin, gpiod.line.Value(True)) + await asyncio.sleep(delay) + finally: + self.__line_request.set_value(pin, gpiod.line.Value(False)) + await asyncio.sleep(1) get_logger(0).info("Clicked ATX button %r", name) |