summaryrefslogtreecommitdiff
path: root/priority_queue
diff options
context:
space:
mode:
Diffstat (limited to 'priority_queue')
-rw-r--r--priority_queue/binary_heap.go125
-rw-r--r--priority_queue/binary_heap_test.go140
-rw-r--r--priority_queue/interface.go22
3 files changed, 0 insertions, 287 deletions
diff --git a/priority_queue/binary_heap.go b/priority_queue/binary_heap.go
deleted file mode 100644
index fc043927..00000000
--- a/priority_queue/binary_heap.go
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-binary heap (min-heap) algorithm used as a core for the priority queue
-*/
-
-package priorityqueue
-
-import (
- "sync"
- "sync/atomic"
-)
-
-type BinHeap struct {
- items []Item
- // find a way to use pointer to the raw data
- len uint64
- maxLen uint64
- cond sync.Cond
-}
-
-func NewBinHeap(maxLen uint64) *BinHeap {
- return &BinHeap{
- items: make([]Item, 0, 1000),
- len: 0,
- maxLen: maxLen,
- cond: sync.Cond{L: &sync.Mutex{}},
- }
-}
-
-func (bh *BinHeap) fixUp() {
- k := bh.len - 1
- p := (k - 1) >> 1 // k-1 / 2
-
- for k > 0 {
- cur, par := (bh.items)[k], (bh.items)[p]
-
- if cur.Priority() < par.Priority() {
- bh.swap(k, p)
- k = p
- p = (k - 1) >> 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) swap(i, j uint64) {
- (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
-}
-
-func (bh *BinHeap) fixDown(curr, end int) {
- cOneIdx := (curr << 1) + 1
- for cOneIdx <= end {
- cTwoIdx := -1
- if (curr<<1)+2 <= end {
- cTwoIdx = (curr << 1) + 2
- }
-
- idxToSwap := cOneIdx
- if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
- idxToSwap = cTwoIdx
- }
- if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
- bh.swap(uint64(curr), uint64(idxToSwap))
- curr = idxToSwap
- cOneIdx = (curr << 1) + 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) Len() uint64 {
- return atomic.LoadUint64(&bh.len)
-}
-
-func (bh *BinHeap) Insert(item Item) {
- bh.cond.L.Lock()
-
- // check the binary heap len before insertion
- if bh.Len() > bh.maxLen {
- // unlock the mutex to proceed to get-max
- bh.cond.L.Unlock()
-
- // signal waiting goroutines
- for bh.Len() > 0 {
- // signal waiting goroutines
- bh.cond.Signal()
- }
- // lock mutex to proceed inserting into the empty slice
- bh.cond.L.Lock()
- }
-
- bh.items = append(bh.items, item)
-
- // add len to the slice
- atomic.AddUint64(&bh.len, 1)
-
- // fix binary heap up
- bh.fixUp()
- bh.cond.L.Unlock()
-
- // signal the goroutine on wait
- bh.cond.Signal()
-}
-
-func (bh *BinHeap) ExtractMin() Item {
- bh.cond.L.Lock()
-
- // if len == 0, wait for the signal
- for bh.Len() == 0 {
- bh.cond.Wait()
- }
-
- bh.swap(0, bh.len-1)
-
- item := (bh.items)[int(bh.len)-1]
- bh.items = (bh).items[0 : int(bh.len)-1]
- bh.fixDown(0, int(bh.len-2))
-
- // reduce len
- atomic.AddUint64(&bh.len, ^uint64(0))
-
- bh.cond.L.Unlock()
- return item
-}
diff --git a/priority_queue/binary_heap_test.go b/priority_queue/binary_heap_test.go
deleted file mode 100644
index e29835c2..00000000
--- a/priority_queue/binary_heap_test.go
+++ /dev/null
@@ -1,140 +0,0 @@
-package priorityqueue
-
-import (
- "fmt"
- "math/rand"
- "sync/atomic"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
-)
-
-type Test int
-
-func (t Test) Body() []byte {
- return nil
-}
-
-func (t Test) Context() ([]byte, error) {
- return nil, nil
-}
-
-func (t Test) ID() string {
- return "none"
-}
-
-func (t Test) Priority() int64 {
- return int64(t)
-}
-
-func TestBinHeap_Init(t *testing.T) {
- a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap(12)
-
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
-
- res := make([]Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.ExtractMin()
- res = append(res, item)
- }
-
- require.Equal(t, expected, res)
-}
-
-func TestBinHeap_MaxLen(t *testing.T) {
- a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap(1)
-
- go func() {
- res := make([]Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.ExtractMin()
- res = append(res, item)
- }
- require.Equal(t, 11, len(res))
- return
- }()
-
- time.Sleep(time.Second)
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- time.Sleep(time.Second)
-}
-
-func TestNewPriorityQueue(t *testing.T) {
- insertsPerSec := uint64(0)
- getPerSec := uint64(0)
- stopCh := make(chan struct{}, 1)
- pq := NewBinHeap(1000)
-
- go func() {
- tt3 := time.NewTicker(time.Millisecond * 10)
- for {
- select {
- case <-tt3.C:
- require.Less(t, pq.Len(), uint64(1002))
- case <-stopCh:
- return
- }
- }
- }()
-
- go func() {
- tt := time.NewTicker(time.Second)
-
- for {
- select {
- case <-tt.C:
- fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
- atomic.StoreUint64(&insertsPerSec, 0)
- fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
- atomic.StoreUint64(&getPerSec, 0)
- case <-stopCh:
- tt.Stop()
- return
- }
- }
- }()
-
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- pq.ExtractMin()
- atomic.AddUint64(&getPerSec, 1)
- }
- }
- }()
-
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- pq.Insert(Test(rand.Int())) //nolint:gosec
- atomic.AddUint64(&insertsPerSec, 1)
- }
- }
- }()
-
- time.Sleep(time.Second * 5)
- stopCh <- struct{}{}
- stopCh <- struct{}{}
- stopCh <- struct{}{}
- stopCh <- struct{}{}
-}
diff --git a/priority_queue/interface.go b/priority_queue/interface.go
deleted file mode 100644
index 42510f96..00000000
--- a/priority_queue/interface.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package priorityqueue
-
-type Queue interface {
- Insert(item Item)
- ExtractMin() Item
- Len() uint64
-}
-
-// Item represents binary heap item
-type Item interface {
- // ID is a unique item identifier
- ID() string
-
- // Priority returns the Item's priority to sort
- Priority() int64
-
- // Body is the Item payload
- Body() []byte
-
- // Context is the Item meta information
- Context() ([]byte, error)
-}