summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/stack.go
blob: 55034e4113e5825c7ec4f23ac67f8c1cfa53cb0b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package worker_watcher //nolint:golint,stylecheck
import (
	"context"
	"runtime"
	"sync"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/states"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// Stack is a mutex-guarded LIFO container of sync workers.
type Stack struct {
	// mutex serializes access to the fields below.
	mutex sync.RWMutex

	// workers stores the workers currently held by the stack; the last
	// element is the top of the stack.
	workers []*worker.SyncWorkerImpl
	// destroy is raised by Destroy; while it is set, Pop hands out no workers.
	destroy bool

	// actualNumOfWorkers counts the workers currently stored in the stack.
	actualNumOfWorkers uint64
	// initialNumOfWorkers is the worker count Destroy waits for before it
	// kills the workers.
	initialNumOfWorkers uint64
}

// NewWorkersStack returns an empty Stack that remembers initialNumOfWorkers as
// the expected worker count. The backing slice is pre-allocated with capacity
// for runtime.NumCPU() workers.
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
	s := &Stack{initialNumOfWorkers: initialNumOfWorkers}
	s.workers = make([]*worker.SyncWorkerImpl, 0, runtime.NumCPU())
	return s
}

// Reset drops every stored worker and zeroes the stored-worker counter.
// The destroy flag and the initial worker count are left as they are.
func (stack *Stack) Reset() {
	stack.mutex.Lock()
	stack.workers, stack.actualNumOfWorkers = nil, 0
	stack.mutex.Unlock()
}

// Push puts a worker on top of the stack and increments the stored-worker
// counter.
//
// Push does not look at the destroy flag, so workers can still be returned
// while Destroy is waiting for them.
//
// w must hold a *worker.SyncWorkerImpl. Any other dynamic type, or a nil
// interface value, makes the type assertion panic.
func (stack *Stack) Push(w worker.BaseProcess) {
	// Assert before locking and before touching any state, so that a panic
	// cannot leave actualNumOfWorkers incremented without a matching worker.
	sw := w.(*worker.SyncWorkerImpl)

	stack.mutex.Lock()
	defer stack.mutex.Unlock()
	stack.workers = append(stack.workers, sw)
	stack.actualNumOfWorkers++
}

// IsEmpty reports whether the stack currently holds no workers.
func (stack *Stack) IsEmpty() bool {
	stack.mutex.Lock()
	empty := len(stack.workers) == 0
	stack.mutex.Unlock()
	return empty
}

// Pop removes and returns the worker on top of the stack.
//
// The boolean result is true only when the stack is in the destroy state; in
// that case no worker is handed out. When the stack is simply empty, Pop
// returns (nil, false).
func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	switch {
	case stack.destroy:
		// a stack that is being destroyed does not release workers
		return nil, true
	case len(stack.workers) == 0:
		return nil, false
	}

	// detach the top element
	last := len(stack.workers) - 1
	top := stack.workers[last]
	stack.workers = stack.workers[:last]
	stack.actualNumOfWorkers--
	return top, false
}

// FindAndRemoveByPid removes the first worker whose Pid() equals pid from the
// stack and decrements the stored-worker counter. It returns true when such a
// worker was found and removed, false when no stored worker has that pid.
// The relative order of the remaining workers is preserved.
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	for i, w := range stack.workers {
		if w.Pid() != pid {
			continue
		}

		last := len(stack.workers) - 1
		// shift the tail one slot to the left over the removed worker
		copy(stack.workers[i:], stack.workers[i+1:])
		// Clear the vacated tail slot: otherwise the backing array keeps a
		// stale pointer beyond the new length, and the worker it points to
		// cannot be garbage-collected.
		stack.workers[last] = nil
		stack.workers = stack.workers[:last]
		stack.actualNumOfWorkers--
		// worker found and removed
		return true
	}

	// no worker with such pid
	return false
}

// Workers returns a new slice holding the non-nil workers currently in the
// stack, in stack order (bottom first). The slice itself is a copy, so the
// caller may modify it freely; the workers it references are the same objects
// the stack holds.
func (stack *Stack) Workers() []worker.SyncWorker {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	// The number of stored workers is an upper bound for the result, so
	// allocate once instead of growing from a capacity of one.
	workersCopy := make([]worker.SyncWorker, 0, len(stack.workers))
	for _, w := range stack.workers {
		if w != nil {
			workersCopy = append(workersCopy, w)
		}
	}

	return workersCopy
}

// isDestroying reports whether the destroy flag has been set on the stack.
func (stack *Stack) isDestroying() bool {
	stack.mutex.Lock()
	destroying := stack.destroy
	stack.mutex.Unlock()
	return destroying
}

// Destroy switches the stack into the destroy state and shuts its workers
// down.
//
// Once the destroy flag is set, Pop stops handing out workers, while Push
// keeps working so that workers which are still busy can be returned. Destroy
// then polls every 500ms until the number of stored workers equals
// initialNumOfWorkers, marks each worker as destroyed, kills it and clears the
// stack.
//
// If ctx is canceled or its deadline expires before every worker has been
// returned, Destroy stops waiting: it kills the workers that are in the stack
// at that moment, clears the stack and returns. Workers that were never pushed
// back are not known to the stack and are left untouched. A nil ctx is
// tolerated and means "wait without a deadline".
func (stack *Stack) Destroy(ctx context.Context) {
	stack.mutex.Lock()
	stack.destroy = true
	stack.mutex.Unlock()

	// A nil channel blocks forever in select, which keeps the old
	// "wait indefinitely" behavior for callers that pass a nil ctx.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	tt := time.NewTicker(time.Millisecond * 500)
	defer tt.Stop()
	for {
		select {
		case <-done:
			// the caller gave up waiting: shut down what we have
			stack.mutex.Lock()
			stack.killAndClearLocked()
			stack.mutex.Unlock()
			return
		case <-tt.C:
			stack.mutex.Lock()
			// one of the workers might still be working
			if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
				stack.mutex.Unlock()
				continue
			}
			// Check, kill and clear inside a single critical section, so
			// the stack cannot change between the check and the kill loop.
			stack.killAndClearLocked()
			stack.mutex.Unlock()
			return
		}
	}
}

// killAndClearLocked marks every stored worker as destroyed, kills it and
// empties the stack. The caller must hold stack.mutex.
func (stack *Stack) killAndClearLocked() {
	for i := range stack.workers {
		// set state for the worker in the stack (unused at the moment)
		stack.workers[i].State().Set(states.StateDestroyed)
		// Best effort: Destroy has no way to report a failure, and one failed
		// kill must not stop the remaining workers from being killed.
		_ = stack.workers[i].Kill()
	}
	stack.workers = nil
	stack.actualNumOfWorkers = 0
}