1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
package container
import (
"context"
"runtime"
"sync"
"time"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
// Stack is a LIFO container of workers guarded by the embedded mutex.
// Every method in this file takes the lock before touching the fields.
type Stack struct {
sync.RWMutex
// workers holds the workers currently stored in the stack; Push appends to
// the end and Pop takes from the end.
workers []worker.BaseProcess
// destroy is set by Destroy; while it is true Pop refuses to hand out workers.
destroy bool
// actualNumOfWorkers is incremented by Push, decremented by Pop and
// FindAndRemoveByPid, and zeroed by Reset.
actualNumOfWorkers uint64
// initialNumOfWorkers is fixed at construction; Destroy waits until
// actualNumOfWorkers equals it before killing the workers.
initialNumOfWorkers uint64
}
// NewWorkersStack returns an empty Stack that remembers initialNumOfWorkers
// as the worker count Destroy later waits for. The backing slice starts with
// a capacity equal to the number of logical CPUs.
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
	stack := new(Stack)
	stack.workers = make([]worker.BaseProcess, 0, runtime.NumCPU())
	stack.initialNumOfWorkers = initialNumOfWorkers
	// actualNumOfWorkers and destroy keep their zero values.
	return stack
}
// Reset drops every stored worker and zeroes the worker counter. The destroy
// flag and the initial worker count are left as they are.
func (stack *Stack) Reset() {
	stack.Lock()
	stack.workers, stack.actualNumOfWorkers = nil, 0
	stack.Unlock()
}
// Push puts the worker w on top of the stack and increments the worker
// counter. It does not check the destroy flag, so workers can still be
// returned while the stack is being destroyed.
func (stack *Stack) Push(w worker.BaseProcess) {
	stack.Lock()
	stack.workers = append(stack.workers, w)
	stack.actualNumOfWorkers++
	stack.Unlock()
}
// IsEmpty reports whether the stack currently holds no workers.
func (stack *Stack) IsEmpty() bool {
	stack.Lock()
	empty := len(stack.workers) == 0
	stack.Unlock()
	return empty
}
// Pop removes and returns the worker on top of the stack.
//
// The boolean result is true only when the stack is in the destroy state, in
// which case no worker is handed out. An empty stack yields (nil, false).
func (stack *Stack) Pop() (worker.BaseProcess, bool) {
	stack.Lock()
	defer stack.Unlock()

	switch {
	case stack.destroy:
		// never release workers while the stack is being destroyed
		return nil, true
	case len(stack.workers) == 0:
		return nil, false
	}

	// take the most recently pushed worker
	top := len(stack.workers) - 1
	popped := stack.workers[top]
	stack.workers = stack.workers[:top]
	stack.actualNumOfWorkers--

	return popped, false
}
// FindAndRemoveByPid removes the first worker whose Pid equals pid and
// decrements the worker counter. It reports whether such a worker was found.
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
	stack.Lock()
	defer stack.Unlock()

	for i := range stack.workers {
		if stack.workers[i].Pid() != pid {
			continue
		}

		// Shift the tail one slot to the left, then clear the vacated last
		// slot so the backing array does not keep a stale reference to a
		// worker alive after the slice is shortened.
		last := len(stack.workers) - 1
		copy(stack.workers[i:], stack.workers[i+1:])
		stack.workers[last] = nil
		stack.workers = stack.workers[:last]
		stack.actualNumOfWorkers--

		// worker found and removed
		return true
	}

	// no worker with such PID
	return false
}
// Workers returns a new slice holding the non-nil workers currently in the
// stack, in stack order. Only the slice is copied: its elements are the same
// worker values the stack holds. The result is never nil.
func (stack *Stack) Workers() []worker.BaseProcess {
	stack.Lock()
	defer stack.Unlock()

	// The final size is bounded by the number of stored workers, so allocate
	// once instead of growing from a capacity of one.
	workersCopy := make([]worker.BaseProcess, 0, len(stack.workers))
	for _, w := range stack.workers {
		if w != nil {
			workersCopy = append(workersCopy, w)
		}
	}

	return workersCopy
}
// isDestroying reports whether the destroy flag has been set.
func (stack *Stack) isDestroying() bool {
	stack.Lock()
	destroying := stack.destroy
	stack.Unlock()
	return destroying
}
// Destroy switches the stack into the destroy state and then kills every
// worker it holds.
//
// Once the destroy flag is set Pop stops handing out workers, while Push keeps
// working, so workers that are currently in use can still be returned. Every
// 500ms Destroy checks whether the number of workers in the stack equals the
// initial number; when it does, every worker is marked as destroyed and
// killed, and the stack is cleared.
//
// If ctx is cancelled before all workers are back, Destroy stops waiting and
// kills and clears whatever the stack holds at that moment. A nil ctx, or one
// that is never cancelled, waits indefinitely.
func (stack *Stack) Destroy(ctx context.Context) {
	stack.Lock()
	stack.destroy = true
	stack.Unlock()

	// Receiving from a nil channel blocks forever, which keeps the original
	// wait-until-complete behaviour for callers that pass a nil context.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	tt := time.NewTicker(time.Millisecond * 500)
	defer tt.Stop()

	for {
		forced := false
		select {
		case <-done:
			// the caller gave up waiting, tear down what we have
			forced = true
		case <-tt.C:
		}

		stack.Lock()
		// that might be one of the workers is still working
		if !forced && stack.initialNumOfWorkers != stack.actualNumOfWorkers {
			stack.Unlock()
			continue
		}

		// The check, the kill loop and the clean-up share one critical
		// section, so a worker pushed concurrently can not be dropped from
		// the stack without being killed.
		for i := range stack.workers {
			// set state for the workers in the stack (unused at the moment)
			stack.workers[i].State().Set(worker.StateDestroyed)
			// kill the worker; the result is deliberately ignored because
			// this is a best-effort teardown
			_ = stack.workers[i].Kill()
		}

		// clear
		stack.workers = nil
		stack.actualNumOfWorkers = 0
		stack.Unlock()
		return
	}
}
|