summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/stack.go
blob: 55f1f52ad951fc4ebfbe49b8288e7b20ba726fe8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package worker_watcher //nolint:golint,stylecheck
import (
	"context"
	"runtime"
	"sync"
	"time"

	"github.com/spiral/roadrunner/v2/interfaces/worker"
	"github.com/spiral/roadrunner/v2/internal"
)

// Stack is a mutex-guarded, last-in-first-out container of worker processes.
type Stack struct {
	// workers holds the processes currently stored in the stack. Push appends
	// to the end of the slice and Pop takes from the end.
	workers            []worker.BaseProcess
	// mutex is taken by every Stack method in this file before it touches the
	// other fields.
	mutex              sync.RWMutex
	// destroy is set to true by Destroy. While it is set, Pop hands out no
	// workers. Nothing in this file sets it back to false.
	destroy            bool
	// actualNumOfWorkers is incremented by Push, decremented by Pop and
	// FindAndRemoveByPid, and zeroed by Reset. Destroy compares it with
	// len(workers).
	actualNumOfWorkers int64
}

// NewWorkersStack returns an empty Stack whose worker slice is pre-allocated
// with room for runtime.NumCPU() entries. The worker counter starts at zero.
func NewWorkersStack() *Stack {
	s := new(Stack)
	s.workers = make([]worker.BaseProcess, 0, runtime.NumCPU())
	return s
}

// Reset drops every stored worker and zeroes the worker counter.
// The destroy flag is left as it is.
func (stack *Stack) Reset() {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	stack.workers, stack.actualNumOfWorkers = nil, 0
}

// Push puts w on top of the stack and bumps the worker counter.
// It does not look at the destroy flag, so a worker can still be pushed after
// Destroy has been called.
func (stack *Stack) Push(w worker.BaseProcess) {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	stack.workers = append(stack.workers, w)
	stack.actualNumOfWorkers++
}

// IsEmpty reports whether the stack currently holds no workers.
func (stack *Stack) IsEmpty() bool {
	stack.mutex.Lock()
	stored := len(stack.workers)
	stack.mutex.Unlock()

	return stored == 0
}

// Pop takes the most recently pushed worker off the stack.
//
// The second result reports whether the stack is in the destroy state. When it
// is true no worker is handed out and the first result is nil. When the stack
// is merely empty, Pop returns (nil, false).
func (stack *Stack) Pop() (worker.BaseProcess, bool) {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	switch {
	case stack.destroy:
		// a stack that is being destroyed no longer gives workers away
		return nil, true
	case len(stack.workers) == 0:
		return nil, false
	}

	// detach the top element and shrink the slice by one
	top := len(stack.workers) - 1
	popped := stack.workers[top]
	stack.workers = stack.workers[:top]
	stack.actualNumOfWorkers--

	return popped, false
}

// FindAndRemoveByPid removes the first stored worker whose Pid() equals pid.
//
// It returns true when such a worker was found and removed (the worker counter
// is decremented in that case) and false when no stored worker has that pid.
// The order of the remaining workers is preserved.
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	for i := range stack.workers {
		if stack.workers[i].Pid() != pid {
			continue
		}

		last := len(stack.workers) - 1
		// shift the tail one position to the left over the removed element
		copy(stack.workers[i:], stack.workers[i+1:])
		// The shift leaves a duplicate reference in the vacated tail slot.
		// Clear it so the backing array does not keep a worker reachable
		// after it has been taken out of the stack.
		stack.workers[last] = nil
		stack.workers = stack.workers[:last]
		stack.actualNumOfWorkers--

		// worker found and removed
		return true
	}

	// no worker with such pid
	return false
}

// Workers returns a snapshot of the workers currently stored in the stack.
//
// The returned slice has its own backing array, so the caller may modify it
// without affecting the stack. The elements themselves are the same worker
// values the stack holds. The result is never nil; an empty stack yields an
// empty slice.
func (stack *Stack) Workers() []worker.BaseProcess {
	stack.mutex.Lock()
	defer stack.mutex.Unlock()

	// allocate the exact size once instead of growing from capacity 1
	workersCopy := make([]worker.BaseProcess, len(stack.workers))
	copy(workersCopy, stack.workers)

	return workersCopy
}

// isDestroying reports whether the destroy flag is currently set.
func (stack *Stack) isDestroying() bool {
	stack.mutex.Lock()
	destroying := stack.destroy
	stack.mutex.Unlock()

	return destroying
}

// Destroy switches the stack into the destroy state, then kills and drops
// every worker it holds.
//
// Once the destroy flag is set, Pop stops handing out workers. Every 100ms
// Destroy checks whether the number of stored workers equals
// actualNumOfWorkers. When it does, each stored worker is marked
// internal.StateDestroyed and killed, and the slice and counter are cleared.
// The destroy flag stays set afterwards.
//
// If ctx is cancelled, or its deadline passes, before that check succeeds,
// Destroy stops waiting and kills whatever workers are stored at that moment.
// A nil ctx is tolerated and means "never cancelled".
//
// NOTE(review): in the methods visible in this file the counter is always
// changed together with the slice, so the equality check looks like it passes
// on the first tick. Confirm whether Pop was meant to leave the counter alone
// so that Destroy really waits for workers that are still in use.
func (stack *Stack) Destroy(ctx context.Context) {
	stack.mutex.Lock()
	stack.destroy = true
	stack.mutex.Unlock()

	// A nil channel blocks forever in select, which is exactly "no cancellation".
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	// drain kills and drops every stored worker in one critical section, so no
	// Push or Pop can slip in between the check, the kill and the clear.
	// Unless force is set, it first verifies that the slice length matches the
	// counter and returns false without touching anything when it does not.
	drain := func(force bool) bool {
		stack.mutex.Lock()
		defer stack.mutex.Unlock()

		// that might be one of the workers is working
		if !force && len(stack.workers) != int(stack.actualNumOfWorkers) {
			return false
		}

		for i := range stack.workers {
			// set state for the worker in the stack (unused at the moment)
			stack.workers[i].State().Set(internal.StateDestroyed)
			// Kill is best effort: the stack is going away regardless, so a
			// failure to kill one worker must not stop the others.
			_ = stack.workers[i].Kill()
		}

		// same effect as Reset, done inline because the mutex is already held
		stack.workers = nil
		stack.actualNumOfWorkers = 0
		return true
	}

	tt := time.NewTicker(time.Millisecond * 100)
	defer tt.Stop()

	for {
		select {
		case <-done:
			// the caller gave up waiting: force the shutdown now
			drain(true)
			return
		case <-tt.C:
			if drain(false) {
				return
			}
		}
	}
}