diff options
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/ugpio/pwm.py | 56 |
1 files changed, 30 insertions, 26 deletions
diff --git a/kvmd/plugins/ugpio/pwm.py b/kvmd/plugins/ugpio/pwm.py index 90e7a50d..92989049 100644 --- a/kvmd/plugins/ugpio/pwm.py +++ b/kvmd/plugins/ugpio/pwm.py @@ -20,12 +20,13 @@ # # # ========================================================================== # -from periphery import PWM from typing import Dict from typing import Optional from typing import Set +from periphery import PWM + from ...logging import get_logger from ... import tools @@ -42,36 +43,34 @@ from . import BaseUserGpioDriver # ===== class Plugin(BaseUserGpioDriver): - def __init__( # pylint: disable=super-init-not-called self, instance_name: str, notifier: aiotools.AioNotifier, - pwm_chip: int, - pwm_period: int, + chip: int, + period: int, duty_cycle_push: int, duty_cycle_release: int, ) -> None: super().__init__(instance_name, notifier) - self.__pwm_chip = pwm_chip - self.__pwm_period = pwm_period + self.__chip = chip + self.__period = period self.__duty_cycle_push = duty_cycle_push self.__duty_cycle_release = duty_cycle_release self.__channels: Dict[int, Optional[bool]] = {} - - self.__channel_pwm: Dict[int, PWM] = {} + self.__pwms: Dict[int, PWM] = {} @classmethod def get_plugin_options(cls) -> Dict: return { - "pwm_chip": Option(0, type=valid_int_f0), - "pwm_period": Option(20000000, type=valid_int_f0), - "duty_cycle_push": Option(1500000, type=valid_int_f0), - "duty_cycle_release": Option(1000000, type=valid_int_f0), + "chip": Option(0, type=valid_int_f0), + "period": Option(20000000, type=valid_int_f0), + "duty_cycle_push": Option(1500000, type=valid_int_f0), + "duty_cycle_release": Option(1000000, type=valid_int_f0), } @classmethod @@ -86,40 +85,45 @@ class Plugin(BaseUserGpioDriver): def prepare(self) -> None: logger = get_logger(0) - for (pin, initial) in self.__channels.items(): try: - logger.info("Probing pwm chip %d channel %d ...", self.__pwm_chip, pin) - pwm = PWM(self.__pwm_chip, pin) - self.__channel_pwm[pin] = pwm - pwm.period_ns = self.__pwm_period - pwm.duty_cycle_ns = self.__duty_cycle_push if initial else self.__duty_cycle_release + logger.info("Probing pwm chip %d channel %d ...", self.__chip, pin) + pwm = PWM(self.__chip, pin) + self.__pwms[pin] = pwm + pwm.period_ns = self.__period + pwm.duty_cycle_ns = self.__get_duty_cycle(bool(initial)) pwm.enable() - except Exception as err: - logger.error("Can't get pwm chip %d channel %d: %s", - self.__pwm_chip, pin, tools.efmt(err)) + logger.error("Can't get PWM chip %d channel %d: %s", + self.__chip, pin, tools.efmt(err)) async def run(self) -> None: await aiotools.wait_infinite() async def cleanup(self) -> None: - for (pin, _) in self.__channels.items(): - self.__channel_pwm[pin].disable() - self.__channel_pwm[pin].close() + for (pin, pwm) in self.__pwms.items(): + try: + pwm.disable() + pwm.close() + except Exception as err: + get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s", + self.__chip, pin, tools.efmt(err)) async def read(self, pin: int) -> bool: try: - return self.__channel_pwm[pin].duty_cycle_ns == self.__duty_cycle_push + return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push) except Exception: raise GpioDriverOfflineError(self) async def write(self, pin: int, state: bool) -> None: try: - self.__channel_pwm[pin].duty_cycle_ns = self.__duty_cycle_push if state else self.__duty_cycle_release + self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state) except Exception: raise GpioDriverOfflineError(self) + def __get_duty_cycle(self, state: bool) -> int: + return (self.__duty_cycle_push if state else self.__duty_cycle_release) + def __str__(self) -> str: return f"PWM({self._instance_name})" |