diff options
author | Devaev Maxim <[email protected]> | 2018-06-28 06:23:19 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2018-06-28 06:23:19 +0300 |
commit | e8595665c093767e7afae1158cbdda74e288e351 (patch) | |
tree | d1546b4a44b76e534c8c4be1e4e985d964ef324f | |
parent | cac56cd92f91c631ccf1f58d459e9c203e611de2 (diff) |
gpio wrapper
-rw-r--r-- | kvmd/kvmd/__init__.py | 16 | ||||
-rw-r--r-- | kvmd/kvmd/atx.py | 25 | ||||
-rw-r--r-- | kvmd/kvmd/extra/cleanup/__init__.py | 21 | ||||
-rw-r--r-- | kvmd/kvmd/gpio.py | 38 | ||||
-rw-r--r-- | kvmd/kvmd/streamer.py | 15 |
5 files changed, 67 insertions, 48 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py index d303747c..4ef2c8a9 100644 --- a/kvmd/kvmd/__init__.py +++ b/kvmd/kvmd/__init__.py @@ -8,8 +8,6 @@ from typing import Set from typing import Callable from typing import Optional -from RPi import GPIO - import aiohttp from .application import init @@ -17,6 +15,8 @@ from .application import init from .atx import Atx from .streamer import Streamer +from . import gpio + # ===== _logger = logging.getLogger(__name__) @@ -42,8 +42,6 @@ class _Application: self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() self.__sockets_lock = asyncio.Lock() - GPIO.setmode(GPIO.BCM) - self.__atx = Atx( power_led=self.__config["atx"]["leds"]["pinout"]["power"], hdd_led=self.__config["atx"]["leds"]["pinout"]["hdd"], @@ -113,11 +111,6 @@ class _Application: if self.__streamer.is_running(): await self.__streamer.stop() - _logger.info("Cleaning up GPIO ...") - GPIO.cleanup() - - _logger.info("Bye-bye") - @_system_task async def __stream_controller(self) -> None: prev = 0 @@ -189,4 +182,7 @@ class _Application: def main() -> None: - _Application(init()).run() + config = init() + with gpio.bcm(): + _Application(config).run() + _logger.info("Bye-bye") diff --git a/kvmd/kvmd/atx.py b/kvmd/kvmd/atx.py index a3d07fc4..a8c29cb0 100644 --- a/kvmd/kvmd/atx.py +++ b/kvmd/kvmd/atx.py @@ -3,7 +3,7 @@ import logging from typing import Tuple -from RPi import GPIO +from . import gpio # ===== @@ -21,29 +21,20 @@ class Atx: long_click_delay: float, ) -> None: - self.__power_led = self.__set_input_pin(power_led) - self.__hdd_led = self.__set_input_pin(hdd_led) + self.__power_led = gpio.set_input(power_led) + self.__hdd_led = gpio.set_input(hdd_led) - self.__power_switch = self.__set_output_pin(power_switch) - self.__reset_switch = self.__set_output_pin(reset_switch) + self.__power_switch = gpio.set_output_zeroed(power_switch) + self.__reset_switch = gpio.set_output_zeroed(reset_switch) self.__click_delay = click_delay self.__long_click_delay = long_click_delay self.__lock = asyncio.Lock() - def __set_input_pin(self, pin: int) -> int: - GPIO.setup(pin, GPIO.IN) - return pin - - def __set_output_pin(self, pin: int) -> int: - GPIO.setup(pin, GPIO.OUT) - GPIO.output(pin, False) - return pin - def get_leds(self) -> Tuple[bool, bool]: return ( - not GPIO.input(self.__power_led), - not GPIO.input(self.__hdd_led), + not gpio.read(self.__power_led), + not gpio.read(self.__hdd_led), ) async def click_power(self) -> None: @@ -62,7 +53,7 @@ class Atx: if not self.__lock.locked(): async with self.__lock: for flag in (True, False): - GPIO.output(pin, flag) + gpio.write(pin, flag) await asyncio.sleep(delay) return True return False diff --git a/kvmd/kvmd/extra/cleanup/__init__.py b/kvmd/kvmd/extra/cleanup/__init__.py index d8ae866c..b7329e90 100644 --- a/kvmd/kvmd/extra/cleanup/__init__.py +++ b/kvmd/kvmd/extra/cleanup/__init__.py @@ -1,9 +1,9 @@ import logging -from RPi import GPIO - from ...application import init +from ... import gpio + # ===== _logger = logging.getLogger(__name__) @@ -12,12 +12,11 @@ _logger = logging.getLogger(__name__) def main() -> None: config = init() _logger.info("Cleaning up ...") - GPIO.setmode(GPIO.BCM) - for (key, pin) in [ - *config["atx"]["switches"]["pinout"].items(), - *config["video"]["pinout"].items(), - ]: - _logger.info("Writing value=0 to pin=%d (%s)", pin, key) - GPIO.output(pin, False) - GPIO.cleanup() - _logger.info("Done!") + with gpio.bcm(): + for (key, pin) in [ + *config["atx"]["switches"]["pinout"].items(), + *config["video"]["pinout"].items(), + ]: + _logger.info("Writing value=0 to pin=%d (%s)", pin, key) + gpio.write(pin, False) + _logger.info("Bye-bye") diff --git a/kvmd/kvmd/gpio.py b/kvmd/kvmd/gpio.py new file mode 100644 index 00000000..673ac373 --- /dev/null +++ b/kvmd/kvmd/gpio.py @@ -0,0 +1,38 @@ +import contextlib +import logging + +from typing import Generator + +from RPi import GPIO + + +# ===== +_logger = logging.getLogger(__name__) + + +def bcm() -> Generator[None, None, None]: + GPIO.setmode(GPIO.BCM) + _logger.info("Configured GPIO mode as BCM") + yield + GPIO.cleanup() + _logger.info("GPIO cleaned") + + +def set_output_zeroed(pin: int) -> int: + GPIO.setup(pin, GPIO.OUT) + GPIO.output(pin, False) + return pin + + +def set_input(pin: int) -> int: + GPIO.setup(pin, GPIO.IN) + return pin + + +def read(pin: int) -> bool: + return bool(GPIO.input(pin)) + + +def write(pin: int, flag: bool) -> None: + GPIO.output(pin, flag) diff --git a/kvmd/kvmd/streamer.py b/kvmd/kvmd/streamer.py index 8e542e5f..8581e4ee 100644 --- a/kvmd/kvmd/streamer.py +++ b/kvmd/kvmd/streamer.py @@ -5,7 +5,7 @@ import logging from typing import Dict from typing import Optional -from RPi import GPIO +from . import gpio # ===== @@ -22,8 +22,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes loop: asyncio.AbstractEventLoop, ) -> None: - self.__cap_power = self.__set_output_pin(cap_power) - self.__vga_power = self.__set_output_pin(vga_power) + self.__cap_power = gpio.set_output_zeroed(cap_power) + self.__vga_power = gpio.set_output_zeroed(vga_power) self.__sync_delay = sync_delay self.__cmd = ( @@ -36,11 +36,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__proc_task: Optional[asyncio.Task] = None - def __set_output_pin(self, pin: int) -> int: - GPIO.setup(pin, GPIO.OUT) - GPIO.output(pin, False) - return pin - async def start(self) -> None: assert not self.__proc_task _logger.info("Starting mjpg_streamer ...") @@ -60,10 +55,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes async def __set_hw_enabled(self, enabled: bool) -> None: # XXX: This sequence is very important for enable - GPIO.output(self.__cap_power, enabled) + gpio.write(self.__cap_power, enabled) if enabled: await asyncio.sleep(self.__sync_delay) - GPIO.output(self.__vga_power, enabled) + gpio.write(self.__vga_power, enabled) await asyncio.sleep(self.__sync_delay) async def __process(self) -> None: |