1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import time
from typing import Callable
from ...logging import get_logger
from ... import aiotools
from ...plugins.hid import BaseHid
from .streamer import Streamer
# =====
class Snapshoter: # pylint: disable=too-many-instance-attributes
def __init__(
self,
hid: BaseHid,
streamer: Streamer,
idle_interval: float,
live_interval: float,
wakeup_key: str,
wakeup_move: int,
online_delay: float,
retries: int,
retries_delay: float,
) -> None:
self.__hid = hid
self.__streamer = streamer
if idle_interval or live_interval:
self.__idle_interval = (idle_interval or live_interval)
self.__live_interval = (live_interval or idle_interval)
assert self.__idle_interval
assert self.__live_interval
else:
self.__idle_interval = self.__live_interval = 0.0
self.__wakeup_key = wakeup_key
self.__wakeup_move = wakeup_move
self.__online_delay = online_delay
self.__retries = retries
self.__retries_delay = retries_delay
self.__snapshoting = False
async def run(self, is_live: Callable[[], bool], notifier: aiotools.AioNotifier) -> None:
if self.__idle_interval or self.__live_interval:
get_logger(0).info("Running periodic stream snapshot: idle=%.2f; live=%.2f ...",
self.__idle_interval, self.__live_interval)
last_snapshot_ts = 0.0
while True:
live = is_live()
if last_snapshot_ts + (self.__live_interval if live else self.__idle_interval) < time.monotonic():
await self.__take_snapshot(live, notifier)
last_snapshot_ts = time.monotonic()
await asyncio.sleep(min(self.__idle_interval, self.__live_interval))
else:
await aiotools.wait_infinite()
def snapshoting(self) -> bool:
return self.__snapshoting
async def __take_snapshot(self, live: bool, notifier: aiotools.AioNotifier) -> None:
logger = get_logger(0)
if not live:
logger.info("Time to take the new idle snapshot")
try:
self.__snapshoting = True
notifier.notify()
if not live:
await self.__wakeup()
retries = self.__retries
while retries:
snapshot = await self.__streamer.take_snapshot(save=True, load=False, allow_offline=False)
if snapshot:
if not live:
logger.info("New idle snapshot saved: %dx%d", snapshot.width, snapshot.height)
break
retries -= 1
await asyncio.sleep(self.__retries_delay)
else:
logger.error("Can't take snapshot after %d retries", self.__retries)
except Exception: # Этого вообще-то не должно случаться, апи внутри заэксцепчены, но мало ли
logger.exception("Unhandled exception while taking snapshot")
finally:
self.__snapshoting = False
notifier.notify()
async def __wakeup(self) -> None:
logger = get_logger(0)
if self.__wakeup_key:
logger.info("Waking up using key %r ...", self.__wakeup_key)
await self.__hid.send_key_events(
keys=[(self.__wakeup_key, True), (self.__wakeup_key, False)],
no_ignore_keys=True,
)
if self.__wakeup_move:
logger.info("Waking up using mouse move for %d units ...", self.__wakeup_move)
for (to_x, to_y) in [(0, 0), (self.__wakeup_move, self.__wakeup_move), (0, 0)]:
self.__hid.send_mouse_move_event(to_x, to_y)
if self.__online_delay:
logger.info("Waiting %.2f seconds for online ...", self.__online_delay)
await asyncio.sleep(self.__online_delay)
|