diff options
author | Devaev Maxim <[email protected]> | 2019-10-29 02:16:12 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-11-07 01:27:07 +0300 |
commit | 10f8c2b3352c951549cc1249d6b24789fb94d688 (patch) | |
tree | cbf38bcf2716896db748f64bcae66375b7135287 /kvmd/inotify.py | |
parent | f6214191af093560d5697cd7b1ea6f245ee95b98 (diff) |
otg msd and big refactoring
Diffstat (limited to 'kvmd/inotify.py')
-rw-r--r-- | kvmd/inotify.py | 335 |
1 files changed, 335 insertions, 0 deletions
diff --git a/kvmd/inotify.py b/kvmd/inotify.py new file mode 100644 index 00000000..cdded981 --- /dev/null +++ b/kvmd/inotify.py @@ -0,0 +1,335 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This source file is partially based on python-watchdog module. # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import sys +import os +import asyncio +import asyncio.queues +import ctypes +import ctypes.util +import struct +import dataclasses +import types +import errno + +from ctypes import c_int +from ctypes import c_char_p +from ctypes import c_uint32 + +from typing import Tuple +from typing import List +from typing import Dict +from typing import Type +from typing import Generator +from typing import Optional + +from .logging import get_logger + + +# ===== +def _load_libc() -> ctypes.CDLL: + try: + path = ctypes.util.find_library("c") + except (OSError, IOError, RuntimeError): + pass + else: + if path: + return ctypes.CDLL(path) + + names = ["libc.so", "libc.so.6", "libc.so.0"] + for (index, name) in enumerate(names): + try: + return ctypes.CDLL(name) + except (OSError, IOError): + if index == len(names) - 1: + raise + + raise RuntimeError("Where is libc?") + + +_libc = _load_libc() + + +def _get_libc_func(name: str, restype, argtypes=None): # type: ignore + return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc)) + + +_inotify_init = _get_libc_func("inotify_init", c_int) +_inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]) +_inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32]) + + +# ===== +_EVENT_HEAD_FMT = "iIII" +_EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT) +_EVENTS_BUFFER_LENGTH = 4096 * (_EVENT_HEAD_SIZE + 256) # count * (head + max_file_name_size + null_character) + +_FS_FALLBACK_ENCODING = "utf-8" +_FS_ENCODING = (sys.getfilesystemencoding() or _FS_FALLBACK_ENCODING) + + +# ===== +def _inotify_parsed_buffer(data: bytes) -> Generator[Tuple[int, int, int, bytes], None, None]: + offset = 0 + while offset + _EVENT_HEAD_SIZE <= len(data): + (wd, mask, cookie, length) = struct.unpack_from("iIII", data, offset) + name = data[ + offset + _EVENT_HEAD_SIZE # noqa: E203 + : + offset + _EVENT_HEAD_SIZE + length + ].rstrip(b"\0") + offset += _EVENT_HEAD_SIZE + length + if wd >= 0: + yield (wd, mask, cookie, name) + + +def _inotify_check(retval: int) -> int: + if retval < 0: + c_errno = ctypes.get_errno() + if c_errno == errno.ENOSPC: # pylint: disable=no-else-raise + raise OSError(c_errno, "Inotify watch limit reached") + elif c_errno == errno.EMFILE: + raise OSError(c_errno, "Inotify instance limit reached") + else: + raise OSError(c_errno, os.strerror(c_errno)) + return retval + + +def _fs_encode(path: str) -> bytes: + try: + return path.encode(_FS_ENCODING, "strict") + except UnicodeEncodeError: + return path.encode(_FS_FALLBACK_ENCODING, "strict") + + +def _fs_decode(path: bytes) -> str: + try: + return path.decode(_FS_ENCODING, "strict") + except UnicodeDecodeError: + return path.decode(_FS_FALLBACK_ENCODING, "strict") + + +# ===== +class InotifyMask: + # Userspace events + ACCESS = 0x00000001 # File was accessed + ATTRIB = 0x00000004 # Meta-data changed + CLOSE_WRITE = 0x00000008 # Writable file was closed + CLOSE_NOWRITE = 0x00000010 # Unwritable file closed + CREATE = 0x00000100 # Subfile was created + DELETE = 0x00000200 # Subfile was deleted + DELETE_SELF = 0x00000400 # Self was deleted + MODIFY = 0x00000002 # File was modified + MOVE_SELF = 0x00000800 # Self was moved + MOVED_FROM = 0x00000040 # File was moved from X + MOVED_TO = 0x00000080 # File was moved to Y + OPEN = 0x00000020 # File was opened + + # Events sent by the kernel to a watch + IGNORED = 0x00008000 # File was ignored + ISDIR = 0x40000000 # Event occurred against directory + Q_OVERFLOW = 0x00004000 # Event queued overflowed + UNMOUNT = 0x00002000 # Backing file system was unmounted + + # Helper userspace events +# CLOSE = CLOSE_WRITE | CLOSE_NOWRITE # Close +# MOVE = MOVED_FROM | MOVED_TO # Moves + + # Helper for userspace events +# ALL_EVENTS = ( +# ACCESS +# | ATTRIB +# | CLOSE_WRITE +# | CLOSE_NOWRITE +# | CREATE +# | DELETE +# | DELETE_SELF +# | MODIFY +# | MOVE_SELF +# | MOVED_FROM +# | MOVED_TO +# | OPEN +# ) + + # Helper for all modify events + ALL_MODIFY_EVENTS = ( + CLOSE_WRITE + | CREATE + | DELETE + | DELETE_SELF + | MODIFY + | MOVE_SELF + | MOVED_FROM + | MOVED_TO + ) + + # Special flags for watch() +# DONT_FOLLOW = 0x02000000 # Don't follow a symbolic link +# EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects +# MASK_CREATE = 0x10000000 # Don't overwrite existent watchers (since 4.18) +# MASK_ADD = 0x20000000 # Add to the mask of an existing watch +# ONESHOT = 0x80000000 # Only send event once +# ONLYDIR = 0x01000000 # Only watch the path if it's a directory + + @classmethod + def to_string(cls, mask: int) -> str: + flags: List[str] = [] + for name in dir(cls): + if ( + name[0].isupper() + and not name.startswith("ALL_") + and name not in ["CLOSE", "MOVE"] + and mask & getattr(cls, name) + ): + flags.append(name) + return "|".join(flags) + + [email protected](frozen=True, repr=False) +class InotifyEvent: + wd: int + mask: int + cookie: int + name: str + path: str + + def __repr__(self) -> str: + return ( + f"<InotifyEvent: wd={self.wd}, mask={InotifyMask.to_string(self.mask)}," + f" cookie={self.cookie}, name={self.name}, path={self.path}>" + ) + + +class Inotify: + def __init__(self) -> None: + self.__fd = -1 + + self.__wd_by_path: Dict[str, int] = {} + self.__path_by_wd: Dict[int, str] = {} + + self.__moved: Dict[int, str] = {} + + self.__events_queue: asyncio.queues.Queue = asyncio.Queue() + + def watch(self, path: str, mask: int) -> None: + path = os.path.normpath(path) + assert path not in self.__wd_by_path, path + get_logger().info("Watching for %s: %s", path, InotifyMask.to_string(mask)) + wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask)) + self.__wd_by_path[path] = wd + self.__path_by_wd[wd] = path + +# def unwatch(self, path: str) -> None: +# path = os.path.normpath(path) +# assert path in self.__wd_by_path, path +# get_logger().info("Unwatching %s", path) +# wd = self.__wd_by_path[path] +# _inotify_check(_inotify_rm_watch(self.__fd, wd)) +# del self.__wd_by_path[path] +# del self.__path_by_wd[wd] + +# def has_events(self) -> bool: +# return (not self.__events_queue.empty()) + + async def get_event(self, timeout: float) -> Optional[InotifyEvent]: + assert timeout > 0 + try: + return (await asyncio.wait_for(self.__events_queue.get(), timeout=timeout)) + except asyncio.TimeoutError: + return None + + async def get_series(self, timeout: float) -> List[InotifyEvent]: + series: List[InotifyEvent] = [] + event = await self.get_event(timeout) + if event: + series.append(event) + while event: + event = await self.get_event(timeout) + if event: + series.append(event) + return series + + def __read_and_queue_events(self) -> None: + logger = get_logger() + for event in self.__read_parsed_events(): + # XXX: Ни в коем случае не приводить self.__read_parsed_events() к списку. + # Он использует self.__wd_by_path и self.__path_by_wd, содержимое которых + # корректируется кодом ниже. В противном случае все сломается. + + if event.mask & InotifyMask.MOVED_FROM: + self.__moved[event.cookie] = event.path # Save moved_from_path + elif event.mask & InotifyMask.MOVED_TO: + moved_from_path = self.__moved.pop(event.cookie, None) + if moved_from_path is not None: + wd = self.__wd_by_path.pop(moved_from_path, None) + if wd is not None: + self.__wd_by_path[event.path] = wd + self.__path_by_wd[wd] = event.path + + if event.mask & InotifyMask.IGNORED: + ignored_path = self.__path_by_wd[event.wd] + if self.__wd_by_path[ignored_path] == event.wd: + logger.info("Unwatching %s because IGNORED was received", ignored_path) + del self.__wd_by_path[ignored_path] + continue + + self.__events_queue.put_nowait(event) + + def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]: + for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()): + wd_path = self.__path_by_wd.get(wd, None) + if wd_path is not None: + name = _fs_decode(name_bytes) + path = (os.path.join(wd_path, name) if name else wd_path) # Avoid trailing slash + yield InotifyEvent(wd, mask, cookie, name, path) + + def __read_buffer(self) -> bytes: + while True: + try: + return os.read(self.__fd, _EVENTS_BUFFER_LENGTH) + except OSError as err: + if err.errno == errno.EINTR: + pass + + def __enter__(self) -> "Inotify": + assert self.__fd < 0 + self.__fd = _inotify_check(_inotify_init()) + asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) + return self + + def __exit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + if self.__fd >= 0: + asyncio.get_event_loop().remove_reader(self.__fd) + for wd in list(self.__wd_by_path.values()): + _inotify_rm_watch(self.__fd, wd) + try: + os.close(self.__fd) + except Exception: + pass |