1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
package channel
import (
"context"
"sync"
"sync/atomic"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/worker"
)
// Vec is a container of workers backed by a channel.
type Vec struct {
	// Embedded lock: Pop holds it for reading while it waits on the channel,
	// Push takes it for writing when it has to scan a full channel.
	sync.RWMutex

	// destroy signal: 0 while the vector is usable, 1 once Destroy was called.
	// Accessed with sync/atomic.
	destroy uint64

	// channel with the workers
	workers chan worker.BaseProcess
}
// NewVector creates a Vec whose workers channel is allocated with a buffer
// of len elements. The destroy flag starts at its zero value (not destroyed).
func NewVector(len uint64) *Vec {
	return &Vec{
		workers: make(chan worker.BaseProcess, len),
	}
}
// Push returns the worker w to the vector.
//
// The common case is a single non-blocking channel send, O(1).
// When the channel is full, Push falls back to an O(n) scan, where n is the
// length of the channel: Pop operations are paused via the write lock and the
// queued workers are inspected one by one until an empty (nil) entry or a
// worker that is neither in the Working nor in the Ready state is found. Such a
// worker is killed and w takes its slot.
//
// Per the original author, the channel is expected to be full only when TTL is
// used, because TTL-ed (killed) workers may still be present in the channel.
//
// If the channel is full of healthy workers, w is not stored.
// NOTE(review): in that case w is neither queued nor killed here - confirm
// that the caller does not rely on it being tracked by the vector.
func (v *Vec) Push(w worker.BaseProcess) {
	// Fast path: non-blocking channel send.
	select {
	case v.workers <- w:
		return
	default:
	}

	// Slow path. Stop Pop operations.
	v.Lock()
	defer v.Unlock()

	// offer is a non-blocking send. Blocking sends must be avoided while the
	// write lock is held: the fast path of a concurrent Push does not take the
	// lock and may occupy a slot that was just freed, and no Pop can drain the
	// channel until the lock is released, so a blocking send could never
	// complete.
	offer := func(p worker.BaseProcess) bool {
		select {
		case v.workers <- p:
			return true
		default:
			return false
		}
	}

	// A Pop may have freed a slot between the failed fast-path send and the
	// moment the lock was acquired; in that case no scan is needed.
	if offer(w) {
		return
	}

	// Snapshot the length once: the channel is modified inside the loop, and
	// one full rotation of the queue is enough to inspect every worker.
	for i, n := 0, len(v.workers); i < n; i++ {
		var wrk worker.BaseProcess
		select {
		case wrk = <-v.workers:
		default:
			// Nothing left to inspect; the channel has room now.
			_ = offer(w)
			return
		}

		// The nil check must come before any method call on wrk.
		if wrk != nil {
			switch wrk.State().Value() {
			case worker.StateWorking, worker.StateReady:
				// Good state: put the worker back. The re-sent worker goes to
				// the back of the queue, so the scan rotates the channel.
				if !offer(wrk) {
					// A concurrent Push took the slot. There is no room left
					// for this worker, so kill it (best effort) instead of
					// blocking while holding the lock.
					_ = wrk.Kill()
				}
				continue
			}

			// Bad state: kill the worker, just to be sure it's dead.
			// The error is ignored on purpose, the worker is discarded anyway.
			_ = wrk.Kill()
		}

		// Replace the removed entry with the new worker. The new worker can be
		// TTL-ed at this moment as well; the original author notes that this
		// case is handled in worker_watcher::Get.
		if offer(w) {
			return
		}
		// The freed slot was taken by a concurrent Push; keep scanning.
	}
}
// Remove does nothing for the channel-based vector; its argument is ignored.
// NOTE(review): presumably kept to satisfy a container interface declared
// elsewhere - confirm.
func (v *Vec) Remove(_ int64) {
	// intentionally empty
}
// Pop takes the next worker from the channel, waiting until one is available
// or ctx is done.
//
// It returns an errors.WatcherStopped error when the vector has been
// destroyed (the flag is checked once, before waiting), and an
// errors.NoFreeWorkers error wrapping ctx.Err() when ctx is done first.
func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
	destroyed := atomic.LoadUint64(&v.destroy) == 1
	if destroyed {
		return nil, errors.E(errors.WatcherStopped)
	}

	// The read lock is held for the whole wait; Push takes the write lock
	// when it scans a full channel (used only for the TTL-ed workers).
	v.RLock()
	defer v.RUnlock()

	done := ctx.Done()
	select {
	case <-done:
		return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
	case wrk := <-v.workers:
		return wrk, nil
	}
}
// Destroy marks the vector as destroyed by atomically setting the destroy
// flag to 1. Pop calls made afterwards fail with a WatcherStopped error.
// Workers still in the channel are left untouched.
func (v *Vec) Destroy() {
	atomic.StoreUint64(&v.destroy, 1)
}
|