diff options
author | Maxim Devaev <[email protected]> | 2024-09-12 17:05:35 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-09-12 17:05:35 +0300 |
commit | 445e2e04e2cdc06794e1341a6ad9153bcff18bec (patch) | |
tree | e895ba67a0a92bc3680ebba0af3ca6d9490c964c /kvmd/apps | |
parent | 489601bb96162a29c1de2ebde5e4e62ef53993a7 (diff) |
oled: sensors class
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/oled/__init__.py | 88 | ||||
-rw-r--r-- | kvmd/apps/oled/sensors.py | 128 |
2 files changed, 138 insertions, 78 deletions
diff --git a/kvmd/apps/oled/__init__.py b/kvmd/apps/oled/__init__.py index 9fc4acb8..753933ff 100644 --- a/kvmd/apps/oled/__init__.py +++ b/kvmd/apps/oled/__init__.py @@ -3,7 +3,7 @@ # # # KVMD-OLED - A small OLED daemon for PiKVM. # # # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # @@ -23,15 +23,11 @@ import sys import os -import socket import signal import itertools import logging -import datetime import time -import netifaces -import psutil import usb.core from luma.core import cmdline as luma_cmdline @@ -41,74 +37,11 @@ from luma.core.render import canvas as luma_canvas from PIL import Image from PIL import ImageFont - -# ===== -_logger = logging.getLogger("oled") +from .sensors import Sensors # ===== -def _get_ip() -> tuple[str, str]: - try: - gws = netifaces.gateways() - if "default" in gws: - for proto in [socket.AF_INET, socket.AF_INET6]: - if proto in gws["default"]: - iface = gws["default"][proto][1] - addrs = netifaces.ifaddresses(iface) - return (iface, addrs[proto][0]["addr"]) - - for iface in netifaces.interfaces(): - if not iface.startswith(("lo", "docker")): - addrs = netifaces.ifaddresses(iface) - for proto in [socket.AF_INET, socket.AF_INET6]: - if proto in addrs: - return (iface, addrs[proto][0]["addr"]) - except Exception: - # _logger.exception("Can't get iface/IP") - pass - return ("<no-iface>", "<no-ip>") - - -def _get_uptime() -> str: - uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time())) - pl = {"days": uptime.days} - (pl["hours"], rem) = divmod(uptime.seconds, 3600) - (pl["mins"], pl["secs"]) = divmod(rem, 60) - return "{days}d {hours}h {mins}m".format(**pl) - - -def _get_temp(fahrenheit: bool) -> str: - try: - with open("/sys/class/thermal/thermal_zone0/temp") as temp_file: - temp = int((temp_file.read().strip())) / 1000 - if fahrenheit: - temp = temp * 9 / 5 + 32 - return f"{temp:.1f}\u00b0F" - return f"{temp:.1f}\u00b0C" - except Exception: - # _logger.exception("Can't read temp") - return "<no-temp>" - - -def _get_cpu() -> str: - st = psutil.cpu_times_percent() - user = st.user - st.guest - nice = st.nice - st.guest_nice - idle_all = st.idle + st.iowait - system_all = st.system + st.irq + st.softirq - virtual = st.guest + st.guest_nice - total = max(1, user + nice + system_all + idle_all + st.steal + virtual) - percent = int( - st.nice / total * 100 - + st.user / total * 100 - + system_all / total * 100 - + (st.steal + st.guest) / total * 100 - ) - return f"{percent}%" - - -def _get_mem() -> str: - return f"{int(psutil.virtual_memory().percent)}%" +_logger = logging.getLogger("oled") # ===== @@ -248,21 +181,20 @@ def main() -> None: # pylint: disable=too-many-locals,too-many-branches,too-man swim += 1 time.sleep(0.5) + sensors = Sensors(options.fahrenheit) + if device.height >= 64: while stop_reason is None: - (iface, ip) = _get_ip() - text = f"{socket.getfqdn()}\n{ip}\niface: {iface}\ntemp: {_get_temp(options.fahrenheit)}" - text += f"\ncpu: {_get_cpu()} mem: {_get_mem()}\n(__hb__) {_get_uptime()}" - draw(text) + text = "{fqdn}\n{ip}\niface: {iface}\ntemp: {temp}\ncpu: {cpu} mem: {mem}\n(__hb__) {uptime}" + draw(sensors.render(text)) else: summary = True while stop_reason is None: if summary: - text = f"{socket.getfqdn()}\n(__hb__) {_get_uptime()}\ntemp: {_get_temp(options.fahrenheit)}" + text = "{fqdn}\n(__hb__) {uptime}\ntemp: {temp}" else: - (iface, ip) = _get_ip() - text = "%s\n(__hb__) iface: %s\ncpu: %s mem: %s" % (ip, iface, _get_cpu(), _get_mem()) - draw(text) + text = "{ip}\n(__hb__) iface: {iface}\ncpu: {cpu} mem: {mem}" + draw(sensors.render(text)) summary = (not summary) if stop_reason is not None: diff --git a/kvmd/apps/oled/sensors.py b/kvmd/apps/oled/sensors.py new file mode 100644 index 00000000..9ae5cd8d --- /dev/null +++ b/kvmd/apps/oled/sensors.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# ========================================================================== # +# # +# KVMD-OLED - A small OLED daemon for PiKVM. # +# # +# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import socket +import functools +import datetime +import time + +import netifaces +import psutil + + +# ===== +class Sensors: + def __init__(self, fahrenheit: bool) -> None: + self.__fahrenheit = fahrenheit + self.__sensors = { + "fqdn": socket.getfqdn, + "iface": self.__get_iface, + "ip": self.__get_ip, + "uptime": self.__get_uptime, + "temp": self.__get_temp, + "cpu": self.__get_cpu, + "mem": self.__get_mem, + } + + def render(self, text: str) -> str: + return text.format_map(self) + + def __getitem__(self, key: str) -> str: + return self.__sensors[key]() # type: ignore + + # ===== + + def __get_iface(self) -> str: + print("get_iface") + return self.__get_netconf(round(time.monotonic() / 0.3))[0] + + def __get_ip(self) -> str: + print("get_ip") + return self.__get_netconf(round(time.monotonic() / 0.3))[1] + + @functools.lru_cache(maxsize=1) + def __get_netconf(self, ts: int) -> tuple[str, str]: + _ = ts + try: + gws = netifaces.gateways() + if "default" in gws: + for proto in [socket.AF_INET, socket.AF_INET6]: + if proto in gws["default"]: + iface = gws["default"][proto][1] + addrs = netifaces.ifaddresses(iface) + return (iface, addrs[proto][0]["addr"]) + + for iface in netifaces.interfaces(): + if not iface.startswith(("lo", "docker")): + addrs = netifaces.ifaddresses(iface) + for proto in [socket.AF_INET, socket.AF_INET6]: + if proto in addrs: + return (iface, addrs[proto][0]["addr"]) + except Exception: + # _logger.exception("Can't get iface/IP") + pass + return ("<no-iface>", "<no-ip>") + + # ===== + + def __get_uptime(self) -> str: + uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time())) + pl = {"days": uptime.days} + (pl["hours"], rem) = divmod(uptime.seconds, 3600) + (pl["mins"], pl["secs"]) = divmod(rem, 60) + return "{days}d {hours}h {mins}m".format(**pl) + + # ===== + + def __get_temp(self) -> str: + try: + with open("/sys/class/thermal/thermal_zone0/temp") as file: + temp = int(file.read().strip()) / 1000 + if self.__fahrenheit: + temp = temp * 9 / 5 + 32 + return f"{temp:.1f}\u00b0F" + return f"{temp:.1f}\u00b0C" + except Exception: + # _logger.exception("Can't read temp") + return "<no-temp>" + + # ===== + + def __get_cpu(self) -> str: + st = psutil.cpu_times_percent() + user = st.user - st.guest + nice = st.nice - st.guest_nice + idle_all = st.idle + st.iowait + system_all = st.system + st.irq + st.softirq + virtual = st.guest + st.guest_nice + total = max(1, user + nice + system_all + idle_all + st.steal + virtual) + percent = int( + st.nice / total * 100 + + st.user / total * 100 + + system_all / total * 100 + + (st.steal + st.guest) / total * 100 + ) + return f"{percent}%" + + def __get_mem(self) -> str: + return f"{int(psutil.virtual_memory().percent)}%" |