diff options
author | Maxim Devaev <[email protected]> | 2021-12-30 05:44:42 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2021-12-30 05:44:42 +0300 |
commit | f609e857b14986bd8106685a60ae41ab2d1f8657 (patch) | |
tree | e03cc711fd0232976469d76092936a37dd2354ae | |
parent | 885c14f9e4c1e787f8539fe2bb0e533bdb2caba2 (diff) |
libc module
-rw-r--r-- | kvmd/inotify.py | 34 | ||||
-rw-r--r-- | kvmd/libc.py | 59 |
2 files changed, 63 insertions, 30 deletions
diff --git a/kvmd/inotify.py b/kvmd/inotify.py index d046dd2c..d0f7bade 100644 --- a/kvmd/inotify.py +++ b/kvmd/inotify.py @@ -26,16 +26,11 @@ import sys import os import asyncio import ctypes -import ctypes.util import struct import dataclasses import types import errno -from ctypes import c_int -from ctypes import c_uint32 -from ctypes import c_char_p - from typing import Tuple from typing import List from typing import Dict @@ -45,28 +40,7 @@ from typing import Optional from .logging import get_logger - -# ===== -def _load_libc() -> ctypes.CDLL: - path = ctypes.util.find_library("c") - if not path: - raise RuntimeError("Where is libc?") - assert path - lib = ctypes.CDLL(path) - for (name, restype, argtypes) in [ - ("inotify_init", c_int, []), - ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]), - ("inotify_rm_watch", c_int, [c_int, c_uint32]), - ]: - func = getattr(lib, name) - if not func: - raise RuntimeError(f"Where is libc.{name}?") - setattr(func, "restype", restype) - setattr(func, "argtypes", argtypes) - return lib - - -_libc = _load_libc() +from . import libc # ===== @@ -225,7 +199,7 @@ class Inotify: path = os.path.normpath(path) assert path not in self.__wd_by_path, path get_logger().info("Watching for %s", path) - wd = _inotify_check(_libc.inotify_add_watch(self.__fd, _fs_encode(path), mask)) + wd = _inotify_check(libc.inotify_add_watch(self.__fd, _fs_encode(path), mask)) self.__wd_by_path[path] = wd self.__path_by_wd[wd] = path @@ -303,7 +277,7 @@ class Inotify: def __enter__(self) -> "Inotify": assert self.__fd < 0 - self.__fd = _inotify_check(_libc.inotify_init()) + self.__fd = _inotify_check(libc.inotify_init()) asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) return self @@ -317,7 +291,7 @@ class Inotify: if self.__fd >= 0: asyncio.get_event_loop().remove_reader(self.__fd) for wd in list(self.__wd_by_path.values()): - _libc.inotify_rm_watch(self.__fd, wd) + libc.inotify_rm_watch(self.__fd, wd) try: os.close(self.__fd) except Exception: diff --git a/kvmd/libc.py b/kvmd/libc.py new file mode 100644 index 00000000..f2e3370c --- /dev/null +++ b/kvmd/libc.py @@ -0,0 +1,59 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> # +# # +# This source file is partially based on python-watchdog module. # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import ctypes +import ctypes.util + +from ctypes import c_int +from ctypes import c_uint32 +from ctypes import c_char_p + + +# ===== +def _load_libc() -> ctypes.CDLL: + path = ctypes.util.find_library("c") + if not path: + raise RuntimeError("Where is libc?") + assert path + lib = ctypes.CDLL(path) + for (name, restype, argtypes) in [ + ("inotify_init", c_int, []), + ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]), + ("inotify_rm_watch", c_int, [c_int, c_uint32]), + ]: + func = getattr(lib, name) + if not func: + raise RuntimeError(f"Where is libc.{name}?") + setattr(func, "restype", restype) + setattr(func, "argtypes", argtypes) + return lib + + +_libc = _load_libc() + + +# ===== +inotify_init = _libc.inotify_init +inotify_add_watch = _libc.inotify_add_watch +inotify_rm_watch = _libc.inotify_rm_watch |