summaryrefslogtreecommitdiff
path: root/pkg/priorityqueue/binary_heap.go
blob: 82424331deac7bfad4ae972a19fd2d7dc8790df6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
Binary heap (min-heap) implementation used as the core of the priority queue.
*/

package priorityqueue

import (
	"sync"
	"sync/atomic"
)

// BinHeap is a binary min-heap of Items: the heap order maintained by fixUp
// and fixDown keeps the item with the smallest Priority() at index 0.
// Insert and GetMax serialise their work through cond.L.
type BinHeap struct {
	// len is the number of stored items. It is read and written through
	// sync/atomic, so it must be the first field: only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned on 32-bit platforms
	// (386, ARM, 32-bit MIPS), and a misaligned 64-bit atomic access panics
	// there. Placed after the slice header it would sit at offset 12.
	len uint64
	// maxLen is the length threshold Insert checks before appending.
	maxLen uint64
	// items is the heap storage, laid out as an implicit binary tree
	// (children of index i live at 2i+1 and 2i+2).
	// TODO: find a way to use pointer to the raw data
	items []Item
	// cond provides the lock taken by Insert/GetMax and the wait/signal
	// channel used by GetMax to block while the heap is empty.
	cond sync.Cond
}

// NewBinHeap returns an empty BinHeap whose backing slice is pre-allocated
// with capacity maxLen. A maxLen of 0 selects the default of 100_000.
func NewBinHeap(maxLen uint64) *BinHeap {
	const defaultMaxLen = 100_000

	limit := maxLen
	if limit == 0 {
		limit = defaultMaxLen
	}

	heap := new(BinHeap)
	heap.items = make([]Item, 0, limit)
	heap.maxLen = limit
	// the condition variable needs its own mutex before first use
	heap.cond.L = new(sync.Mutex)

	return heap
}

// fixUp restores the heap order after a new item has been placed in the last
// slot (index len-1): the item is swapped with its parent for as long as its
// Priority() is strictly smaller than the parent's.
func (bh *BinHeap) fixUp() {
	child := bh.len - 1

	for child > 0 {
		parent := (child - 1) / 2
		node, above := bh.items[child], bh.items[parent]

		if node.Priority() < above.Priority() {
			bh.swap(child, parent)
			child = parent
			continue
		}

		// parent is not larger than the child: heap order holds
		return
	}
}

// swap exchanges the items stored at indices i and j.
func (bh *BinHeap) swap(i, j uint64) {
	items := bh.items

	held := items[j]
	items[j] = items[i]
	items[i] = held
}

// fixDown restores the heap order below index curr. end is the last index
// (inclusive) that still belongs to the heap. The item at curr is swapped
// with its smaller child for as long as that child has a strictly smaller
// Priority().
func (bh *BinHeap) fixDown(curr, end int) {
	for {
		left := 2*curr + 1
		if left > end {
			// no children inside the heap: curr is a leaf
			return
		}

		// pick the child with the smaller priority; the right child is
		// considered only when it lies inside the heap
		smaller := left
		if right := left + 1; right <= end && bh.items[right].Priority() < bh.items[left].Priority() {
			smaller = right
		}

		if bh.items[smaller].Priority() < bh.items[curr].Priority() {
			bh.swap(uint64(curr), uint64(smaller))
			curr = smaller
			continue
		}

		return
	}
}

// Len reports the current number of items in the heap. The counter is read
// atomically, so Len may be called without holding the heap's lock.
func (bh *BinHeap) Len() uint64 {
	size := atomic.LoadUint64(&bh.len)
	return size
}

// Insert adds item to the heap and wakes one goroutine waiting in GetMax.
//
// When the heap already holds maxLen items, Insert applies back-pressure:
// it releases the lock and keeps signalling waiting consumers until the heap
// has been drained completely, then re-acquires the lock and checks again
// before appending.
func (bh *BinHeap) Insert(item Item) {
	bh.cond.L.Lock()

	for {
		n := bh.Len()
		// `n < maxLen` (rather than `n <= maxLen`) keeps the heap from growing
		// past maxLen. The check is repeated in a loop because another
		// producer may refill the heap between the drain below and
		// re-acquiring the lock. An empty heap always accepts the item, so
		// the loop cannot spin forever when maxLen is 0.
		if n == 0 || n < bh.maxLen {
			break
		}

		// unlock the mutex so that GetMax callers can proceed
		bh.cond.L.Unlock()

		// keep signalling waiting goroutines until the heap is empty
		for bh.Len() > 0 {
			bh.cond.Signal()
		}

		// lock the mutex again before touching the slice
		bh.cond.L.Lock()
	}

	bh.items = append(bh.items, item)

	// account for the appended item
	atomic.AddUint64(&bh.len, 1)

	// move the new item up to its place in the heap
	bh.fixUp()
	bh.cond.L.Unlock()

	// signal a goroutine waiting for an item
	bh.cond.Signal()
}

// GetMax removes and returns the item at the root of the heap, blocking while
// the heap is empty. Note that, despite the name, the heap is ordered as a
// min-heap, so the returned item is the one with the smallest Priority().
func (bh *BinHeap) GetMax() Item {
	bh.cond.L.Lock()
	defer bh.cond.L.Unlock()

	// if len == 0, wait for the signal sent by Insert
	for bh.Len() == 0 {
		bh.cond.Wait()
	}

	// move the root to the last slot and take it from there
	last := bh.len - 1
	bh.swap(0, last)
	item := bh.items[last]

	// Clear the vacated slot: it stays in the backing array after the slice
	// is shortened and would otherwise keep the popped item reachable until
	// a later Insert overwrites it.
	var zero Item
	bh.items[last] = zero
	bh.items = bh.items[:last]

	// Restore heap order for the remaining items. The last valid index is
	// last-1, which is -1 when the heap just became empty; fixDown then
	// returns immediately.
	bh.fixDown(0, int(last)-1)

	// reduce len by one (adding ^0 is an atomic decrement)
	atomic.AddUint64(&bh.len, ^uint64(0))

	return item
}