summaryrefslogtreecommitdiff
path: root/worker_watcher/container/channel/vec.go
blob: c0c275751df9d5c334113ef5e9844d2e9e9bd48a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package channel

import (
	"context"
	"sync"
	"sync/atomic"

	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/worker"
)

type Vec struct {
	sync.RWMutex
	// destroy signal
	destroy uint64
	// channel with the workers
	workers chan worker.BaseProcess
}

func NewVector(len uint64) *Vec {
	vec := &Vec{
		destroy: 0,
		workers: make(chan worker.BaseProcess, len),
	}

	return vec
}

// Push is O(1) operation
// In case of TTL and full channel O(n) worst case, where n is len of the channel
func (v *Vec) Push(w worker.BaseProcess) {
	select {
	case v.workers <- w:
		// default select branch is only possible when dealing with TTL
		// because in that case, workers in the v.workers channel can be TTL-ed and killed
		// but presenting in the channel
	default:
		// Stop Pop operations
		v.Lock()
		defer v.Unlock()

		/*
			we can be in the default branch by the following reasons:
			1. TTL is set with no requests during the TTL
			2. Violated Get <-> Release operation (how ??)
		*/

		for i := 0; i < len(v.workers); i++ {
			/*
				We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
				BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers
				so, down by the code, we might have a problem when pushing the new worker to the v.workers
			*/
			wrk := <-v.workers

			switch wrk.State().Value() {
			// good states
			case worker.StateWorking, worker.StateReady:
				// put the worker back
				// generally, while send and receive operations are concurrent (from the channel), channel behave
				// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
				select {
				case v.workers <- wrk:
					continue
				default:
					// kill the worker from the channel
					wrk.State().Set(worker.StateInvalid)
					_ = wrk.Kill()

					continue
				}
				/*
					Bad states are here.
				*/
			default:
				// kill the current worker (just to be sure it's dead)
				if wrk != nil {
					_ = wrk.Kill()
				}
				// replace with the new one and return from the loop
				// new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
				// But this case will be handled in the worker_watcher::Get
				select {
				case v.workers <- w:
					return
					// the place for the new worker was occupied before
				default:
					// kill the new worker and reallocate it
					w.State().Set(worker.StateInvalid)
					_ = w.Kill()
					return
				}
			}
		}
	}
}

func (v *Vec) Remove(_ int64) {}

func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
	/*
		if *addr == old {
			*addr = new
			return true
		}
	*/

	if atomic.LoadUint64(&v.destroy) == 1 {
		return nil, errors.E(errors.WatcherStopped)
	}

	// used only for the TTL-ed workers
	v.RLock()
	defer v.RUnlock()

	select {
	case w := <-v.workers:
		return w, nil
	case <-ctx.Done():
		return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
	}
}

func (v *Vec) Destroy() {
	atomic.StoreUint64(&v.destroy, 1)
}