summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2021-12-30 05:44:42 +0300
committerMaxim Devaev <[email protected]>2021-12-30 05:44:42 +0300
commitf609e857b14986bd8106685a60ae41ab2d1f8657 (patch)
treee03cc711fd0232976469d76092936a37dd2354ae /kvmd
parent885c14f9e4c1e787f8539fe2bb0e533bdb2caba2 (diff)
libc module
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/inotify.py34
-rw-r--r--kvmd/libc.py59
2 files changed, 63 insertions, 30 deletions
diff --git a/kvmd/inotify.py b/kvmd/inotify.py
index d046dd2c..d0f7bade 100644
--- a/kvmd/inotify.py
+++ b/kvmd/inotify.py
@@ -26,16 +26,11 @@ import sys
import os
import asyncio
import ctypes
-import ctypes.util
import struct
import dataclasses
import types
import errno
-from ctypes import c_int
-from ctypes import c_uint32
-from ctypes import c_char_p
-
from typing import Tuple
from typing import List
from typing import Dict
@@ -45,28 +40,7 @@ from typing import Optional
from .logging import get_logger
-
-# =====
-def _load_libc() -> ctypes.CDLL:
- path = ctypes.util.find_library("c")
- if not path:
- raise RuntimeError("Where is libc?")
- assert path
- lib = ctypes.CDLL(path)
- for (name, restype, argtypes) in [
- ("inotify_init", c_int, []),
- ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]),
- ("inotify_rm_watch", c_int, [c_int, c_uint32]),
- ]:
- func = getattr(lib, name)
- if not func:
- raise RuntimeError(f"Where is libc.{name}?")
- setattr(func, "restype", restype)
- setattr(func, "argtypes", argtypes)
- return lib
-
-
-_libc = _load_libc()
+from . import libc
# =====
@@ -225,7 +199,7 @@ class Inotify:
path = os.path.normpath(path)
assert path not in self.__wd_by_path, path
get_logger().info("Watching for %s", path)
- wd = _inotify_check(_libc.inotify_add_watch(self.__fd, _fs_encode(path), mask))
+ wd = _inotify_check(libc.inotify_add_watch(self.__fd, _fs_encode(path), mask))
self.__wd_by_path[path] = wd
self.__path_by_wd[wd] = path
@@ -303,7 +277,7 @@ class Inotify:
def __enter__(self) -> "Inotify":
assert self.__fd < 0
- self.__fd = _inotify_check(_libc.inotify_init())
+ self.__fd = _inotify_check(libc.inotify_init())
asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events)
return self
@@ -317,7 +291,7 @@ class Inotify:
if self.__fd >= 0:
asyncio.get_event_loop().remove_reader(self.__fd)
for wd in list(self.__wd_by_path.values()):
- _libc.inotify_rm_watch(self.__fd, wd)
+ libc.inotify_rm_watch(self.__fd, wd)
try:
os.close(self.__fd)
except Exception:
diff --git a/kvmd/libc.py b/kvmd/libc.py
new file mode 100644
index 00000000..f2e3370c
--- /dev/null
+++ b/kvmd/libc.py
@@ -0,0 +1,59 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> #
+# #
+# This source file is partially based on python-watchdog module. #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import ctypes
+import ctypes.util
+
+from ctypes import c_int
+from ctypes import c_uint32
+from ctypes import c_char_p
+
+
+# =====
+def _load_libc() -> ctypes.CDLL:
+ path = ctypes.util.find_library("c")
+ if not path:
+ raise RuntimeError("Where is libc?")
+ assert path
+ lib = ctypes.CDLL(path)
+ for (name, restype, argtypes) in [
+ ("inotify_init", c_int, []),
+ ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]),
+ ("inotify_rm_watch", c_int, [c_int, c_uint32]),
+ ]:
+ func = getattr(lib, name)
+ if not func:
+ raise RuntimeError(f"Where is libc.{name}?")
+ setattr(func, "restype", restype)
+ setattr(func, "argtypes", argtypes)
+ return lib
+
+
+_libc = _load_libc()
+
+
+# =====
+inotify_init = _libc.inotify_init
+inotify_add_watch = _libc.inotify_add_watch
+inotify_rm_watch = _libc.inotify_rm_watch