summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/apps/__init__.py92
-rw-r--r--kvmd/apps/cleanup/__init__.py25
-rw-r--r--kvmd/apps/kvmd/__init__.py3
-rw-r--r--kvmd/apps/kvmd/api/auth.py30
-rw-r--r--kvmd/apps/kvmd/api/ugpio.py63
-rw-r--r--kvmd/apps/kvmd/server.py7
-rw-r--r--kvmd/apps/kvmd/ugpio.py230
-rw-r--r--kvmd/gpio.py35
-rw-r--r--kvmd/validators/hw.py9
-rw-r--r--web/kvm/index.html1
-rw-r--r--web/kvm/window-about.pug1
11 files changed, 449 insertions, 47 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index f5248f00..138e4ecf 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -79,7 +79,9 @@ from ..validators.kvm import valid_stream_resolution
from ..validators.kvm import valid_hid_key
from ..validators.kvm import valid_hid_mouse_move
+from ..validators.hw import valid_gpio_pin
from ..validators.hw import valid_gpio_pin_optional
+from ..validators.hw import valid_gpio_mode
from ..validators.hw import valid_otg_gadget
from ..validators.hw import valid_otg_id
@@ -113,6 +115,7 @@ def init(
load_hid=True,
load_atx=True,
load_msd=True,
+ load_gpio=True,
))
raise SystemExit()
config = _init_config(options.config_path, options.set_options, **load)
@@ -123,15 +126,7 @@ def init(
# =====
-def _init_config(
- config_path: str,
- override_options: List[str],
- load_auth: bool=False,
- load_hid: bool=False,
- load_atx: bool=False,
- load_msd: bool=False,
-) -> Section:
-
+def _init_config(config_path: str, override_options: List[str], **load_flags: bool) -> Section:
config_path = os.path.expanduser(config_path)
raw_config: Dict = load_yaml_file(config_path)
@@ -141,24 +136,7 @@ def _init_config(
_merge_dicts(raw_config, build_raw_from_options(override_options))
config = make_config(raw_config, scheme)
- rebuild = False
-
- if load_auth:
- scheme["kvmd"]["auth"]["internal"].update(get_auth_service_class(config.kvmd.auth.internal.type).get_plugin_options())
- if config.kvmd.auth.external.type:
- scheme["kvmd"]["auth"]["external"].update(get_auth_service_class(config.kvmd.auth.external.type).get_plugin_options())
- rebuild = True
-
- for (load, section, get_class) in [
- (load_hid, "hid", get_hid_class),
- (load_atx, "atx", get_atx_class),
- (load_msd, "msd", get_msd_class),
- ]:
- if load:
- scheme["kvmd"][section].update(get_class(getattr(config.kvmd, section).type).get_plugin_options())
- rebuild = True
-
- if rebuild:
+ if _patch_dynamic(raw_config, config, scheme, **load_flags):
config = make_config(raw_config, scheme)
return config
@@ -166,6 +144,61 @@ def _init_config(
raise SystemExit(f"Config error: {err}")
+def _patch_dynamic( # pylint: disable=too-many-locals
+ raw_config: Dict,
+ config: Section,
+ scheme: Dict,
+ load_auth: bool=False,
+ load_hid: bool=False,
+ load_atx: bool=False,
+ load_msd: bool=False,
+ load_gpio: bool=False,
+) -> bool:
+
+ rebuild = False
+
+ if load_auth:
+ scheme["kvmd"]["auth"]["internal"].update(get_auth_service_class(config.kvmd.auth.internal.type).get_plugin_options())
+ if config.kvmd.auth.external.type:
+ scheme["kvmd"]["auth"]["external"].update(get_auth_service_class(config.kvmd.auth.external.type).get_plugin_options())
+ rebuild = True
+
+ for (load, section, get_class) in [
+ (load_hid, "hid", get_hid_class),
+ (load_atx, "atx", get_atx_class),
+ (load_msd, "msd", get_msd_class),
+ ]:
+ if load:
+ scheme["kvmd"][section].update(get_class(getattr(config.kvmd, section).type).get_plugin_options())
+ rebuild = True
+
+ if load_gpio:
+ for (channel, params) in raw_config.get("kvmd", {}).get("gpio", {}).get("scheme", {}).items():
+ try:
+ mode = valid_gpio_mode(params.get("mode", ""))
+ except Exception:
+ mode = ""
+ channel_scheme: Dict = {
+ "pin": Option(-1, type=valid_gpio_pin),
+ "mode": Option("", type=valid_gpio_mode),
+ "title": Option(""),
+ }
+ if mode == "input":
+ channel_scheme["inverted"] = Option(False, type=valid_bool)
+ else: # output
+ channel_scheme.update({
+ "switch": Option(True, type=valid_bool),
+ "pulse": {
+ "delay": Option(0.1, type=valid_float_f0),
+ "min_delay": Option(0.1, type=valid_float_f01),
+ "max_delay": Option(0.1, type=valid_float_f01),
+ },
+ })
+ scheme["kvmd"]["gpio"]["scheme"][channel] = channel_scheme
+
+ return rebuild
+
+
def _dump_config(config: Section) -> None:
dump = make_config_dump(config)
if sys.stdout.isatty():
@@ -288,6 +321,11 @@ def _get_config_scheme() -> Dict:
"retries": Option(10, type=valid_int_f1),
"retries_delay": Option(3.0, type=valid_float_f01),
},
+
+ "gpio": {
+ "state_poll": Option(0.1, type=valid_float_f01),
+ "scheme": {}, # Dymanic content
+ },
},
"otg": {
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
index 57823ff1..ca1e8faf 100644
--- a/kvmd/apps/cleanup/__init__.py
+++ b/kvmd/apps/cleanup/__init__.py
@@ -45,24 +45,30 @@ def _clear_gpio(config: Section) -> None:
with gpio.bcm():
for (name, pin) in [
*([
- ("tty_hid_reset_pin", config.hid.reset_pin),
- ] if config.hid.type == "tty" else []),
+ ("hid_serial/reset", config.hid.reset_pin),
+ ] if config.hid.type == "serial" else []),
*([
- ("gpio_atx_power_switch_pin", config.atx.power_switch_pin),
- ("gpio_atx_reset_switch_pin", config.atx.reset_switch_pin),
+ ("atx_gpio/power_switch", config.atx.power_switch_pin),
+ ("atx_gpio/reset_switch", config.atx.reset_switch_pin),
] if config.atx.type == "gpio" else []),
*([
- ("relay_msd_target_pin", config.msd.target_pin),
- ("relay_msd_reset_pin", config.msd.reset_pin),
+ ("msd_relay/target", config.msd.target_pin),
+ ("msd_relay/reset", config.msd.reset_pin),
] if config.msd.type == "relay" else []),
- ("streamer_cap_pin", config.streamer.cap_pin),
- ("streamer_conv_pin", config.streamer.conv_pin),
+ ("streamer/cap", config.streamer.cap_pin),
+ ("streamer/conv", config.streamer.conv_pin),
+
+ *([
+ (f"gpio/{channel}", params.pin)
+ for (channel, params) in config.gpio.scheme.items()
+ if params.mode == "output"
+ ]),
]:
if pin >= 0:
- logger.info("Writing value=0 to GPIO pin=%d (%s)", pin, name)
+ logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
try:
gpio.set_output(pin, initial=False)
except Exception:
@@ -114,6 +120,7 @@ def main(argv: Optional[List[str]]=None) -> None:
load_hid=True,
load_atx=True,
load_msd=True,
+ load_gpio=True,
)[2].kvmd
logger = get_logger(0)
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 1684339f..e257145d 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -37,6 +37,7 @@ from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
+from .ugpio import UserGpio
from .streamer import Streamer
from .snapshoter import Snapshoter
from .server import KvmdServer
@@ -52,6 +53,7 @@ def main(argv: Optional[List[str]]=None) -> None:
load_hid=True,
load_atx=True,
load_msd=True,
+ load_gpio=True,
)[2]
with gpio.bcm():
@@ -79,6 +81,7 @@ def main(argv: Optional[List[str]]=None) -> None:
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
+ user_gpio=UserGpio(config.gpio),
hid=hid,
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
diff --git a/kvmd/apps/kvmd/api/auth.py b/kvmd/apps/kvmd/api/auth.py
index 80996e7e..cdbeb0a6 100644
--- a/kvmd/apps/kvmd/api/auth.py
+++ b/kvmd/apps/kvmd/api/auth.py
@@ -20,6 +20,8 @@
# ========================================================================== #
+import base64
+
from aiohttp.web import Request
from aiohttp.web import Response
@@ -43,25 +45,35 @@ _COOKIE_AUTH_TOKEN = "auth_token"
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
if exposed.auth_required and auth_manager.is_auth_enabled():
- user = request.headers.get("X-KVMD-User", "")
- passwd = request.headers.get("X-KVMD-Passwd", "")
- token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
-
- if user:
+ if (user := request.headers.get("X-KVMD-User", "")):
user = valid_user(user)
+ passwd = request.headers.get("X-KVMD-Passwd", "")
set_request_auth_info(request, f"{user} (xhdr)")
if not (await auth_manager.authorize(user, valid_passwd(passwd))):
raise ForbiddenError()
+ return
- elif token:
+ elif (token := request.cookies.get(_COOKIE_AUTH_TOKEN, "")):
user = auth_manager.check(valid_auth_token(token))
if not user:
set_request_auth_info(request, "- (token)")
raise ForbiddenError()
set_request_auth_info(request, f"{user} (token)")
-
- else:
- raise UnauthorizedError()
+ return
+
+ elif (basic_auth := request.headers.get("Authorization", "")):
+ if basic_auth[:6].lower() == "basic ":
+ try:
+ (user, passwd) = base64.b64decode(basic_auth[6:]).decode("utf-8").split(":")
+ except Exception:
+ raise UnauthorizedError()
+ user = valid_user(user)
+ set_request_auth_info(request, f"{user} (basic)")
+ if not (await auth_manager.authorize(user, valid_passwd(passwd))):
+ raise ForbiddenError()
+ return
+
+ raise UnauthorizedError()
class AuthApi:
diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py
new file mode 100644
index 00000000..c4852593
--- /dev/null
+++ b/kvmd/apps/kvmd/api/ugpio.py
@@ -0,0 +1,63 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from aiohttp.web import Request
+from aiohttp.web import Response
+
+from ....validators.basic import valid_bool
+from ....validators.basic import valid_float_f0
+
+from ....validators.hw import valid_gpio_channel
+
+from ..ugpio import UserGpio
+
+from ..http import exposed_http
+from ..http import make_json_response
+
+
+# =====
+class UserGpioApi:
+ def __init__(self, user_gpio: UserGpio) -> None:
+ self.__user_gpio = user_gpio
+
+ # =====
+
+ @exposed_http("GET", "/gpio")
+ async def __state_handler(self, _: Request) -> Response:
+ return make_json_response({
+ "scheme": (await self.__user_gpio.get_scheme()),
+ "state": (await self.__user_gpio.get_state()),
+ })
+
+ @exposed_http("POST", "/gpio/switch")
+ async def __switch_handler(self, request: Request) -> Response:
+ channel = valid_gpio_channel(request.query.get("channel"))
+ state = valid_bool(request.query.get("state"))
+ done = await self.__user_gpio.switch(channel, state)
+ return make_json_response({"done": done})
+
+ @exposed_http("POST", "/gpio/pulse")
+ async def __pulse_handler(self, request: Request) -> Response:
+ channel = valid_gpio_channel(request.query.get("channel"))
+ delay = valid_float_f0(request.query.get("delay", "0"))
+ await self.__user_gpio.pulse(channel, delay)
+ return make_json_response()
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 14c975f0..52981568 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -65,6 +65,7 @@ from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
+from .ugpio import UserGpio
from .streamer import Streamer
from .snapshoter import Snapshoter
@@ -84,6 +85,7 @@ from .api.auth import check_request_auth
from .api.info import InfoApi
from .api.log import LogApi
from .api.wol import WolApi
+from .api.ugpio import UserGpioApi
from .api.hid import HidApi
from .api.atx import AtxApi
from .api.msd import MsdApi
@@ -137,6 +139,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
info_manager: InfoManager,
log_reader: LogReader,
wol: WakeOnLan,
+ user_gpio: UserGpio,
hid: BaseHid,
atx: BaseAtx,
@@ -154,6 +157,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__hid = hid
self.__streamer = streamer
self.__snapshoter = snapshoter # Not a component: No state or cleanup
+ self.__user_gpio = user_gpio # Has extra state "gpio_scheme_state"
self.__heartbeat = heartbeat
@@ -167,6 +171,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
],
*[
_Component("Wake-on-LAN", "wol_state", wol),
+ _Component("User-GPIO", "gpio_state", user_gpio),
_Component("HID", "hid_state", hid),
_Component("ATX", "atx_state", atx),
_Component("MSD", "msd_state", msd),
@@ -180,6 +185,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
InfoApi(info_manager),
LogApi(log_reader),
WolApi(wol),
+ UserGpioApi(user_gpio),
HidApi(hid, keymap_path),
AtxApi(atx),
MsdApi(msd, sync_chunk_size),
@@ -235,6 +241,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
await client.ws.prepare(request)
await self.__register_ws_client(client)
try:
+ await self.__broadcast_event("gpio_scheme_state", await self.__user_gpio.get_scheme())
await asyncio.gather(*[
self.__broadcast_event(component.event_type, await component.get_state())
for component in self.__components
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
new file mode 100644
index 00000000..d4d992c3
--- /dev/null
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -0,0 +1,230 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import operator
+
+from typing import Dict
+from typing import AsyncGenerator
+from typing import Optional
+
+from ...logging import get_logger
+
+from ... import aiotools
+from ... import gpio
+
+from ...yamlconf import Section
+
+from ...errors import OperationError
+from ...errors import IsBusyError
+
+
+# =====
+class GpioChannelNotFoundError(OperationError):
+ def __init__(self) -> None:
+ super().__init__("GPIO channel is not found")
+
+
+class GpioSwitchNotSupported(OperationError):
+ def __init__(self) -> None:
+ super().__init__("This GPIO channel does not support switching")
+
+
+class GpioPulseNotSupported(OperationError):
+ def __init__(self) -> None:
+ super().__init__("This GPIO channel does not support pulsing")
+
+
+class GpioChannelIsBusyError(IsBusyError):
+ def __init__(self) -> None:
+ super().__init__("Performing another GPIO operation on this channel, please try again later")
+
+
+# =====
+class _GpioInput:
+ def __init__(self, channel: str, config: Section, reader: gpio.BatchReader) -> None:
+ self.__channel = channel
+ self.__title: str = config.title
+ self.__pin: int = gpio.set_input(config.pin)
+ self.__inverted: bool = config.inverted
+ self.__reader = reader
+
+ def get_scheme(self) -> Dict:
+ return {"title": self.__title}
+
+ def get_state(self) -> Dict:
+ return {"state": (self.__reader.get(self.__pin) ^ self.__inverted)}
+
+ def __str__(self) -> str:
+ return f"Input({self.__channel}, pin={self.__pin}, inverted={self.__inverted})"
+
+ __repr__ = __str__
+
+
+class _GpioOutput: # pylint: disable=too-many-instance-attributes
+ def __init__(self, channel: str, config: Section, notifier: aiotools.AioNotifier) -> None:
+ self.__channel = channel
+ self.__title: str = config.title
+ self.__pin: int = gpio.set_output(config.pin)
+ self.__switch: bool = config.switch
+ self.__pulse_delay: float = config.pulse.delay
+ self.__min_pulse_delay: float = config.pulse.min_delay
+ self.__max_pulse_delay: float = config.pulse.max_delay
+
+ self.__state = False
+ self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier)
+
+ def get_scheme(self) -> Dict:
+ return {
+ "title": self.__title,
+ "switch": self.__switch,
+ "pulse": {
+ "delay": self.__pulse_delay,
+ "min_delay": self.__min_pulse_delay,
+ "max_delay": self.__max_pulse_delay,
+ },
+ }
+
+ def get_state(self) -> Dict:
+ busy = self.__region.is_busy()
+ return {
+ "state": (self.__state if not busy else False),
+ "busy": busy,
+ }
+
+ def cleanup(self) -> None:
+ try:
+ gpio.write(self.__pin, False)
+ except Exception:
+ get_logger().exception("Can't cleanup GPIO %s", self)
+
+ async def switch(self, state: bool) -> bool:
+ if not self.__switch:
+ raise GpioSwitchNotSupported()
+ async with self.__region:
+ # Состояние проверяется только при изменении
+ real_state = gpio.read(self.__pin)
+ if state != real_state:
+ gpio.write(self.__pin, state)
+ self.__state = state
+ get_logger(0).info("Switched GPIO %s to %d", self, state)
+ return True
+ self.__state = real_state
+ return False
+
+ @aiotools.atomic
+ async def pulse(self, delay: float) -> None:
+ if not self.__pulse_delay:
+ raise GpioPulseNotSupported()
+ delay = min(max((delay or self.__pulse_delay), self.__min_pulse_delay), self.__max_pulse_delay)
+ await aiotools.run_region_task(
+ f"Can't perform GPIO pulse of {self} or operation was not completed",
+ self.__region, self.__inner_pulse, delay,
+ )
+
+ @aiotools.atomic
+ async def __inner_pulse(self, delay: float) -> None:
+ try:
+ gpio.write(self.__pin, True)
+ await asyncio.sleep(delay)
+ finally:
+ gpio.write(self.__pin, False)
+ await asyncio.sleep(1)
+ get_logger(0).info("Pulsed GPIO %s", self)
+
+ def __str__(self) -> str:
+ return f"Output({self.__channel}, pin={self.__pin}, switch={self.__switch}, pulse={bool(self.__max_pulse_delay)})"
+
+ __repr__ = __str__
+
+
+# =====
+class UserGpio:
+ def __init__(self, config: Section) -> None:
+ self.__state_notifier = aiotools.AioNotifier()
+ self.__reader = gpio.BatchReader(
+ pins=[ch_config.pin for ch_config in config.scheme.values()],
+ interval=config.state_poll,
+ notifier=self.__state_notifier,
+ )
+
+ self.__inputs: Dict[str, _GpioInput] = {}
+ self.__outputs: Dict[str, _GpioOutput] = {}
+
+ for (channel, ch_config) in sorted(config.scheme.items(), key=operator.itemgetter(0)):
+ if ch_config.mode == "input":
+ self.__inputs[channel] = _GpioInput(channel, ch_config, self.__reader)
+ else: # output:
+ self.__outputs[channel] = _GpioOutput(channel, ch_config, self.__state_notifier)
+
+ async def get_scheme(self) -> Dict:
+ return {
+ "inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
+ "outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()},
+ }
+
+ async def get_state(self) -> Dict:
+ return {
+ "inputs": {channel: gin.get_state() for (channel, gin) in self.__inputs.items()},
+ "outputs": {channel: gout.get_state() for (channel, gout) in self.__outputs.items()},
+ }
+
+ async def poll_state(self) -> AsyncGenerator[Dict, None]:
+ reader_task = asyncio.create_task(self.__reader.poll())
+ waiter_task: Optional[asyncio.Task] = None
+ prev_state: Dict = {}
+ try:
+ while True:
+ if reader_task.cancelled():
+ break
+ if reader_task.done():
+ RuntimeError("BatchReader task is dead")
+
+ state = await self.get_state()
+ if state != prev_state:
+ yield state
+ prev_state = state
+
+ if waiter_task is None:
+ waiter_task = asyncio.create_task(self.__state_notifier.wait())
+ if waiter_task in (await aiotools.wait_first(reader_task, waiter_task))[0]:
+ waiter_task = None
+ finally:
+ if not reader_task.done():
+ reader_task.cancel()
+ await reader_task
+
+ async def cleanup(self) -> None:
+ for gout in self.__outputs.values():
+ gout.cleanup()
+
+ async def switch(self, channel: str, state: bool) -> bool:
+ gout = self.__outputs.get(channel)
+ if gout is None:
+ raise GpioChannelNotFoundError()
+ return (await gout.switch(state))
+
+ async def pulse(self, channel: str, delay: float) -> None:
+ gout = self.__outputs.get(channel)
+ if gout is None:
+ raise GpioChannelNotFoundError()
+ await gout.pulse(delay)
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
index ef9d1c5e..6959a640 100644
--- a/kvmd/gpio.py
+++ b/kvmd/gpio.py
@@ -20,14 +20,20 @@
# ========================================================================== #
+import asyncio
import contextlib
+from typing import Tuple
+from typing import List
from typing import Generator
+from typing import Optional
from RPi import GPIO
from .logging import get_logger
+from . import aiotools
+
# =====
@contextlib.contextmanager
@@ -59,6 +65,31 @@ def read(pin: int) -> bool:
return bool(GPIO.input(pin))
-def write(pin: int, flag: bool) -> None:
+def write(pin: int, state: bool) -> None:
assert pin >= 0, pin
- GPIO.output(pin, flag)
+ GPIO.output(pin, state)
+
+
+class BatchReader:
+ def __init__(self, pins: List[int], interval: float, notifier: aiotools.AioNotifier) -> None:
+ self.__pins = pins
+ self.__flags: Tuple[Optional[bool], ...] = (None,) * len(pins)
+ self.__state = dict.fromkeys(pins, False)
+
+ self.__interval = interval
+ self.__notifier = notifier
+
+ def get(self, pin: int) -> bool:
+ return self.__state[pin]
+
+ async def poll(self) -> None:
+ if not self.__pins:
+ await aiotools.wait_infinite()
+ else:
+ while True:
+ flags = tuple(map(read, self.__pins))
+ if flags != self.__flags:
+ self.__flags = flags
+ self.__state = dict(zip(self.__pins, flags))
+ await self.__notifier.notify()
+ await asyncio.sleep(self.__interval)
diff --git a/kvmd/validators/hw.py b/kvmd/validators/hw.py
index 82d1221a..e37eeda8 100644
--- a/kvmd/validators/hw.py
+++ b/kvmd/validators/hw.py
@@ -23,6 +23,7 @@
from typing import Any
from . import check_in_list
+from . import check_string_in_list
from . import check_re_match
from .basic import valid_number
@@ -43,6 +44,14 @@ def valid_gpio_pin_optional(arg: Any) -> int:
return int(valid_number(arg, min=-1, name="optional GPIO pin"))
+def valid_gpio_mode(arg: Any) -> str:
+ return check_string_in_list(arg, "GPIO mode", ["input", "output"])
+
+
+def valid_gpio_channel(arg: Any) -> str:
+ return check_re_match(arg, "GPIO channel", r"^[a-zA-Z_][a-zA-Z0-9_-]*$")[:255]
+
+
def valid_otg_gadget(arg: Any) -> str:
return check_re_match(arg, "OTG gadget name", r"^[a-z_][a-z0-9_-]*$")[:255]
diff --git a/web/kvm/index.html b/web/kvm/index.html
index 1f5860a2..739b8ca5 100644
--- a/web/kvm/index.html
+++ b/web/kvm/index.html
@@ -1373,6 +1373,7 @@
<li>Scott</li>
<li>Scott Spicola</li>
<li>Sergey Lukjanov</li>
+ <li>Steve Ovens</li>
<li>Steven Richter</li>
<li>Truman Kilen</li>
<li>Walter_Ego</li>
diff --git a/web/kvm/window-about.pug b/web/kvm/window-about.pug
index de1f4ff9..539c854e 100644
--- a/web/kvm/window-about.pug
+++ b/web/kvm/window-about.pug
@@ -88,6 +88,7 @@ mixin about_tab(name, title, checked=false)
li Scott
li Scott Spicola
li Sergey Lukjanov
+ li Steve Ovens
li Steven Richter
li Truman Kilen
li Walter_Ego