summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/queue/queue.go
blob: 4f611bbe4b3ae839d62e8f0503426985d1d829f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package queue

import (
	"context"
	"sync"
	"sync/atomic"

	"github.com/spiral/roadrunner/v2/pkg/worker"
)

const (
	initialSize          = 1
	maxInitialSize       = 8
	maxInternalSliceSize = 10
)

type Node struct {
	w []worker.BaseProcess
	// LL
	n *Node
}

type Queue struct {
	mu sync.Mutex

	head *Node
	tail *Node

	curr uint64
	len  uint64

	sliceSize uint64
}

func NewQueue() *Queue {
	q := &Queue{
		mu:        sync.Mutex{},
		head:      nil,
		tail:      nil,
		curr:      0,
		len:       0,
		sliceSize: 0,
	}

	return q
}

func (q *Queue) Push(w worker.BaseProcess) {
	q.mu.Lock()

	if q.head == nil {
		h := newNode(initialSize)
		q.head = h
		q.tail = h
		q.sliceSize = maxInitialSize
	} else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
		n := newNode(maxInternalSliceSize)
		q.tail.n = n
		q.tail = n
		q.sliceSize = maxInternalSliceSize
	}

	q.tail.w = append(q.tail.w, w)

	atomic.AddUint64(&q.len, 1)

	q.mu.Unlock()
}

func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
	q.mu.Lock()

	if q.head == nil {
		return nil, nil
	}

	w := q.head.w[q.curr]
	q.head.w[q.curr] = nil
	atomic.AddUint64(&q.len, ^uint64(0))
	atomic.AddUint64(&q.curr, 1)

	if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
		n := q.head.n
		q.head.n = nil
		q.head = n
		q.curr = 0
	}

	q.mu.Unlock()

	return w, nil
}

func (q *Queue) Remove(_ int64) {}

func (q *Queue) Destroy() {}

func newNode(capacity int) *Node {
	return &Node{w: make([]worker.BaseProcess, 0, capacity)}
}