summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/priorityqueue/binary_heap.go39
-rw-r--r--pkg/priorityqueue/binary_heap_test.go4
2 files changed, 33 insertions, 10 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index a8d80fc0..82424331 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -12,15 +12,21 @@ import (
type BinHeap struct {
items []Item
// find a way to use pointer to the raw data
- len uint64
- cond sync.Cond
+ len uint64
+ maxLen uint64
+ cond sync.Cond
}
-func NewBinHeap() *BinHeap {
+func NewBinHeap(maxLen uint64) *BinHeap {
+ if maxLen == 0 {
+ maxLen = 100_000
+ }
+
return &BinHeap{
- items: make([]Item, 0, 100),
- len: 0,
- cond: sync.Cond{L: &sync.Mutex{}},
+ items: make([]Item, 0, maxLen),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
}
}
@@ -74,6 +80,21 @@ func (bh *BinHeap) Len() uint64 {
func (bh *BinHeap) Insert(item Item) {
bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
bh.items = append(bh.items, item)
// add len to the slice
@@ -89,9 +110,9 @@ func (bh *BinHeap) Insert(item Item) {
func (bh *BinHeap) GetMax() Item {
bh.cond.L.Lock()
- defer bh.cond.L.Unlock()
- for atomic.LoadUint64(&bh.len) == 0 {
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
bh.cond.Wait()
}
@@ -103,5 +124,7 @@ func (bh *BinHeap) GetMax() Item {
// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
return item
}
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index b02017b6..dadc3a39 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -37,7 +37,7 @@ func (t Test) Priority() uint64 {
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
- bh := NewBinHeap()
+ bh := NewBinHeap(100)
for i := 0; i < len(a); i++ {
bh.Insert(a[i])
@@ -59,7 +59,7 @@ func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewBinHeap()
+ pq := NewBinHeap(100)
go func() {
tt := time.NewTicker(time.Second)