summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-07-13 17:54:15 +0000
committerDevaev Maxim <[email protected]>2018-07-13 17:54:15 +0000
commite118d270dfb310498d2135f5af7c6b86bdc161c7 (patch)
tree1172c6ef98ab67b6e579cf6a373691f9d91797bc
parent9e1f9ae853a733e0c0178bf1b8e03c6e6db7b2dd (diff)
exclusive region-based access for soma apis
-rw-r--r--kvmd/kvmd/aioregion.py33
-rw-r--r--kvmd/kvmd/atx.py30
-rw-r--r--kvmd/kvmd/msd.py79
-rw-r--r--kvmd/kvmd/server.py80
-rw-r--r--kvmd/web/js/kvmd.js42
5 files changed, 154 insertions, 110 deletions
diff --git a/kvmd/kvmd/aioregion.py b/kvmd/kvmd/aioregion.py
new file mode 100644
index 00000000..0e81f82a
--- /dev/null
+++ b/kvmd/kvmd/aioregion.py
@@ -0,0 +1,33 @@
+import types
+
+from typing import Type
+
+
+# =====
+class RegionIsBusyError(Exception):
+ pass
+
+
+class AioExclusiveRegion:
+ def __init__(self) -> None:
+ self.__busy = False
+
+ def enter(self) -> None:
+ if not self.__busy:
+ self.__busy = True
+ return
+ raise RegionIsBusyError()
+
+ def exit(self) -> None:
+ self.__busy = False
+
+ def __enter__(self) -> None:
+ self.enter()
+
+ def __exit__(
+ self,
+ _exc_type: Type[BaseException],
+ _exc: BaseException,
+ _tb: types.TracebackType,
+ ) -> None:
+ self.exit()
diff --git a/kvmd/kvmd/atx.py b/kvmd/kvmd/atx.py
index 26cd9495..cf155e7e 100644
--- a/kvmd/kvmd/atx.py
+++ b/kvmd/kvmd/atx.py
@@ -4,6 +4,7 @@ from typing import Dict
from .logging import get_logger
+from . import aioregion
from . import gpio
@@ -28,7 +29,7 @@ class Atx:
self.__click_delay = click_delay
self.__long_click_delay = long_click_delay
- self.__lock = asyncio.Lock()
+ self.__region = aioregion.AioExclusiveRegion()
def get_state(self) -> Dict:
return {
@@ -39,22 +40,19 @@ class Atx:
}
async def click_power(self) -> None:
- if (await self.__click(self.__power_switch, self.__click_delay)):
- get_logger().info("Clicked power")
+ await self.__click(self.__power_switch, self.__click_delay)
+ get_logger().info("Clicked power")
async def click_power_long(self) -> None:
- if (await self.__click(self.__power_switch, self.__long_click_delay)):
- get_logger().info("Clicked power (long press)")
+ await self.__click(self.__power_switch, self.__long_click_delay)
+ get_logger().info("Clicked power (long press)")
async def click_reset(self) -> None:
- if (await self.__click(self.__reset_switch, self.__click_delay)):
- get_logger().info("Clicked reset")
-
- async def __click(self, pin: int, delay: float) -> bool:
- if not self.__lock.locked():
- async with self.__lock:
- for flag in (True, False):
- gpio.write(pin, flag)
- await asyncio.sleep(delay)
- return True
- return False
+ await self.__click(self.__reset_switch, self.__click_delay)
+ get_logger().info("Clicked reset")
+
+ async def __click(self, pin: int, delay: float) -> None:
+ with self.__region:
+ for flag in (True, False):
+ gpio.write(pin, flag)
+ await asyncio.sleep(delay)
diff --git a/kvmd/kvmd/msd.py b/kvmd/kvmd/msd.py
index 498c2ff2..1dc658c6 100644
--- a/kvmd/kvmd/msd.py
+++ b/kvmd/kvmd/msd.py
@@ -15,6 +15,8 @@ import pyudev
import aiofiles
import aiofiles.base
+from . import aioregion
+
from .logging import get_logger
@@ -151,14 +153,13 @@ def _explore_device(device_path: str) -> Optional[_MassStorageDeviceInfo]:
)
-def _operated_and_locked(method: Callable) -> Callable:
+def _msd_operated(method: Callable) -> Callable:
async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any:
if self._device_file: # pylint: disable=protected-access
raise IsBusyError()
if not self._device_path: # pylint: disable=protected-access
IsNotOperationalError()
- async with self._lock: # pylint: disable=protected-access
- return (await method(self, *args, **kwargs))
+ return (await method(self, *args, **kwargs))
return wrap
@@ -178,7 +179,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
self.__loop = loop
self.__device_info: Optional[_MassStorageDeviceInfo] = None
- self._lock = asyncio.Lock()
+ self.__region = aioregion.AioExclusiveRegion()
self._device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
@@ -198,23 +199,25 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
logger.warning("Mass-storage device is not operational")
- @_operated_and_locked
+ @_msd_operated
async def connect_to_kvm(self, no_delay: bool=False) -> None:
- if self.__device_info:
- raise AlreadyConnectedToKvmError()
- # TODO: disable gpio
- if not no_delay:
- await asyncio.sleep(self.__init_delay)
- await self.__load_device_info()
- get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
+ with self.__region:
+ if self.__device_info:
+ raise AlreadyConnectedToKvmError()
+ # TODO: disable gpio
+ if not no_delay:
+ await asyncio.sleep(self.__init_delay)
+ await self.__load_device_info()
+ get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
- @_operated_and_locked
+ @_msd_operated
async def connect_to_pc(self) -> None:
- if not self.__device_info:
- raise AlreadyConnectedToPcError()
- # TODO: enable gpio
- self.__device_info = None
- get_logger().info("Mass-storage device switched to Server")
+ with self.__region:
+ if not self.__device_info:
+ raise AlreadyConnectedToPcError()
+ # TODO: enable gpio
+ self.__device_info = None
+ get_logger().info("Mass-storage device switched to Server")
def get_state(self) -> Dict:
info = (self.__device_info._asdict() if self.__device_info else None)
@@ -230,36 +233,34 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
}
async def cleanup(self) -> None:
- async with self._lock:
- await self.__close_device_file()
- # TODO: disable gpio
+ await self.__close_device_file()
+ # TODO: disable gpio
- @_operated_and_locked
+ @_msd_operated
async def __aenter__(self) -> "MassStorageDevice":
if not self.__device_info:
raise IsNotConnectedToKvmError()
self._device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
self.__written = 0
+ self.__region.enter()
return self
async def write_image_info(self, name: str, complete: bool) -> None:
- async with self._lock:
- assert self._device_file
- assert self.__device_info
- if self.__write_meta:
- if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
- await self._device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
- await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
- await self._device_file.seek(0)
- await self.__load_device_info()
- else:
- get_logger().error("Can't write image info because device is full")
+ assert self._device_file
+ assert self.__device_info
+ if self.__write_meta:
+ if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
+ await self._device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
+ await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
+ await self._device_file.seek(0)
+ await self.__load_device_info()
+ else:
+ get_logger().error("Can't write image info because device is full")
async def write_image_chunk(self, chunk: bytes) -> int:
- async with self._lock:
- await self.__write_to_device_file(chunk)
- self.__written += len(chunk)
- return self.__written
+ await self.__write_to_device_file(chunk)
+ self.__written += len(chunk)
+ return self.__written
async def __aexit__(
self,
@@ -267,8 +268,10 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
- async with self._lock:
+ try:
await self.__close_device_file()
+ finally:
+ self.__region.exit()
async def __write_to_device_file(self, data: bytes) -> None:
assert self._device_file
diff --git a/kvmd/kvmd/server.py b/kvmd/kvmd/server.py
index 26ac4b0c..7f7e3a6d 100644
--- a/kvmd/kvmd/server.py
+++ b/kvmd/kvmd/server.py
@@ -9,10 +9,11 @@ from typing import Dict
from typing import Set
from typing import Callable
from typing import Optional
-from typing import Type
import aiohttp.web
+from .aioregion import RegionIsBusyError
+
from .hid import Hid
from .atx import Atx
@@ -38,35 +39,46 @@ def _system_task(method: Callable) -> Callable:
return wrap
-class _BadRequest(Exception):
+def _json(result: Optional[Dict]=None, status: int=200) -> aiohttp.web.Response:
+ return aiohttp.web.json_response({
+ "ok": (True if status == 200 else False),
+ "result": (result or {}),
+ }, status=status)
+
+
+def _json_exception(msg: str, err: Exception, status: int) -> aiohttp.web.Response:
+ get_logger().error("%s: %s", msg, err)
+ return _json({
+ "error": type(err).__name__,
+ "error_msg": str(err),
+ }, status=status)
+
+
+class BadRequest(Exception):
pass
-def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable:
+class PerformingAnotherOperation(Exception):
+ def __init__(self) -> None:
+ super().__init__("Performing another operation, please try again later")
+
+
+def _wrap_exceptions_for_web(msg: str) -> Callable:
def make_wrapper(method: Callable) -> Callable:
async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
- return (await method(self, request))
- except tuple(exceptions) as err: # pylint: disable=catching-non-exception
- get_logger().error(msg)
- return aiohttp.web.json_response({
- "ok": False,
- "result": {
- "error": type(err).__name__,
- "error_msg": str(err),
- },
- }, status=400)
+ try:
+ return (await method(self, request))
+ except RegionIsBusyError:
+ raise PerformingAnotherOperation()
+ except (BadRequest, MassStorageOperationError) as err:
+ return _json_exception(msg, err, 400)
+ except PerformingAnotherOperation as err:
+ return _json_exception(msg, err, 409)
return wrap
return make_wrapper
-def _json_200(result: Optional[Dict]=None) -> aiohttp.web.Response:
- return aiohttp.web.json_response({
- "ok": True,
- "result": (result or {}),
- })
-
-
class Server: # pylint: disable=too-many-instance-attributes
def __init__(
self,
@@ -159,9 +171,9 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json_200(self.__atx.get_state())
+ return _json(self.__atx.get_state())
- @_exceptions_as_400("Click error", [_BadRequest])
+ @_wrap_exceptions_for_web("Click error")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = request.query.get("button")
if button == "power":
@@ -171,13 +183,13 @@ class Server: # pylint: disable=too-many-instance-attributes
elif button == "reset":
await self.__atx.click_reset()
else:
- raise _BadRequest("Missing or invalid 'button=%s'" % (button))
- return _json_200({"clicked": button})
+ raise BadRequest("Missing or invalid 'button=%s'" % (button))
+ return _json({"clicked": button})
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json_200(self.__msd.get_state())
+ return _json(self.__msd.get_state())
- @_exceptions_as_400("Mass-storage error", [MassStorageOperationError, _BadRequest])
+ @_wrap_exceptions_for_web("Mass-storage error")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = request.query.get("to")
if to == "kvm":
@@ -187,10 +199,10 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__msd.connect_to_pc()
await self.__broadcast_event("msd_state", state="connected_to_server") # type: ignore
else:
- raise _BadRequest("Missing or invalid 'to=%s'" % (to))
- return _json_200(self.__msd.get_state())
+ raise BadRequest("Missing or invalid 'to=%s'" % (to))
+ return _json(self.__msd.get_state())
- @_exceptions_as_400("Can't write data to mass-storage device", [MassStorageOperationError, _BadRequest])
+ @_wrap_exceptions_for_web("Can't write data to mass-storage device")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0)
reader = await request.multipart()
@@ -198,12 +210,12 @@ class Server: # pylint: disable=too-many-instance-attributes
try:
field = await reader.next()
if not field or field.name != "image_name":
- raise _BadRequest("Missing 'image_name' field")
+ raise BadRequest("Missing 'image_name' field")
image_name = (await field.read()).decode("utf-8")[:256]
field = await reader.next()
if not field or field.name != "image_data":
- raise _BadRequest("Missing 'image_data' field")
+ raise BadRequest("Missing 'image_data' field")
async with self.__msd:
await self.__broadcast_event("msd_state", state="busy") # type: ignore
@@ -219,14 +231,14 @@ class Server: # pylint: disable=too-many-instance-attributes
finally:
if written != 0:
logger.info("written %d bytes to mass-storage device", written)
- return _json_200({"written": written})
+ return _json({"written": written})
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json_200(self.__streamer.get_state())
+ return _json(self.__streamer.get_state())
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
- return _json_200()
+ return _json()
def __run_app_print(self, text: str) -> None:
logger = get_logger()
diff --git a/kvmd/web/js/kvmd.js b/kvmd/web/js/kvmd.js
index f9a789f3..a4d75387 100644
--- a/kvmd/web/js/kvmd.js
+++ b/kvmd/web/js/kvmd.js
@@ -154,17 +154,16 @@ function clickAtxButton(el_button) {
if (button && confirm(confirm_msg)) {
__setAtxButtonsBusy(true);
- var http = new XMLHttpRequest();
- http.open("POST", "/kvmd/atx/click?button=" + button, true);
- http.onreadystatechange = function() {
+ var http = __request("POST", "/kvmd/atx/click?button=" + button, function() {
if (http.readyState == 4) {
- if (http.status != 200) {
+ if (http.status == 409) {
+ alert("Performing another ATX operation for other client, please try again later");
+ } else if (http.status != 200) {
alert("Click error: " + http.responseText);
}
- __setAtxButtonsBusy(false);
+ __setAtxButtonsBusy(false);
}
- }
- http.send();
+ });
}
}
@@ -181,9 +180,7 @@ function __setAtxButtonsBusy(busy) {
// -----------------------------------------------------------------------------
function pollStreamer() {
- var http = new XMLHttpRequest();
- http.open("GET", "/streamer/?action=snapshot", true);
- http.onreadystatechange = function() {
+ var http = __request("GET", "/streamer/?action=snapshot", function() {
if (http.readyState == 2 || http.readyState == 4) {
var status = http.status;
http.onreadystatechange = null;
@@ -198,16 +195,13 @@ function pollStreamer() {
pollStreamer.last = true;
}
}
- }
- http.send();
+ });
setTimeout(pollStreamer, 2000);
}
pollStreamer.last = false;
function __refreshStreamer() {
- var http = new XMLHttpRequest();
- http.open("GET", "/kvmd/streamer", true);
- http.onreadystatechange = function() {
+ var http = __request("GET", "/kvmd/streamer", function() {
if (http.readyState == 4 && http.status == 200) {
size = JSON.parse(http.responseText).result.size;
el_stream_box = document.getElementById("stream-image");
@@ -215,27 +209,31 @@ function __refreshStreamer() {
el_stream_box.style.height = size.height + "px";
document.getElementById("stream-image").src = "/streamer/?action=stream&time=" + new Date().getTime();
}
- }
- http.send();
+ });
}
function clickResetStreamerButton(el_button) {
__setButtonBusy(el_button, true);
- var http = new XMLHttpRequest();
- http.open("POST", "/kvmd/streamer/reset", true);
- http.onreadystatechange = function() {
+ var http = __request("POST", "/kvmd/streamer/reset", function() {
if (http.readyState == 4) {
if (http.status != 200) {
alert("Can't reset streamer: " + http.responseText);
}
__setButtonBusy(el_button, false);
}
- }
- http.send();
+ });
}
// -----------------------------------------------------------------------------
+function __request(method, url, callback) {
+ var http = new XMLHttpRequest();
+ http.open(method, url, true)
+ http.onreadystatechange = callback;
+ http.send();
+ return http;
+}
+
function __setButtonBusy(el_button, busy) {
el_button.disabled = busy;
el_button.style.cursor = (busy ? "wait" : "default");