summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-09-12 17:05:35 +0300
committerMaxim Devaev <[email protected]>2024-09-12 17:05:35 +0300
commit445e2e04e2cdc06794e1341a6ad9153bcff18bec (patch)
treee895ba67a0a92bc3680ebba0af3ca6d9490c964c /kvmd/apps
parent489601bb96162a29c1de2ebde5e4e62ef53993a7 (diff)
oled: sensors class
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/oled/__init__.py88
-rw-r--r--kvmd/apps/oled/sensors.py128
2 files changed, 138 insertions, 78 deletions
diff --git a/kvmd/apps/oled/__init__.py b/kvmd/apps/oled/__init__.py
index 9fc4acb8..753933ff 100644
--- a/kvmd/apps/oled/__init__.py
+++ b/kvmd/apps/oled/__init__.py
@@ -3,7 +3,7 @@
# #
# KVMD-OLED - A small OLED daemon for PiKVM. #
# #
-# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
@@ -23,15 +23,11 @@
import sys
import os
-import socket
import signal
import itertools
import logging
-import datetime
import time
-import netifaces
-import psutil
import usb.core
from luma.core import cmdline as luma_cmdline
@@ -41,74 +37,11 @@ from luma.core.render import canvas as luma_canvas
from PIL import Image
from PIL import ImageFont
-
-# =====
-_logger = logging.getLogger("oled")
+from .sensors import Sensors
# =====
-def _get_ip() -> tuple[str, str]:
- try:
- gws = netifaces.gateways()
- if "default" in gws:
- for proto in [socket.AF_INET, socket.AF_INET6]:
- if proto in gws["default"]:
- iface = gws["default"][proto][1]
- addrs = netifaces.ifaddresses(iface)
- return (iface, addrs[proto][0]["addr"])
-
- for iface in netifaces.interfaces():
- if not iface.startswith(("lo", "docker")):
- addrs = netifaces.ifaddresses(iface)
- for proto in [socket.AF_INET, socket.AF_INET6]:
- if proto in addrs:
- return (iface, addrs[proto][0]["addr"])
- except Exception:
- # _logger.exception("Can't get iface/IP")
- pass
- return ("<no-iface>", "<no-ip>")
-
-
-def _get_uptime() -> str:
- uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time()))
- pl = {"days": uptime.days}
- (pl["hours"], rem) = divmod(uptime.seconds, 3600)
- (pl["mins"], pl["secs"]) = divmod(rem, 60)
- return "{days}d {hours}h {mins}m".format(**pl)
-
-
-def _get_temp(fahrenheit: bool) -> str:
- try:
- with open("/sys/class/thermal/thermal_zone0/temp") as temp_file:
- temp = int((temp_file.read().strip())) / 1000
- if fahrenheit:
- temp = temp * 9 / 5 + 32
- return f"{temp:.1f}\u00b0F"
- return f"{temp:.1f}\u00b0C"
- except Exception:
- # _logger.exception("Can't read temp")
- return "<no-temp>"
-
-
-def _get_cpu() -> str:
- st = psutil.cpu_times_percent()
- user = st.user - st.guest
- nice = st.nice - st.guest_nice
- idle_all = st.idle + st.iowait
- system_all = st.system + st.irq + st.softirq
- virtual = st.guest + st.guest_nice
- total = max(1, user + nice + system_all + idle_all + st.steal + virtual)
- percent = int(
- st.nice / total * 100
- + st.user / total * 100
- + system_all / total * 100
- + (st.steal + st.guest) / total * 100
- )
- return f"{percent}%"
-
-
-def _get_mem() -> str:
- return f"{int(psutil.virtual_memory().percent)}%"
+_logger = logging.getLogger("oled")
# =====
@@ -248,21 +181,20 @@ def main() -> None: # pylint: disable=too-many-locals,too-many-branches,too-man
swim += 1
time.sleep(0.5)
+ sensors = Sensors(options.fahrenheit)
+
if device.height >= 64:
while stop_reason is None:
- (iface, ip) = _get_ip()
- text = f"{socket.getfqdn()}\n{ip}\niface: {iface}\ntemp: {_get_temp(options.fahrenheit)}"
- text += f"\ncpu: {_get_cpu()} mem: {_get_mem()}\n(__hb__) {_get_uptime()}"
- draw(text)
+ text = "{fqdn}\n{ip}\niface: {iface}\ntemp: {temp}\ncpu: {cpu} mem: {mem}\n(__hb__) {uptime}"
+ draw(sensors.render(text))
else:
summary = True
while stop_reason is None:
if summary:
- text = f"{socket.getfqdn()}\n(__hb__) {_get_uptime()}\ntemp: {_get_temp(options.fahrenheit)}"
+ text = "{fqdn}\n(__hb__) {uptime}\ntemp: {temp}"
else:
- (iface, ip) = _get_ip()
- text = "%s\n(__hb__) iface: %s\ncpu: %s mem: %s" % (ip, iface, _get_cpu(), _get_mem())
- draw(text)
+ text = "{ip}\n(__hb__) iface: {iface}\ncpu: {cpu} mem: {mem}"
+ draw(sensors.render(text))
summary = (not summary)
if stop_reason is not None:
diff --git a/kvmd/apps/oled/sensors.py b/kvmd/apps/oled/sensors.py
new file mode 100644
index 00000000..9ae5cd8d
--- /dev/null
+++ b/kvmd/apps/oled/sensors.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+# ========================================================================== #
+# #
+# KVMD-OLED - A small OLED daemon for PiKVM. #
+# #
+# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import socket
+import functools
+import datetime
+import time
+
+import netifaces
+import psutil
+
+
+# =====
+class Sensors:
+ def __init__(self, fahrenheit: bool) -> None:
+ self.__fahrenheit = fahrenheit
+ self.__sensors = {
+ "fqdn": socket.getfqdn,
+ "iface": self.__get_iface,
+ "ip": self.__get_ip,
+ "uptime": self.__get_uptime,
+ "temp": self.__get_temp,
+ "cpu": self.__get_cpu,
+ "mem": self.__get_mem,
+ }
+
+ def render(self, text: str) -> str:
+ return text.format_map(self)
+
+ def __getitem__(self, key: str) -> str:
+ return self.__sensors[key]() # type: ignore
+
+ # =====
+
+ def __get_iface(self) -> str:
+ print("get_iface")
+ return self.__get_netconf(round(time.monotonic() / 0.3))[0]
+
+ def __get_ip(self) -> str:
+ print("get_ip")
+ return self.__get_netconf(round(time.monotonic() / 0.3))[1]
+
+ @functools.lru_cache(maxsize=1)
+ def __get_netconf(self, ts: int) -> tuple[str, str]:
+ _ = ts
+ try:
+ gws = netifaces.gateways()
+ if "default" in gws:
+ for proto in [socket.AF_INET, socket.AF_INET6]:
+ if proto in gws["default"]:
+ iface = gws["default"][proto][1]
+ addrs = netifaces.ifaddresses(iface)
+ return (iface, addrs[proto][0]["addr"])
+
+ for iface in netifaces.interfaces():
+ if not iface.startswith(("lo", "docker")):
+ addrs = netifaces.ifaddresses(iface)
+ for proto in [socket.AF_INET, socket.AF_INET6]:
+ if proto in addrs:
+ return (iface, addrs[proto][0]["addr"])
+ except Exception:
+ # _logger.exception("Can't get iface/IP")
+ pass
+ return ("<no-iface>", "<no-ip>")
+
+ # =====
+
+ def __get_uptime(self) -> str:
+ uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time()))
+ pl = {"days": uptime.days}
+ (pl["hours"], rem) = divmod(uptime.seconds, 3600)
+ (pl["mins"], pl["secs"]) = divmod(rem, 60)
+ return "{days}d {hours}h {mins}m".format(**pl)
+
+ # =====
+
+ def __get_temp(self) -> str:
+ try:
+ with open("/sys/class/thermal/thermal_zone0/temp") as file:
+ temp = int(file.read().strip()) / 1000
+ if self.__fahrenheit:
+ temp = temp * 9 / 5 + 32
+ return f"{temp:.1f}\u00b0F"
+ return f"{temp:.1f}\u00b0C"
+ except Exception:
+ # _logger.exception("Can't read temp")
+ return "<no-temp>"
+
+ # =====
+
+ def __get_cpu(self) -> str:
+ st = psutil.cpu_times_percent()
+ user = st.user - st.guest
+ nice = st.nice - st.guest_nice
+ idle_all = st.idle + st.iowait
+ system_all = st.system + st.irq + st.softirq
+ virtual = st.guest + st.guest_nice
+ total = max(1, user + nice + system_all + idle_all + st.steal + virtual)
+ percent = int(
+ st.nice / total * 100
+ + st.user / total * 100
+ + system_all / total * 100
+ + (st.steal + st.guest) / total * 100
+ )
+ return f"{percent}%"
+
+ def __get_mem(self) -> str:
+ return f"{int(psutil.virtual_memory().percent)}%"