summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/priority_queue/binary_heap.go71
-rw-r--r--pkg/priority_queue/binary_heap_test.go21
-rw-r--r--pkg/priority_queue/interface.go4
-rw-r--r--pkg/priority_queue/pq.go26
-rw-r--r--pkg/priority_queue/pq_test.go65
5 files changed, 167 insertions, 20 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index 7e034e82..00c73869 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -14,38 +14,73 @@ func (bh *BinHeap) Init(items []PQItem) {
arraySize := len(items) - 1
for i := arraySize/2 - 1; i >= 0; i-- {
- bh.shiftDown(items, i, arraySize)
+ bh.fixDown(i, arraySize)
}
for i := arraySize - 1; i >= 1; i-- {
items[0], items[i] = items[i], items[0]
- bh.shiftDown(items, 0, i-1)
+ bh.fixDown(0, i-1)
}
}
-func (bh *BinHeap) shiftDown(numbers []PQItem, k, n int) {
- // k << 1 is equal to k*2
- for k<<1 <= n {
- j := k << 1
+func (bh *BinHeap) fixUp() {
+ k := len(*bh) - 1
+ p := (k - 1) >> 1 // k-1 / 2
- if j < n && numbers[j].Priority() < numbers[j+1].Priority() {
- j++
+ for k > 0 {
+ cur, par := (*bh)[k], (*bh)[p]
+
+ if cur.Priority() < par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
}
+ }
+}
- if !(numbers[k].Priority() < numbers[j].Priority()) {
- break
+func (bh *BinHeap) swap(i, j int) {
+ (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := curr*2 + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if curr*2+2 <= end {
+ cTwoIdx = curr*2 + 2
}
- numbers[k], numbers[j] = numbers[j], numbers[k]
- k = j
+ idxToSwap := cOneIdx
+ if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() {
+ bh.swap(curr, idxToSwap)
+ curr = idxToSwap
+ cOneIdx = curr*2 + 1
+ } else {
+ return
+ }
}
}
-func (bh *BinHeap) fix() {}
-func (bh *BinHeap) Push(_ PQItem) {}
+func (bh *BinHeap) Insert(item PQItem) {
+ *bh = append(*bh, item)
+ bh.fixUp()
+}
+
+func (bh *BinHeap) GetMax() PQItem {
+ l := len(*bh)
+ if l == 0 {
+ return nil
+ }
+
+ bh.swap(0, l-1)
-func (bh *BinHeap) Pop() PQItem {
- bh.fix()
- // get min
- return nil
+ item := (*bh)[l-1]
+ *bh = (*bh)[0 : l-1]
+ bh.fixDown(0, l-2)
+ return item
}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index a2b0e7e8..1b510be3 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -29,6 +29,27 @@ func TestBinHeap_Init(t *testing.T) {
require.Equal(t, expected, a)
}
+func TestBinHeap_Init2(t *testing.T) {
+ a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap()
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]PQItem, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.GetMax()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
func BenchmarkBinHeap_Init(b *testing.B) {
a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
bh := NewBinHeap()
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 45430486..3cc1d575 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -1,8 +1,8 @@
package priorityqueue
type Queue interface {
- Push(item PQItem)
- Pop() PQItem
+ Insert(item PQItem)
+ GetMax() PQItem
}
type PQItem interface {
diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go
new file mode 100644
index 00000000..1b33cb92
--- /dev/null
+++ b/pkg/priority_queue/pq.go
@@ -0,0 +1,26 @@
+package priorityqueue
+
+import "sync"
+
+type PQ struct {
+ sync.RWMutex
+ bh *BinHeap
+}
+
+func NewPriorityQueue() *PQ {
+ return &PQ{
+ bh: NewBinHeap(),
+ }
+}
+
+func (p *PQ) Insert(item PQItem) {
+ p.Lock()
+ p.bh.Insert(item)
+ p.Unlock()
+}
+
+func (p *PQ) Get() PQItem {
+ p.Lock()
+ defer p.Unlock()
+ return p.bh.GetMax()
+}
diff --git a/pkg/priority_queue/pq_test.go b/pkg/priority_queue/pq_test.go
new file mode 100644
index 00000000..cdec10f5
--- /dev/null
+++ b/pkg/priority_queue/pq_test.go
@@ -0,0 +1,65 @@
+package priorityqueue
+
+import (
+ "fmt"
+ "math/rand"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestNewPriorityQueue(t *testing.T) {
+ insertsPerSec := uint64(0)
+ getPerSec := uint64(0)
+ stopCh := make(chan struct{}, 1)
+ pq := NewPriorityQueue()
+
+ go func() {
+ tt := time.NewTicker(time.Second)
+
+ for {
+ select {
+ case <-tt.C:
+ fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec)))
+ fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
+ atomic.StoreUint64(&getPerSec, 0)
+ atomic.StoreUint64(&insertsPerSec, 0)
+ case <-stopCh:
+ tt.Stop()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ it := pq.Get()
+ if it == nil {
+ continue
+ }
+ atomic.AddUint64(&getPerSec, 1)
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.Insert(Test(rand.Int())) //nolint:gosec
+ atomic.AddUint64(&insertsPerSec, 1)
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+}