summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-06-28 06:23:19 +0300
committerDevaev Maxim <[email protected]>2018-06-28 06:23:19 +0300
commite8595665c093767e7afae1158cbdda74e288e351 (patch)
treed1546b4a44b76e534c8c4be1e4e985d964ef324f /kvmd
parentcac56cd92f91c631ccf1f58d459e9c203e611de2 (diff)
gpio wrapper
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/kvmd/__init__.py16
-rw-r--r--kvmd/kvmd/atx.py25
-rw-r--r--kvmd/kvmd/extra/cleanup/__init__.py21
-rw-r--r--kvmd/kvmd/gpio.py38
-rw-r--r--kvmd/kvmd/streamer.py15
5 files changed, 67 insertions, 48 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py
index d303747c..4ef2c8a9 100644
--- a/kvmd/kvmd/__init__.py
+++ b/kvmd/kvmd/__init__.py
@@ -8,8 +8,6 @@ from typing import Set
from typing import Callable
from typing import Optional
-from RPi import GPIO
-
import aiohttp
from .application import init
@@ -17,6 +15,8 @@ from .application import init
from .atx import Atx
from .streamer import Streamer
+from . import gpio
+
# =====
_logger = logging.getLogger(__name__)
@@ -42,8 +42,6 @@ class _Application:
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
- GPIO.setmode(GPIO.BCM)
-
self.__atx = Atx(
power_led=self.__config["atx"]["leds"]["pinout"]["power"],
hdd_led=self.__config["atx"]["leds"]["pinout"]["hdd"],
@@ -113,11 +111,6 @@ class _Application:
if self.__streamer.is_running():
await self.__streamer.stop()
- _logger.info("Cleaning up GPIO ...")
- GPIO.cleanup()
-
- _logger.info("Bye-bye")
-
@_system_task
async def __stream_controller(self) -> None:
prev = 0
@@ -189,4 +182,7 @@ class _Application:
def main() -> None:
- _Application(init()).run()
+ config = init()
+ with gpio.bcm():
+ _Application(config).run()
+ _logger.info("Bye-bye")
diff --git a/kvmd/kvmd/atx.py b/kvmd/kvmd/atx.py
index a3d07fc4..a8c29cb0 100644
--- a/kvmd/kvmd/atx.py
+++ b/kvmd/kvmd/atx.py
@@ -3,7 +3,7 @@ import logging
from typing import Tuple
-from RPi import GPIO
+from . import gpio
# =====
@@ -21,29 +21,20 @@ class Atx:
long_click_delay: float,
) -> None:
- self.__power_led = self.__set_input_pin(power_led)
- self.__hdd_led = self.__set_input_pin(hdd_led)
+ self.__power_led = gpio.set_input(power_led)
+ self.__hdd_led = gpio.set_input(hdd_led)
- self.__power_switch = self.__set_output_pin(power_switch)
- self.__reset_switch = self.__set_output_pin(reset_switch)
+ self.__power_switch = gpio.set_output_zeroed(power_switch)
+ self.__reset_switch = gpio.set_output_zeroed(reset_switch)
self.__click_delay = click_delay
self.__long_click_delay = long_click_delay
self.__lock = asyncio.Lock()
- def __set_input_pin(self, pin: int) -> int:
- GPIO.setup(pin, GPIO.IN)
- return pin
-
- def __set_output_pin(self, pin: int) -> int:
- GPIO.setup(pin, GPIO.OUT)
- GPIO.output(pin, False)
- return pin
-
def get_leds(self) -> Tuple[bool, bool]:
return (
- not GPIO.input(self.__power_led),
- not GPIO.input(self.__hdd_led),
+ not gpio.read(self.__power_led),
+ not gpio.read(self.__hdd_led),
)
async def click_power(self) -> None:
@@ -62,7 +53,7 @@ class Atx:
if not self.__lock.locked():
async with self.__lock:
for flag in (True, False):
- GPIO.output(pin, flag)
+ gpio.write(pin, flag)
await asyncio.sleep(delay)
return True
return False
diff --git a/kvmd/kvmd/extra/cleanup/__init__.py b/kvmd/kvmd/extra/cleanup/__init__.py
index d8ae866c..b7329e90 100644
--- a/kvmd/kvmd/extra/cleanup/__init__.py
+++ b/kvmd/kvmd/extra/cleanup/__init__.py
@@ -1,9 +1,9 @@
import logging
-from RPi import GPIO
-
from ...application import init
+from ... import gpio
+
# =====
_logger = logging.getLogger(__name__)
@@ -12,12 +12,11 @@ _logger = logging.getLogger(__name__)
def main() -> None:
config = init()
_logger.info("Cleaning up ...")
- GPIO.setmode(GPIO.BCM)
- for (key, pin) in [
- *config["atx"]["switches"]["pinout"].items(),
- *config["video"]["pinout"].items(),
- ]:
- _logger.info("Writing value=0 to pin=%d (%s)", pin, key)
- GPIO.output(pin, False)
- GPIO.cleanup()
- _logger.info("Done!")
+ with gpio.bcm():
+ for (key, pin) in [
+ *config["atx"]["switches"]["pinout"].items(),
+ *config["video"]["pinout"].items(),
+ ]:
+ _logger.info("Writing value=0 to pin=%d (%s)", pin, key)
+ gpio.write(pin, False)
+ _logger.info("Bye-bye")
diff --git a/kvmd/kvmd/gpio.py b/kvmd/kvmd/gpio.py
new file mode 100644
index 00000000..673ac373
--- /dev/null
+++ b/kvmd/kvmd/gpio.py
@@ -0,0 +1,38 @@
+import contextlib
+import logging
+
+from typing import Generator
+
+from RPi import GPIO
+
+
+# =====
+_logger = logging.getLogger(__name__)
+
+
+def bcm() -> Generator[None, None, None]:
+ GPIO.setmode(GPIO.BCM)
+ _logger.info("Configured GPIO mode as BCM")
+ yield
+ GPIO.cleanup()
+ _logger.info("GPIO cleaned")
+
+
+def set_output_zeroed(pin: int) -> int:
+ GPIO.setup(pin, GPIO.OUT)
+ GPIO.output(pin, False)
+ return pin
+
+
+def set_input(pin: int) -> int:
+ GPIO.setup(pin, GPIO.IN)
+ return pin
+
+
+def read(pin: int) -> bool:
+ return bool(GPIO.input(pin))
+
+
+def write(pin: int, flag: bool) -> None:
+ GPIO.output(pin, flag)
diff --git a/kvmd/kvmd/streamer.py b/kvmd/kvmd/streamer.py
index 8e542e5f..8581e4ee 100644
--- a/kvmd/kvmd/streamer.py
+++ b/kvmd/kvmd/streamer.py
@@ -5,7 +5,7 @@ import logging
from typing import Dict
from typing import Optional
-from RPi import GPIO
+from . import gpio
# =====
@@ -22,8 +22,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
loop: asyncio.AbstractEventLoop,
) -> None:
- self.__cap_power = self.__set_output_pin(cap_power)
- self.__vga_power = self.__set_output_pin(vga_power)
+ self.__cap_power = gpio.set_output_zeroed(cap_power)
+ self.__vga_power = gpio.set_output_zeroed(vga_power)
self.__sync_delay = sync_delay
self.__cmd = (
@@ -36,11 +36,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__proc_task: Optional[asyncio.Task] = None
- def __set_output_pin(self, pin: int) -> int:
- GPIO.setup(pin, GPIO.OUT)
- GPIO.output(pin, False)
- return pin
-
async def start(self) -> None:
assert not self.__proc_task
_logger.info("Starting mjpg_streamer ...")
@@ -60,10 +55,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important for enable
- GPIO.output(self.__cap_power, enabled)
+ gpio.write(self.__cap_power, enabled)
if enabled:
await asyncio.sleep(self.__sync_delay)
- GPIO.output(self.__vga_power, enabled)
+ gpio.write(self.__vga_power, enabled)
await asyncio.sleep(self.__sync_delay)
async def __process(self) -> None: