summaryrefslogtreecommitdiff
path: root/kvmd/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins')
-rw-r--r--kvmd/plugins/ugpio/pwm.py56
1 files changed, 30 insertions, 26 deletions
diff --git a/kvmd/plugins/ugpio/pwm.py b/kvmd/plugins/ugpio/pwm.py
index 90e7a50d..92989049 100644
--- a/kvmd/plugins/ugpio/pwm.py
+++ b/kvmd/plugins/ugpio/pwm.py
@@ -20,12 +20,13 @@
# #
# ========================================================================== #
-from periphery import PWM
from typing import Dict
from typing import Optional
from typing import Set
+from periphery import PWM
+
from ...logging import get_logger
from ... import tools
@@ -42,36 +43,34 @@ from . import BaseUserGpioDriver
# =====
class Plugin(BaseUserGpioDriver):
-
def __init__( # pylint: disable=super-init-not-called
self,
instance_name: str,
notifier: aiotools.AioNotifier,
- pwm_chip: int,
- pwm_period: int,
+ chip: int,
+ period: int,
duty_cycle_push: int,
duty_cycle_release: int,
) -> None:
super().__init__(instance_name, notifier)
- self.__pwm_chip = pwm_chip
- self.__pwm_period = pwm_period
+ self.__chip = chip
+ self.__period = period
self.__duty_cycle_push = duty_cycle_push
self.__duty_cycle_release = duty_cycle_release
self.__channels: Dict[int, Optional[bool]] = {}
-
- self.__channel_pwm: Dict[int, PWM] = {}
+ self.__pwms: Dict[int, PWM] = {}
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "pwm_chip": Option(0, type=valid_int_f0),
- "pwm_period": Option(20000000, type=valid_int_f0),
- "duty_cycle_push": Option(1500000, type=valid_int_f0),
- "duty_cycle_release": Option(1000000, type=valid_int_f0),
+ "chip": Option(0, type=valid_int_f0),
+ "period": Option(20000000, type=valid_int_f0),
+ "duty_cycle_push": Option(1500000, type=valid_int_f0),
+ "duty_cycle_release": Option(1000000, type=valid_int_f0),
}
@classmethod
@@ -86,40 +85,45 @@ class Plugin(BaseUserGpioDriver):
def prepare(self) -> None:
logger = get_logger(0)
-
for (pin, initial) in self.__channels.items():
try:
- logger.info("Probing pwm chip %d channel %d ...", self.__pwm_chip, pin)
- pwm = PWM(self.__pwm_chip, pin)
- self.__channel_pwm[pin] = pwm
- pwm.period_ns = self.__pwm_period
- pwm.duty_cycle_ns = self.__duty_cycle_push if initial else self.__duty_cycle_release
+ logger.info("Probing pwm chip %d channel %d ...", self.__chip, pin)
+ pwm = PWM(self.__chip, pin)
+ self.__pwms[pin] = pwm
+ pwm.period_ns = self.__period
+ pwm.duty_cycle_ns = self.__get_duty_cycle(bool(initial))
pwm.enable()
-
except Exception as err:
- logger.error("Can't get pwm chip %d channel %d: %s",
- self.__pwm_chip, pin, tools.efmt(err))
+ logger.error("Can't get PWM chip %d channel %d: %s",
+ self.__chip, pin, tools.efmt(err))
async def run(self) -> None:
await aiotools.wait_infinite()
async def cleanup(self) -> None:
- for (pin, _) in self.__channels.items():
- self.__channel_pwm[pin].disable()
- self.__channel_pwm[pin].close()
+ for (pin, pwm) in self.__pwms.items():
+ try:
+ pwm.disable()
+ pwm.close()
+ except Exception as err:
+ get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s",
+ self.__chip, pin, tools.efmt(err))
async def read(self, pin: int) -> bool:
try:
- return self.__channel_pwm[pin].duty_cycle_ns == self.__duty_cycle_push
+ return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push)
except Exception:
raise GpioDriverOfflineError(self)
async def write(self, pin: int, state: bool) -> None:
try:
- self.__channel_pwm[pin].duty_cycle_ns = self.__duty_cycle_push if state else self.__duty_cycle_release
+ self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state)
except Exception:
raise GpioDriverOfflineError(self)
+ def __get_duty_cycle(self, state: bool) -> int:
+ return (self.__duty_cycle_push if state else self.__duty_cycle_release)
+
def __str__(self) -> str:
return f"PWM({self._instance_name})"