summaryrefslogtreecommitdiff
path: root/worker_watcher/container/channel/vec.go
blob: 65c2066ec6d78b33b084f9b3618db2c92651350e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package channel

import (
	"context"
	"sync"
	"sync/atomic"

	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/worker"
)

type Vec struct {
	sync.RWMutex
	// destroy signal
	destroy uint64
	// channel with the workers
	workers chan worker.BaseProcess
}

func NewVector(len uint64) *Vec {
	vec := &Vec{
		destroy: 0,
		workers: make(chan worker.BaseProcess, len),
	}

	return vec
}

// Push is O(1) operation
// In case of TTL and full channel O(n) worst case, where n is len of the channel
func (v *Vec) Push(w worker.BaseProcess) {
	select {
	case v.workers <- w:
		// default select branch is only possible when dealing with TTL
		// because in that case, workers in the v.workers channel can be TTL-ed and killed
		// but presenting in the channel
	default:
		// Stop Pop operations
		v.Lock()
		defer v.Unlock()

		/*
			we can be in the default branch by the following reasons:
			1. TTL is set with no requests during the TTL
			2. Violated Get <-> Release operation (how ??)
		*/

		for i := 0; i < len(v.workers); i++ {
			/*
				We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
				BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers
				so, down by the code, we might have a problem when pushing the new worker to the v.workers
			*/
			wrk := <-v.workers

			switch wrk.State().Value() {
			// good states
			case worker.StateWorking, worker.StateReady:
				// put the worker back
				// generally, while send and receive operations are concurrent (from the channel), channel behave
				// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
				select {
				case v.workers <- wrk:
					continue
				default:
					// kill the worker from the channel
					wrk.State().Set(worker.StateInvalid)
					_ = wrk.Kill()

					continue
				}
				/*
					Bad states are here.
				*/
			default:
				// kill the current worker (just to be sure it's dead)
				if wrk != nil {
					_ = wrk.Kill()
				}
				// replace with the new one and return from the loop
				// new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
				// But this case will be handled in the worker_watcher::Get
				select {
				case v.workers <- w:
					return
					// the place for the new worker was occupied before
				default:
					// kill the new worker and reallocate it
					w.State().Set(worker.StateInvalid)
					_ = w.Kill()
					return
				}
			}
		}
	}
}

func (v *Vec) Remove(_ int64) {}

func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
	if atomic.LoadUint64(&v.destroy) == 1 {
		// drain channel
		for {
			select {
			case <-v.workers:
				continue
			default:
				return nil, errors.E(errors.WatcherStopped)
			}
		}
	}

	// used only for the TTL-ed workers
	v.RLock()
	defer v.RUnlock()

	select {
	case w := <-v.workers:
		return w, nil
	case <-ctx.Done():
		return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
	}
}

func (v *Vec) Destroy() {
	atomic.StoreUint64(&v.destroy, 1)
}