1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
/*
Package priorityqueue implements a binary heap (min-heap) that is used as the core of the priority queue.
*/
package priorityqueue
import (
"sync"
"sync/atomic"
)
// BinHeap is a binary min-heap of Items: the element with the smallest
// Priority is kept at index 0 of the backing slice. The slice is mutated
// only while the mutex held in cond is locked.
type BinHeap struct {
	// len is the number of stored items. It is read and written through
	// sync/atomic, so it must stay the FIRST field: only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned on 32-bit platforms
	// (386, ARM, 32-bit MIPS), and a misaligned 64-bit atomic access panics.
	len uint64
	// maxLen is the size limit Insert compares the current length against; it
	// is also the capacity preallocated for items by NewBinHeap.
	maxLen uint64
	// items is the heap storage laid out as an implicit binary tree
	// (children of index i live at 2i+1 and 2i+2).
	// TODO: find a way to use pointer to the raw data
	items []Item
	// cond carries the mutex (cond.L) that guards items; GetMax waits on it
	// while the heap is empty and Insert signals it.
	cond sync.Cond
}
// NewBinHeap builds an empty heap whose backing slice is preallocated with
// room for maxLen items. Passing 0 selects the default limit of 100_000.
func NewBinHeap(maxLen uint64) *BinHeap {
	const defaultMaxLen = 100_000

	limit := maxLen
	if limit == 0 {
		limit = defaultMaxLen
	}

	// The length counter is left at its zero value: the heap starts empty.
	heap := &BinHeap{
		maxLen: limit,
		cond:   sync.Cond{L: &sync.Mutex{}},
	}
	heap.items = make([]Item, 0, limit)

	return heap
}
// fixUp sifts the last element (index len-1) towards the root, swapping it
// with its parent for as long as its Priority is strictly smaller than the
// parent's. It stops at the root or at the first parent that is not larger.
func (bh *BinHeap) fixUp() {
	for child := bh.len - 1; child > 0; {
		parent := (child - 1) / 2

		// Heap order already holds between child and parent: nothing left to do.
		if !(bh.items[child].Priority() < bh.items[parent].Priority()) {
			return
		}

		bh.swap(child, parent)
		child = parent
	}
}
// swap exchanges the items stored at indexes i and j.
func (bh *BinHeap) swap(i, j uint64) {
	held := bh.items[i]
	bh.items[i] = bh.items[j]
	bh.items[j] = held
}
// fixDown sifts the element at index curr towards the leaves. On every step
// it picks the child with the smaller Priority (the left one on ties) and
// swaps it with curr if that child is strictly smaller; otherwise it stops.
//
// end is the index of the last element that may be visited (inclusive). A
// negative end makes the call a no-op.
func (bh *BinHeap) fixDown(curr, end int) {
	for left := 2*curr + 1; left <= end; left = 2*curr + 1 {
		smallest := left

		// The right child exists only when its index is still within bounds.
		if right := left + 1; right <= end && bh.items[right].Priority() < bh.items[left].Priority() {
			smallest = right
		}

		// Neither child is smaller than curr: heap order is restored.
		if !(bh.items[smallest].Priority() < bh.items[curr].Priority()) {
			return
		}

		bh.swap(uint64(curr), uint64(smallest))
		curr = smallest
	}
}
// Len reports the current number of items, read atomically.
func (bh *BinHeap) Len() uint64 {
	count := atomic.LoadUint64(&bh.len)
	return count
}
// Insert adds item to the heap, restores the heap order and signals one
// goroutine waiting on the condition variable (see GetMax).
//
// If the heap already holds maxLen items, Insert releases the mutex and keeps
// signalling waiters until the heap has been drained completely, then
// re-acquires the mutex and inserts. While the heap is full and nothing
// removes items, Insert therefore does not return.
//
// maxLen is expected to be greater than zero, which NewBinHeap guarantees.
func (bh *BinHeap) Insert(item Item) {
	bh.cond.L.Lock()

	// ">=" keeps the heap within maxLen items (the preallocated capacity).
	// The check is a loop so that it is evaluated again after the mutex is
	// re-acquired: other producers may have refilled the heap in the meantime.
	for bh.Len() >= bh.maxLen {
		// Release the mutex so that GetMax callers can make progress.
		bh.cond.L.Unlock()

		// Keep waking consumers until the heap is empty.
		for bh.Len() > 0 {
			bh.cond.Signal()
		}

		// Re-acquire the mutex before touching the slice.
		bh.cond.L.Lock()
	}

	bh.items = append(bh.items, item)
	// Publish the new length before sifting: fixUp starts from index len-1.
	atomic.AddUint64(&bh.len, 1)
	bh.fixUp()

	bh.cond.L.Unlock()

	// Wake one goroutine that may be waiting for an item.
	bh.cond.Signal()
}
// GetMax removes and returns the item at the root of the heap, blocking on
// the condition variable while the heap is empty.
//
// Note: the heap is ordered as a min-heap (see fixUp/fixDown), so the item
// returned is the one with the smallest Priority value.
func (bh *BinHeap) GetMax() Item {
	bh.cond.L.Lock()

	// Wait until at least one item is available; the condition is re-checked
	// after every wake-up.
	for bh.Len() == 0 {
		bh.cond.Wait()
	}

	// Move the root to the tail and detach it from the heap.
	last := int(bh.len) - 1
	bh.swap(0, uint64(last))
	item := bh.items[last]

	// Clear the vacated slot so the backing array does not keep the returned
	// item reachable after the slice is shortened.
	var zero Item
	bh.items[last] = zero
	bh.items = bh.items[:last]

	// Restore heap order among the remaining items. With signed arithmetic a
	// now-empty heap yields end == -1, which fixDown treats as a no-op.
	bh.fixDown(0, last-1)

	// Decrement the length (adding ^uint64(0) subtracts one).
	atomic.AddUint64(&bh.len, ^uint64(0))

	bh.cond.L.Unlock()
	return item
}
|