1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
package worker_watcher //nolint:golint,stylecheck
import (
"context"
"runtime"
"sync"
"time"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
)
// Stack is a mutex-guarded, last-in-first-out container of workers.
type Stack struct {
// workers holds the workers currently stored in the stack; Push appends to
// the end of the slice and Pop takes from the end.
workers []worker.BaseProcess
// mutex is locked by every method of Stack before it touches the other fields.
mutex sync.RWMutex
// destroy is set to true by Destroy; while it is set, Pop returns (nil, true)
// instead of handing out a worker.
destroy bool
// actualNumOfWorkers is incremented by Push, decremented by Pop and
// FindAndRemoveByPid, and zeroed by Reset. Destroy compares it with
// len(workers).
actualNumOfWorkers int64
}
// NewWorkersStack returns an empty Stack whose worker slice is pre-allocated
// with capacity for one worker per logical CPU.
func NewWorkersStack() *Stack {
	s := new(Stack)
	// actualNumOfWorkers and destroy keep their zero values (0 and false).
	s.workers = make([]worker.BaseProcess, 0, runtime.NumCPU())
	return s
}
// Reset drops every worker reference held by the stack and zeroes the worker
// counter. The destroy flag is left untouched.
func (stack *Stack) Reset() {
	stack.mutex.Lock()
	stack.workers, stack.actualNumOfWorkers = nil, 0
	stack.mutex.Unlock()
}
// Push places w on top of the stack and increments the worker counter.
// Both updates happen under the stack mutex.
func (stack *Stack) Push(w worker.BaseProcess) {
	stack.mutex.Lock()
	stack.workers = append(stack.workers, w)
	stack.actualNumOfWorkers++
	stack.mutex.Unlock()
}
// IsEmpty reports whether the stack currently holds no workers.
func (stack *Stack) IsEmpty() bool {
	stack.mutex.Lock()
	held := len(stack.workers)
	stack.mutex.Unlock()
	return held == 0
}
// Pop removes and returns the worker on top of the stack.
//
// The boolean result reports whether the stack is in the destroy state:
//   - (nil, true):  Destroy has been called; no worker is handed out.
//   - (nil, false): the stack is empty.
//   - (w, false):   w was taken from the top of the stack.
func (stack *Stack) Pop() (worker.BaseProcess, bool) {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	// do not release workers once destruction has started
	if stack.destroy {
		return nil, true
	}

	top := len(stack.workers) - 1
	if top < 0 {
		return nil, false
	}

	w := stack.workers[top]
	// Clear the vacated slot so the backing array does not keep the worker
	// reachable after it has left the stack.
	stack.workers[top] = nil
	stack.workers = stack.workers[:top]
	stack.actualNumOfWorkers--

	return w, false
}
// FindAndRemoveByPid removes the first worker whose Pid() equals pid, keeping
// the order of the remaining workers, and decrements the worker counter.
// It returns true if such a worker was found and removed, false otherwise.
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	for i := range stack.workers {
		if stack.workers[i].Pid() != pid {
			continue
		}

		last := len(stack.workers) - 1
		// Shift the tail one position to the left over the removed worker.
		copy(stack.workers[i:], stack.workers[i+1:])
		// The last slot now duplicates its left neighbour (or still holds the
		// removed worker when i == last); clear it so the backing array does
		// not keep a stale reference alive.
		stack.workers[last] = nil
		stack.workers = stack.workers[:last]
		stack.actualNumOfWorkers--

		// worker found and removed
		return true
	}

	// no worker with such pid
	return false
}
// Workers returns a snapshot of the workers currently held in the stack.
// The returned slice has its own backing array, so callers may modify it
// without affecting the stack. It is never nil, even when the stack is empty.
func (stack *Stack) Workers() []worker.BaseProcess {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	// Allocate the exact size once instead of growing from capacity 1.
	workersCopy := make([]worker.BaseProcess, len(stack.workers))
	copy(workersCopy, stack.workers)
	return workersCopy
}
// isDestroying reports whether the destroy flag has been set on the stack.
func (stack *Stack) isDestroying() bool {
	stack.mutex.Lock()
	destroying := stack.destroy
	stack.mutex.Unlock()
	return destroying
}
// Destroy puts the stack into the destroy state and kills the workers it holds.
//
// Once the destroy flag is set, Pop stops handing out workers. Destroy then
// polls every 100ms until len(workers) equals actualNumOfWorkers; at that point
// every worker in the stack is marked as destroyed and killed, and the stack is
// cleared. If ctx is done before that happens, the workers held at that moment
// are killed and the stack is cleared without waiting any longer. A nil ctx is
// tolerated and means "never cancel".
//
// NOTE(review): Push, Pop and FindAndRemoveByPid in this file change
// actualNumOfWorkers together with the slice length, so the comparison below
// appears to be satisfied on the very first tick. Confirm whether it was meant
// to wait for workers that are still busy outside of the stack.
func (stack *Stack) Destroy(ctx context.Context) {
	stack.mutex.Lock()
	stack.destroy = true
	stack.mutex.Unlock()

	// The context used to be ignored entirely, so a nil value must keep working:
	// receiving from a nil channel blocks forever, which disables that case.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	// killAndClear must be called with stack.mutex held.
	killAndClear := func() {
		for i := range stack.workers {
			// set the state for the worker (unused at the moment)
			stack.workers[i].State().Set(internal.StateDestroyed)
			// The kill error is deliberately dropped: Destroy has no return
			// value through which it could be reported.
			_ = stack.workers[i].Kill()
		}
		// clear
		stack.workers = nil
		stack.actualNumOfWorkers = 0
	}

	tt := time.NewTicker(time.Millisecond * 100)
	defer tt.Stop()

	for {
		select {
		case <-done:
			// The caller gave up waiting: kill what is in the stack right now.
			stack.mutex.Lock()
			killAndClear()
			stack.mutex.Unlock()
			return
		case <-tt.C:
			stack.mutex.Lock()
			// that might be one of the workers is working
			if len(stack.workers) != int(stack.actualNumOfWorkers) {
				stack.mutex.Unlock()
				continue
			}
			// Check, kill and clear under a single lock acquisition so that no
			// Push can slip in between the steps and be dropped without a kill.
			killAndClear()
			stack.mutex.Unlock()
			return
		}
	}
}
|