blob: 70f6f2dfe3e164debd55d1fb1fcd22cc0edcb333 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import sys
import re
import asyncio
import logging
import time
from typing import Dict
from typing import AsyncGenerator
import systemd.journal
# =====
def get_logger(depth: int=1) -> logging.Logger:
frame = sys._getframe(1) # pylint: disable=protected-access
frames = []
while frame:
frames.append(frame)
frame = frame.f_back
if len(frames) - 1 >= depth:
break
name = frames[depth].f_globals["__name__"]
return logging.getLogger(name)
class Log:
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self.__loop = loop
async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[Dict, None]:
reader = systemd.journal.Reader()
reader.this_boot()
reader.this_machine()
reader.log_level(systemd.journal.LOG_DEBUG)
services = set(
service
for service in systemd.journal.Reader().query_unique("_SYSTEMD_UNIT")
if re.match(r"kvmd(-\w+)?\.service", service)
).union(["kvmd.service"])
for service in services:
reader.add_match(_SYSTEMD_UNIT=service)
if seek > 0:
reader.seek_realtime(float(time.time() - seek))
for entry in reader:
yield self.__entry_to_record(entry)
while follow:
entry = reader.get_next()
if entry:
yield self.__entry_to_record(entry)
else:
await asyncio.sleep(1)
def __entry_to_record(self, entry: Dict) -> Dict[str, Dict]:
return {
"dt": entry["__REALTIME_TIMESTAMP"],
"service": entry["_SYSTEMD_UNIT"],
"msg": entry["MESSAGE"].rstrip(),
}
|