summaryrefslogtreecommitdiff
path: root/pkg/priority_queue
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 10:11:22 +0300
committerValery Piashchynski <[email protected]>2021-07-11 10:11:22 +0300
commit589f759cc2411319adbca2ece0dbe212407d1eba (patch)
tree2fbea4a6033d65813c41fd80f6339a524b46c9b2 /pkg/priority_queue
parentcb2665d93ad7abe1ab30508ff0e2bd4d0bc379ea (diff)
Update informer interface to return slice of pointers (do not over-copy
the Stat structure). Make amqp Push concurrent safe. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/priority_queue')
-rw-r--r--pkg/priority_queue/binary_heap.go126
-rw-r--r--pkg/priority_queue/binary_heap_test.go124
-rw-r--r--pkg/priority_queue/interface.go28
3 files changed, 278 insertions, 0 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
new file mode 100644
index 00000000..e47dd2c8
--- /dev/null
+++ b/pkg/priority_queue/binary_heap.go
@@ -0,0 +1,126 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type BinHeap struct {
+ items []Item
+ // find a way to use pointer to the raw data
+ len uint64
+ maxLen uint64
+ cond sync.Cond
+}
+
+func NewBinHeap(maxLen uint64) *BinHeap {
+ return &BinHeap{
+ items: make([]Item, 0, 1000),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (bh *BinHeap) fixUp() {
+ k := bh.len - 1
+ p := (k - 1) >> 1 // k-1 / 2
+
+ for k > 0 {
+ cur, par := (bh.items)[k], (bh.items)[p]
+
+ if cur.Priority() < par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) swap(i, j uint64) {
+ (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := (curr << 1) + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if (curr<<1)+2 <= end {
+ cTwoIdx = (curr << 1) + 2
+ }
+
+ idxToSwap := cOneIdx
+ // oh my, so unsafe
+ if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
+ bh.swap(uint64(curr), uint64(idxToSwap))
+ curr = idxToSwap
+ cOneIdx = (curr << 1) + 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) Len() uint64 {
+ return atomic.LoadUint64(&bh.len)
+}
+
+func (bh *BinHeap) Insert(item Item) {
+ bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
+ bh.items = append(bh.items, item)
+
+ // add len to the slice
+ atomic.AddUint64(&bh.len, 1)
+
+ // fix binary heap up
+ bh.fixUp()
+ bh.cond.L.Unlock()
+
+ // signal the goroutine on wait
+ bh.cond.Signal()
+}
+
+func (bh *BinHeap) GetMax() Item {
+ bh.cond.L.Lock()
+
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
+ bh.cond.Wait()
+ }
+
+ bh.swap(0, bh.len-1)
+
+ item := (bh.items)[int(bh.len)-1]
+ bh.items = (bh).items[0 : int(bh.len)-1]
+ bh.fixDown(0, int(bh.len-2))
+
+ // reduce len
+ atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
+ return item
+}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
new file mode 100644
index 00000000..06d0735c
--- /dev/null
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -0,0 +1,124 @@
+package priorityqueue
+
+import (
+ "fmt"
+ "math/rand"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+type Test int
+
+func (t Test) Ack() error {
+ return nil
+}
+
+func (t Test) Nack() error {
+ return nil
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (t Test) ID() string {
+ return "none"
+}
+
+func (t Test) Priority() uint64 {
+ return uint64(t)
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap(12)
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]Item, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.GetMax()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
+func TestNewPriorityQueue(t *testing.T) {
+ insertsPerSec := uint64(0)
+ getPerSec := uint64(0)
+ stopCh := make(chan struct{}, 1)
+ pq := NewBinHeap(1000)
+
+ go func() {
+ tt3 := time.NewTicker(time.Millisecond * 10)
+ for {
+ select {
+ case <-tt3.C:
+ require.Less(t, pq.Len(), uint64(1002))
+ case <-stopCh:
+ return
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Second)
+
+ for {
+ select {
+ case <-tt.C:
+ fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
+ atomic.StoreUint64(&insertsPerSec, 0)
+ fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec)))
+ atomic.StoreUint64(&getPerSec, 0)
+ case <-stopCh:
+ tt.Stop()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.GetMax()
+ atomic.AddUint64(&getPerSec, 1)
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.Insert(Test(rand.Int())) //nolint:gosec
+ atomic.AddUint64(&insertsPerSec, 1)
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
new file mode 100644
index 00000000..8278dc8d
--- /dev/null
+++ b/pkg/priority_queue/interface.go
@@ -0,0 +1,28 @@
+package priorityqueue
+
+type Queue interface {
+ Insert(item Item)
+ GetMax() Item
+ Len() uint64
+}
+
+// Item represents binary heap item
+type Item interface {
+ // ID is a unique item identifier
+ ID() string
+
+ // Priority returns the Item's priority to sort
+ Priority() uint64
+
+ // Body is the Item payload
+ Body() []byte
+
+ // Context is the Item meta information
+ Context() ([]byte, error)
+
+ // Ack - acknowledge the Item after processing
+ Ack() error
+
+ // Nack - discard the Item
+ Nack() error
+}