summaryrefslogtreecommitdiff
path: root/kvmd/aiomulti.py
blob: a45372040de7cc05a10916f49819cf31e6bbd902 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# ========================================================================== #
#                                                                            #
#    KVMD - The main PiKVM daemon.                                           #
#                                                                            #
#    Copyright (C) 2018-2024  Maxim Devaev <[email protected]>               #
#                                                                            #
#    This program is free software: you can redistribute it and/or modify    #
#    it under the terms of the GNU General Public License as published by    #
#    the Free Software Foundation, either version 3 of the License, or       #
#    (at your option) any later version.                                     #
#                                                                            #
#    This program is distributed in the hope that it will be useful,         #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of          #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
#    GNU General Public License for more details.                            #
#                                                                            #
#    You should have received a copy of the GNU General Public License       #
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.  #
#                                                                            #
# ========================================================================== #


import multiprocessing
import queue

from typing import Type
from typing import TypeVar
from typing import Generic

from . import aiotools


# =====
_QueueItemT = TypeVar("_QueueItemT")


async def queue_get_last(  # pylint: disable=invalid-name
    q: "multiprocessing.Queue[_QueueItemT]",
    timeout: float,
) -> tuple[bool, (_QueueItemT | None)]:

    return (await aiotools.run_async(queue_get_last_sync, q, timeout))


def queue_get_last_sync(  # pylint: disable=invalid-name
    q: "multiprocessing.Queue[_QueueItemT]",
    timeout: float,
) -> tuple[bool, (_QueueItemT | None)]:

    try:
        item = q.get(timeout=timeout)
        while not q.empty():
            item = q.get()
        return (True, item)
    except queue.Empty:
        return (False, None)


# =====
class AioProcessNotifier:
    def __init__(self) -> None:
        self.__queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()

    def notify(self, mask: int=0) -> None:
        self.__queue.put_nowait(mask)

    async def wait(self) -> int:
        while True:
            mask = await aiotools.run_async(self.__get)
            if mask >= 0:
                return mask

    def __get(self) -> int:
        try:
            mask = self.__queue.get(timeout=0.1)
            while not self.__queue.empty():
                mask |= self.__queue.get()
            return mask
        except queue.Empty:
            return -1


# =====
_SharedFlagT = TypeVar("_SharedFlagT", int, bool)


class AioSharedFlags(Generic[_SharedFlagT]):
    def __init__(
        self,
        initial: dict[str, _SharedFlagT],
        notifier: AioProcessNotifier,
        type: Type[_SharedFlagT]=bool,  # pylint: disable=redefined-builtin
    ) -> None:

        self.__notifier = notifier
        self.__type: Type[_SharedFlagT] = type

        self.__flags = {
            key: multiprocessing.RawValue("i", int(value))  # type: ignore
            for (key, value) in initial.items()
        }

        self.__lock = multiprocessing.Lock()

    def update(self, **kwargs: _SharedFlagT) -> None:
        changed = False
        with self.__lock:
            for (key, value) in kwargs.items():
                value = int(value)  # type: ignore
                if self.__flags[key].value != value:
                    self.__flags[key].value = value
                    changed = True
        if changed:
            self.__notifier.notify()

    async def get(self) -> dict[str, _SharedFlagT]:
        return (await aiotools.run_async(self.__inner_get))

    def __inner_get(self) -> dict[str, _SharedFlagT]:
        with self.__lock:
            return {
                key: self.__type(shared.value)
                for (key, shared) in self.__flags.items()
            }