summaryrefslogtreecommitdiff
path: root/kvmd/gpio.py
blob: 18b36dfd497dd5760f12356eca6e968825d4f1d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# ========================================================================== #
#                                                                            #
#    KVMD - The main Pi-KVM daemon.                                          #
#                                                                            #
#    Copyright (C) 2018  Maxim Devaev <[email protected]>                    #
#                                                                            #
#    This program is free software: you can redistribute it and/or modify    #
#    it under the terms of the GNU General Public License as published by    #
#    the Free Software Foundation, either version 3 of the License, or       #
#    (at your option) any later version.                                     #
#                                                                            #
#    This program is distributed in the hope that it will be useful,         #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of          #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
#    GNU General Public License for more details.                            #
#                                                                            #
#    You should have received a copy of the GNU General Public License       #
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.  #
#                                                                            #
# ========================================================================== #


import asyncio
import contextlib

from typing import List
from typing import Tuple
from typing import Set
from typing import Generator
from typing import Optional

from RPi import GPIO

from .logging import get_logger

from . import aiotools


# =====
@contextlib.contextmanager
def bcm() -> Generator[None, None, None]:
    """Run the body of a ``with`` block with RPi.GPIO set to BCM mode.

    On entry the GPIO mode is set to ``GPIO.BCM``; on exit ``GPIO.cleanup()``
    is called, whether the body finished normally or raised. Both steps are
    logged at INFO level.
    """
    # NOTE(review): the argument 2 is presumably a stack depth used to pick
    # the logger name -- confirm against .logging.get_logger.
    log = get_logger(2)

    GPIO.setmode(GPIO.BCM)
    log.info("Configured GPIO mode as BCM")
    try:
        yield
    finally:
        # Always release the pins, even when the managed body failed.
        GPIO.cleanup()
        log.info("GPIO cleaned")


def set_output(pin: int, initial: Optional[bool]) -> int:
    """Configure *pin* as an output and return the same pin number.

    Args:
        pin: Pin number; must be non-negative (checked with ``assert``).
        initial: Level to request while configuring the pin, or ``None`` to
            configure it without requesting any initial level.

    Returns:
        The unchanged ``pin`` value, so the call can be used inline.
    """
    assert pin >= 0, pin
    if initial is None:
        # NOTE(review): RPi.GPIO's setup() appears to parse ``initial`` as a
        # plain integer, so forwarding None would raise TypeError. Omitting
        # the keyword leaves the library default in place -- confirm against
        # the RPi.GPIO version in use.
        GPIO.setup(pin, GPIO.OUT)
    else:
        GPIO.setup(pin, GPIO.OUT, initial=initial)
    return pin


def set_input(pin: int) -> int:
    """Configure *pin* as an input and hand the pin number back.

    ``pin`` must be non-negative; this is enforced with an ``assert`` whose
    message is the offending value.
    """
    assert 0 <= pin, pin
    direction = GPIO.IN
    GPIO.setup(pin, direction)
    return pin


def read(pin: int) -> bool:
    """Return the value ``GPIO.input()`` reports for *pin*, coerced to ``bool``.

    ``pin`` must be non-negative; this is enforced with an ``assert`` whose
    message is the offending value.
    """
    assert 0 <= pin, pin
    level = GPIO.input(pin)
    return bool(level)


def write(pin: int, state: bool) -> None:
    """Pass *state* to ``GPIO.output()`` for *pin*.

    ``pin`` must be non-negative; this is enforced with an ``assert`` whose
    message is the offending value.
    """
    assert 0 <= pin, pin
    GPIO.output(pin, state)
    return None

class BatchReader:
    """Cache the levels of a fixed set of pins and signal when they change.

    ``poll()` runs one of two strategies, selected by ``edge_detection``:

    * edge detection -- a callback is registered on every pin with
      ``GPIO.add_event_detect()``; each callback re-reads its pin and
      schedules ``notifier.notify_sync`` on the event loop;
    * busyloop -- all pins are re-read every ``interval`` seconds and the
      notifier is awaited whenever the combined reading differs from the
      previous one.

    ``get()`` returns the most recently cached level of a pin.
    """

    def __init__(
        self,
        pins: Set[int],
        edge_detection: bool,
        interval: float,
        notifier: aiotools.AioNotifier,
    ) -> None:
        """Store the configuration and take an initial reading of every pin.

        Args:
            pins: Pin numbers to watch; they are kept in sorted order.
            edge_detection: ``True`` to use GPIO edge callbacks, ``False``
                to poll in a loop.
            interval: Seconds between busyloop iterations; in edge mode it
                is converted to whole milliseconds and used as bouncetime.
            notifier: Object whose ``notify()`` / ``notify_sync()`` is
                invoked when cached levels change.
        """

        self.__pins = sorted(pins)
        self.__edge_detection = edge_detection
        self.__interval = interval
        self.__notifier = notifier

        self.__state = {pin: read(pin) for pin in self.__pins}

        self.__loop: Optional[asyncio.AbstractEventLoop] = None  # Only for edge detection

        # All-None can never equal a real reading, so the first busyloop
        # iteration always publishes the state (for a non-empty pin list).
        self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)  # Only for busyloop

    def get(self, pin: int) -> bool:
        """Return the cached level of *pin*.

        Raises:
            KeyError: If *pin* is not one of the watched pins.
        """
        return self.__state[pin]

    async def poll(self) -> None:
        """Run the configured polling strategy; does not return normally."""
        if self.__edge_detection:
            await self.__poll_edge()
        else:
            await self.__poll_busyloop()

    # =====

    async def __poll_edge(self) -> None:
        """Register edge callbacks on every pin, then wait indefinitely.

        On exit (e.g. cancellation) the callbacks that were successfully
        registered are removed and the stored loop reference is cleared, so
        that ``poll()`` can be started again later.
        """
        assert self.__loop is None
        self.__loop = asyncio.get_running_loop()
        watched: List[int] = []
        try:
            for pin in self.__pins:
                # NOTE(review): int(interval * 1000) is 0 for sub-millisecond
                # intervals; RPi.GPIO may reject a non-positive bouncetime --
                # confirm that configuration validation rules this out.
                GPIO.add_event_detect(
                    pin, GPIO.BOTH,
                    callback=self.__poll_edge_callback,
                    bouncetime=int(self.__interval * 1000),
                )
                watched.append(pin)
            # The levels cached in __init__ may be stale by now (no edge was
            # being watched in between), so re-read them before publishing.
            for pin in self.__pins:
                self.__state[pin] = read(pin)
            await self.__notifier.notify()
            await aiotools.wait_infinite()
        finally:
            try:
                # Only undo registrations that actually succeeded.
                for pin in watched:
                    GPIO.remove_event_detect(pin)
            finally:
                # Without this reset a second poll() would fail the assert above.
                self.__loop = None

    def __poll_edge_callback(self, pin: int) -> None:
        """Edge callback: refresh the cached level of *pin* and notify.

        The notification is handed to the event loop with
        ``call_soon_threadsafe`` because the callback is not expected to run
        on the loop's own thread.
        """
        loop = self.__loop
        if loop is None:
            # Late callback after the poller has stopped: nothing to notify.
            return
        self.__state[pin] = read(pin)
        loop.call_soon_threadsafe(self.__notifier.notify_sync)

    # =====

    async def __poll_busyloop(self) -> None:
        """Re-read all pins every ``interval`` seconds and notify on change.

        With no pins to watch there is nothing to read, so the coroutine
        just waits indefinitely.
        """
        if not self.__pins:
            await aiotools.wait_infinite()
        else:
            while True:
                flags = tuple(map(read, self.__pins))
                if flags != self.__flags:
                    self.__flags = flags
                    # Replace the whole mapping so get() sees one consistent reading.
                    self.__state = dict(zip(self.__pins, flags))
                    await self.__notifier.notify()
                await asyncio.sleep(self.__interval)