# ========================================================================== # # # # KVMD - The main Pi-KVM daemon. # # # # Copyright (C) 2018 Maxim Devaev # # # # This source file is partially based on python-watchdog module. # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # # ========================================================================== # import sys import os import asyncio import asyncio.queues import ctypes import ctypes.util import struct import dataclasses import types import errno from ctypes import c_int from ctypes import c_char_p from ctypes import c_uint32 from typing import Tuple from typing import List from typing import Dict from typing import Type from typing import Generator from typing import Optional from .logging import get_logger # ===== def _load_libc() -> ctypes.CDLL: try: path = ctypes.util.find_library("c") except (OSError, IOError, RuntimeError): pass else: if path: return ctypes.CDLL(path) names = ["libc.so", "libc.so.6", "libc.so.0"] for (index, name) in enumerate(names): try: return ctypes.CDLL(name) except (OSError, IOError): if index == len(names) - 1: raise raise RuntimeError("Where is libc?") _libc = _load_libc() def _get_libc_func(name: str, restype, argtypes=None): # type: ignore return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc)) _inotify_init = _get_libc_func("inotify_init", c_int) _inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]) _inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32]) # ===== _EVENT_HEAD_FMT = "iIII" _EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT) _EVENTS_BUFFER_LENGTH = 4096 * (_EVENT_HEAD_SIZE + 256) # count * (head + max_file_name_size + null_character) _FS_FALLBACK_ENCODING = "utf-8" _FS_ENCODING = (sys.getfilesystemencoding() or _FS_FALLBACK_ENCODING) # ===== def _inotify_parsed_buffer(data: bytes) -> Generator[Tuple[int, int, int, bytes], None, None]: offset = 0 while offset + _EVENT_HEAD_SIZE <= len(data): (wd, mask, cookie, length) = struct.unpack_from("iIII", data, offset) name = data[ offset + _EVENT_HEAD_SIZE # noqa: E203 : offset + _EVENT_HEAD_SIZE + length ].rstrip(b"\0") offset += _EVENT_HEAD_SIZE + length if wd >= 0: yield (wd, mask, cookie, name) def _inotify_check(retval: int) -> int: if retval < 0: c_errno = ctypes.get_errno() if c_errno == errno.ENOSPC: # pylint: disable=no-else-raise raise OSError(c_errno, "Inotify watch limit reached") elif c_errno == errno.EMFILE: raise OSError(c_errno, "Inotify instance limit reached") else: raise OSError(c_errno, os.strerror(c_errno)) return retval def _fs_encode(path: str) -> bytes: try: return path.encode(_FS_ENCODING, "strict") except UnicodeEncodeError: return path.encode(_FS_FALLBACK_ENCODING, "strict") def _fs_decode(path: bytes) -> str: try: return path.decode(_FS_ENCODING, "strict") except UnicodeDecodeError: return path.decode(_FS_FALLBACK_ENCODING, "strict") # ===== class InotifyMask: # Userspace events ACCESS = 0x00000001 # File was accessed ATTRIB = 0x00000004 # Meta-data changed CLOSE_WRITE = 0x00000008 # Writable file was closed CLOSE_NOWRITE = 0x00000010 # Unwritable file closed CREATE = 0x00000100 # Subfile was created DELETE = 0x00000200 # Subfile was deleted DELETE_SELF = 0x00000400 # Self was deleted MODIFY = 0x00000002 # File was modified MOVE_SELF = 0x00000800 # Self was moved MOVED_FROM = 0x00000040 # File was moved from X MOVED_TO = 0x00000080 # File was moved to Y OPEN = 0x00000020 # File was opened # Events sent by the kernel to a watch IGNORED = 0x00008000 # File was ignored ISDIR = 0x40000000 # Event occurred against directory Q_OVERFLOW = 0x00004000 # Event queued overflowed UNMOUNT = 0x00002000 # Backing file system was unmounted # Helper userspace events # CLOSE = CLOSE_WRITE | CLOSE_NOWRITE # Close # MOVE = MOVED_FROM | MOVED_TO # Moves # Helper for userspace events # ALL_EVENTS = ( # ACCESS # | ATTRIB # | CLOSE_WRITE # | CLOSE_NOWRITE # | CREATE # | DELETE # | DELETE_SELF # | MODIFY # | MOVE_SELF # | MOVED_FROM # | MOVED_TO # | OPEN # ) # Helper for all modify events ALL_MODIFY_EVENTS = ( CLOSE_WRITE | CREATE | DELETE | DELETE_SELF | MODIFY | MOVE_SELF | MOVED_FROM | MOVED_TO ) # Special flags for watch() # DONT_FOLLOW = 0x02000000 # Don't follow a symbolic link # EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects # MASK_CREATE = 0x10000000 # Don't overwrite existent watchers (since 4.18) # MASK_ADD = 0x20000000 # Add to the mask of an existing watch # ONESHOT = 0x80000000 # Only send event once # ONLYDIR = 0x01000000 # Only watch the path if it's a directory @classmethod def to_string(cls, mask: int) -> str: flags: List[str] = [] for name in dir(cls): if ( name[0].isupper() and not name.startswith("ALL_") and name not in ["CLOSE", "MOVE"] and mask & getattr(cls, name) ): flags.append(name) return "|".join(flags) @dataclasses.dataclass(frozen=True, repr=False) class InotifyEvent: wd: int mask: int cookie: int name: str path: str def __repr__(self) -> str: return ( f"" ) class Inotify: def __init__(self) -> None: self.__fd = -1 self.__wd_by_path: Dict[str, int] = {} self.__path_by_wd: Dict[int, str] = {} self.__moved: Dict[int, str] = {} self.__events_queue: asyncio.queues.Queue = asyncio.Queue() def watch(self, path: str, mask: int) -> None: path = os.path.normpath(path) assert path not in self.__wd_by_path, path get_logger().info("Watching for %s", path) wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask)) self.__wd_by_path[path] = wd self.__path_by_wd[wd] = path # def unwatch(self, path: str) -> None: # path = os.path.normpath(path) # assert path in self.__wd_by_path, path # get_logger().info("Unwatching %s", path) # wd = self.__wd_by_path[path] # _inotify_check(_inotify_rm_watch(self.__fd, wd)) # del self.__wd_by_path[path] # del self.__path_by_wd[wd] # def has_events(self) -> bool: # return (not self.__events_queue.empty()) async def get_event(self, timeout: float) -> Optional[InotifyEvent]: assert timeout > 0 try: return (await asyncio.wait_for(self.__events_queue.get(), timeout=timeout)) except asyncio.TimeoutError: return None async def get_series(self, timeout: float) -> List[InotifyEvent]: series: List[InotifyEvent] = [] event = await self.get_event(timeout) if event: series.append(event) while event: event = await self.get_event(timeout) if event: series.append(event) return series def __read_and_queue_events(self) -> None: logger = get_logger() for event in self.__read_parsed_events(): # XXX: Ни в коем случае не приводить self.__read_parsed_events() к списку. # Он использует self.__wd_by_path и self.__path_by_wd, содержимое которых # корректируется кодом ниже. В противном случае все сломается. if event.mask & InotifyMask.MOVED_FROM: self.__moved[event.cookie] = event.path # Save moved_from_path elif event.mask & InotifyMask.MOVED_TO: moved_from_path = self.__moved.pop(event.cookie, None) if moved_from_path is not None: wd = self.__wd_by_path.pop(moved_from_path, None) if wd is not None: self.__wd_by_path[event.path] = wd self.__path_by_wd[wd] = event.path if event.mask & InotifyMask.IGNORED: ignored_path = self.__path_by_wd[event.wd] if self.__wd_by_path[ignored_path] == event.wd: logger.info("Unwatching %s because IGNORED was received", ignored_path) del self.__wd_by_path[ignored_path] continue self.__events_queue.put_nowait(event) def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]: for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()): wd_path = self.__path_by_wd.get(wd, None) if wd_path is not None: name = _fs_decode(name_bytes) path = (os.path.join(wd_path, name) if name else wd_path) # Avoid trailing slash yield InotifyEvent(wd, mask, cookie, name, path) def __read_buffer(self) -> bytes: while True: try: return os.read(self.__fd, _EVENTS_BUFFER_LENGTH) except OSError as err: if err.errno == errno.EINTR: pass def __enter__(self) -> "Inotify": assert self.__fd < 0 self.__fd = _inotify_check(_inotify_init()) asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) return self def __exit__( self, _exc_type: Type[BaseException], _exc: BaseException, _tb: types.TracebackType, ) -> None: if self.__fd >= 0: asyncio.get_event_loop().remove_reader(self.__fd) for wd in list(self.__wd_by_path.values()): _inotify_rm_watch(self.__fd, wd) try: os.close(self.__fd) except Exception: pass