1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
package worker_watcher //nolint:golint,stylecheck
import (
"context"
"runtime"
"sync"
"time"
"github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
workers []*worker.SyncWorkerImpl
mutex sync.RWMutex
destroy bool
actualNumOfWorkers uint64
initialNumOfWorkers uint64
}
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
workers: make([]*worker.SyncWorkerImpl, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
}
func (stack *Stack) Reset() {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers = 0
stack.workers = nil
}
// Push worker back to the stack
// If stack in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers++
stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
}
func (stack *Stack) IsEmpty() bool {
stack.mutex.Lock()
defer stack.mutex.Unlock()
return len(stack.workers) == 0
}
func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
// do not release new stack
if stack.destroy {
return nil, true
}
if len(stack.workers) == 0 {
return nil, false
}
// move worker
w := stack.workers[len(stack.workers)-1]
stack.workers = stack.workers[:len(stack.workers)-1]
stack.actualNumOfWorkers--
return w, false
}
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
stack.mutex.Lock()
defer stack.mutex.Unlock()
for i := 0; i < len(stack.workers); i++ {
// worker in the stack, reallocating
if stack.workers[i].Pid() == pid {
stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
stack.actualNumOfWorkers--
// worker found and removed
return true
}
}
// no worker with such ID
return false
}
// Workers return copy of the workers in the stack
func (stack *Stack) Workers() []worker.SyncWorker {
stack.mutex.Lock()
defer stack.mutex.Unlock()
workersCopy := make([]worker.SyncWorker, 0, 1)
// copy
for _, v := range stack.workers {
if v != nil {
workersCopy = append(workersCopy, v)
}
}
return workersCopy
}
func (stack *Stack) isDestroying() bool {
stack.mutex.Lock()
defer stack.mutex.Unlock()
return stack.destroy
}
// we also have to give a chance to pool to Push worker (return it)
func (stack *Stack) Destroy(ctx context.Context) {
stack.mutex.Lock()
stack.destroy = true
stack.mutex.Unlock()
tt := time.NewTicker(time.Millisecond * 500)
defer tt.Stop()
for {
select {
case <-tt.C:
stack.mutex.Lock()
// that might be one of the workers is working
if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
stack.mutex.Unlock()
continue
}
stack.mutex.Unlock()
// unnecessary mutex, but
// just to make sure. All stack at this moment are in the stack
// Pop operation is blocked, push can't be done, since it's not possible to pop
stack.mutex.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
stack.workers[i].State().Set(states.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
stack.mutex.Unlock()
// clear
stack.Reset()
return
}
}
}
|