1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package roadrunner
import (
"context"
"github.com/spiral/roadrunner/v2/util"
"time"
)
// MB is the number of bytes in one mebibyte (1024 * 1024). The supervisor multiplies
// the configured per-worker memory limit by it before comparing against reported usage.
const MB = 1024 * 1024
// SupervisedPool is a Pool that additionally supports context-bound execution.
type SupervisedPool interface {
Pool
// ExecWithContext executes the payload under the deadline carried by ctx.
// Attention: the worker will be destroyed if the context deadline is reached.
ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
// supervisedPool periodically inspects the workers of a pool and removes those that
// exceed the limits described by its SupervisorConfig.
type supervisedPool struct {
// cfg holds the supervision settings read by this file (WatchTick, TTL, IdleTTL, MaxWorkerMemory).
cfg SupervisorConfig
// events receives the PoolEvent values pushed by the supervisor.
events *util.EventHandler
// pool is the pool whose workers are inspected and removed.
pool Pool
// stopCh carries the stop signal from Stop to the goroutine launched by Start.
stopCh chan struct{}
}
// newPoolWatcher builds a supervisor for the given pool. The supervisor reports
// through events and applies the limits in cfg. It does not start supervising
// until Start is called.
func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
	watcher := new(supervisedPool)

	watcher.cfg = cfg
	watcher.events = events
	watcher.pool = pool
	// Unbuffered: a send on this channel completes only once a receiver takes it.
	watcher.stopCh = make(chan struct{})

	return watcher
}
// Start launches a background goroutine that runs one supervision pass (control)
// on every tick of cfg.WatchTick, until a value is received on stopCh.
func (sp *supervisedPool) Start() {
	go func() {
		ticker := time.NewTicker(sp.cfg.WatchTick)
		// Release the ticker's resources when the loop exits.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				sp.control()
			case <-sp.stopCh:
				// stop here
				return
			}
		}
	}()
}
// Stop sends the stop signal to the supervision goroutine. With the unbuffered
// channel created by newPoolWatcher, the send blocks until the goroutine launched
// by Start receives it.
func (sp *supervisedPool) Stop() {
	var signal struct{}
	sp.stopCh <- signal
}
// control performs a single supervision pass over the pool's workers.
//
// For every worker that is not in StateInvalid and whose process state can be read,
// the checks below are applied in order; the first one that matches removes the
// worker from the pool and pushes an event:
//
//  1. TTL: the worker has existed for at least cfg.TTL seconds.
//  2. MaxWorkerMemory: the worker uses at least cfg.MaxWorkerMemory megabytes.
//  3. IdleTTL: the worker is in StateReady and has been idle for at least cfg.IdleTTL.
//
// A limit of zero disables the corresponding check. If removing a worker fails,
// an EventSupervisorError is pushed and the pass stops, leaving the remaining
// workers unchecked until the next tick.
func (sp *supervisedPool) control() {
	now := time.Now()
	ctx := context.TODO()

	// THIS IS A COPY OF WORKERS
	workers := sp.pool.Workers()

	for i := 0; i < len(workers); i++ {
		if workers[i].State().Value() == StateInvalid {
			continue
		}

		s, err := WorkerProcessState(workers[i])
		if err != nil {
			// worker is no longer valid for supervision
			continue
		}

		if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
			err = sp.pool.RemoveWorker(ctx, workers[i])
			if err != nil {
				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
				return
			}

			sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
			continue
		}

		if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
			// TODO events
			//sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)}
			err = sp.pool.RemoveWorker(ctx, workers[i])
			if err != nil {
				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
				return
			}

			// NOTE(review): a memory-limit removal is reported as EventTTL; a dedicated
			// event would be more accurate if one exists — confirm.
			sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
			continue
		}

		// first we check whether the idle limit is enabled
		if sp.cfg.IdleTTL != 0 {
			// then check the worker state: only ready workers can be idle
			if workers[i].State().Value() != StateReady {
				continue
			}

			// last used timestamp, in unix nanoseconds
			lu := workers[i].State().LastUsed()
			if lu == 0 {
				// NOTE(review): assumes a zero timestamp means the worker has not been
				// used yet; measuring idle time from the epoch would remove every fresh
				// worker immediately — confirm against the State implementation.
				continue
			}

			// Elapsed idle time is now minus last used (the timestamp lies in the past),
			// converted to whole seconds.
			// NOTE(review): assumes IdleTTL is expressed in seconds, like TTL above — confirm.
			// If it were nanoseconds this comparison would merely never fire.
			idleSec := (now.UnixNano() - int64(lu)) / int64(time.Second)

			// the worker has been idle for at least the allowed time
			if idleSec >= sp.cfg.IdleTTL {
				err = sp.pool.RemoveWorker(ctx, workers[i])
				if err != nil {
					sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
					return
				}

				sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
			}
		}

		// the very last step is to calculate pool memory usage (except excluded workers)
		//totalUsedMemory += s.MemoryUsage
	}

	//// if current usage more than max allowed pool memory usage
	//if totalUsedMemory > sp.maxPoolMemory {
	//	sp.pool.Destroy(ctx)
	//}
}
|