diff options
author | Maxim Devaev <[email protected]> | 2021-12-24 19:20:58 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2021-12-24 19:39:43 +0300 |
commit | 73a6a153360d50b9819ed35907e0f26aaee72581 (patch) | |
tree | 61cf766defd21732a5fd2397bfecdca1fa4f89b7 | |
parent | 3c029a6c46924e6418b7d6631f1f8d4696a804d5 (diff) |
refactoring
-rw-r--r-- | kvmd/inotify.py | 35 |
1 files changed, 19 insertions, 16 deletions
diff --git a/kvmd/inotify.py b/kvmd/inotify.py index 1bd43d61..d046dd2c 100644 --- a/kvmd/inotify.py +++ b/kvmd/inotify.py @@ -33,8 +33,8 @@ import types import errno from ctypes import c_int -from ctypes import c_char_p from ctypes import c_uint32 +from ctypes import c_char_p from typing import Tuple from typing import List @@ -49,23 +49,26 @@ from .logging import get_logger # ===== def _load_libc() -> ctypes.CDLL: path = ctypes.util.find_library("c") - if path: - return ctypes.CDLL(path) - raise RuntimeError("Where is libc?") + if not path: + raise RuntimeError("Where is libc?") + assert path + lib = ctypes.CDLL(path) + for (name, restype, argtypes) in [ + ("inotify_init", c_int, []), + ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]), + ("inotify_rm_watch", c_int, [c_int, c_uint32]), + ]: + func = getattr(lib, name) + if not func: + raise RuntimeError(f"Where is libc.{name}?") + setattr(func, "restype", restype) + setattr(func, "argtypes", argtypes) + return lib _libc = _load_libc() -def _get_libc_func(name: str, restype, argtypes=None): # type: ignore - return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc)) - - -_inotify_init = _get_libc_func("inotify_init", c_int) -_inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]) -_inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32]) - - # ===== _EVENT_HEAD_FMT = "iIII" _EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT) @@ -222,7 +225,7 @@ class Inotify: path = os.path.normpath(path) assert path not in self.__wd_by_path, path get_logger().info("Watching for %s", path) - wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask)) + wd = _inotify_check(_libc.inotify_add_watch(self.__fd, _fs_encode(path), mask)) self.__wd_by_path[path] = wd self.__path_by_wd[wd] = path @@ -300,7 +303,7 @@ class Inotify: def __enter__(self) -> "Inotify": assert self.__fd < 0 - self.__fd = _inotify_check(_inotify_init()) + self.__fd = _inotify_check(_libc.inotify_init()) asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) return self @@ -314,7 +317,7 @@ class Inotify: if self.__fd >= 0: asyncio.get_event_loop().remove_reader(self.__fd) for wd in list(self.__wd_by_path.values()): - _inotify_rm_watch(self.__fd, wd) + _libc.inotify_rm_watch(self.__fd, wd) try: os.close(self.__fd) except Exception: |