diff options
Diffstat (limited to 'priority_queue')
-rw-r--r-- | priority_queue/binary_heap.go | 125 | ||||
-rw-r--r-- | priority_queue/binary_heap_test.go | 140 | ||||
-rw-r--r-- | priority_queue/interface.go | 22 |
3 files changed, 0 insertions, 287 deletions
diff --git a/priority_queue/binary_heap.go b/priority_queue/binary_heap.go deleted file mode 100644 index fc043927..00000000 --- a/priority_queue/binary_heap.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -binary heap (min-heap) algorithm used as a core for the priority queue -*/ - -package priorityqueue - -import ( - "sync" - "sync/atomic" -) - -type BinHeap struct { - items []Item - // find a way to use pointer to the raw data - len uint64 - maxLen uint64 - cond sync.Cond -} - -func NewBinHeap(maxLen uint64) *BinHeap { - return &BinHeap{ - items: make([]Item, 0, 1000), - len: 0, - maxLen: maxLen, - cond: sync.Cond{L: &sync.Mutex{}}, - } -} - -func (bh *BinHeap) fixUp() { - k := bh.len - 1 - p := (k - 1) >> 1 // k-1 / 2 - - for k > 0 { - cur, par := (bh.items)[k], (bh.items)[p] - - if cur.Priority() < par.Priority() { - bh.swap(k, p) - k = p - p = (k - 1) >> 1 - } else { - return - } - } -} - -func (bh *BinHeap) swap(i, j uint64) { - (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] -} - -func (bh *BinHeap) fixDown(curr, end int) { - cOneIdx := (curr << 1) + 1 - for cOneIdx <= end { - cTwoIdx := -1 - if (curr<<1)+2 <= end { - cTwoIdx = (curr << 1) + 2 - } - - idxToSwap := cOneIdx - if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { - idxToSwap = cTwoIdx - } - if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { - bh.swap(uint64(curr), uint64(idxToSwap)) - curr = idxToSwap - cOneIdx = (curr << 1) + 1 - } else { - return - } - } -} - -func (bh *BinHeap) Len() uint64 { - return atomic.LoadUint64(&bh.len) -} - -func (bh *BinHeap) Insert(item Item) { - bh.cond.L.Lock() - - // check the binary heap len before insertion - if bh.Len() > bh.maxLen { - // unlock the mutex to proceed to get-max - bh.cond.L.Unlock() - - // signal waiting goroutines - for bh.Len() > 0 { - // signal waiting goroutines - bh.cond.Signal() - } - // lock mutex to proceed inserting into the empty slice - bh.cond.L.Lock() - } - - bh.items = append(bh.items, item) - - // add len to the slice - atomic.AddUint64(&bh.len, 1) - - // fix binary heap up - bh.fixUp() - bh.cond.L.Unlock() - - // signal the goroutine on wait - bh.cond.Signal() -} - -func (bh *BinHeap) ExtractMin() Item { - bh.cond.L.Lock() - - // if len == 0, wait for the signal - for bh.Len() == 0 { - bh.cond.Wait() - } - - bh.swap(0, bh.len-1) - - item := (bh.items)[int(bh.len)-1] - bh.items = (bh).items[0 : int(bh.len)-1] - bh.fixDown(0, int(bh.len-2)) - - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) - - bh.cond.L.Unlock() - return item -} diff --git a/priority_queue/binary_heap_test.go b/priority_queue/binary_heap_test.go deleted file mode 100644 index e29835c2..00000000 --- a/priority_queue/binary_heap_test.go +++ /dev/null @@ -1,140 +0,0 @@ -package priorityqueue - -import ( - "fmt" - "math/rand" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -type Test int - -func (t Test) Body() []byte { - return nil -} - -func (t Test) Context() ([]byte, error) { - return nil, nil -} - -func (t Test) ID() string { - return "none" -} - -func (t Test) Priority() int64 { - return int64(t) -} - -func TestBinHeap_Init(t *testing.T) { - a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap(12) - - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - - res := make([]Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.ExtractMin() - res = append(res, item) - } - - require.Equal(t, expected, res) -} - -func TestBinHeap_MaxLen(t *testing.T) { - a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap(1) - - go func() { - res := make([]Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.ExtractMin() - res = append(res, item) - } - require.Equal(t, 11, len(res)) - return - }() - - time.Sleep(time.Second) - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - time.Sleep(time.Second) -} - -func TestNewPriorityQueue(t *testing.T) { - insertsPerSec := uint64(0) - getPerSec := uint64(0) - stopCh := make(chan struct{}, 1) - pq := NewBinHeap(1000) - - go func() { - tt3 := time.NewTicker(time.Millisecond * 10) - for { - select { - case <-tt3.C: - require.Less(t, pq.Len(), uint64(1002)) - case <-stopCh: - return - } - } - }() - - go func() { - tt := time.NewTicker(time.Second) - - for { - select { - case <-tt.C: - fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) - atomic.StoreUint64(&insertsPerSec, 0) - fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec))) - atomic.StoreUint64(&getPerSec, 0) - case <-stopCh: - tt.Stop() - return - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - pq.ExtractMin() - atomic.AddUint64(&getPerSec, 1) - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - pq.Insert(Test(rand.Int())) //nolint:gosec - atomic.AddUint64(&insertsPerSec, 1) - } - } - }() - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} -} diff --git a/priority_queue/interface.go b/priority_queue/interface.go deleted file mode 100644 index 42510f96..00000000 --- a/priority_queue/interface.go +++ /dev/null @@ -1,22 +0,0 @@ -package priorityqueue - -type Queue interface { - Insert(item Item) - ExtractMin() Item - Len() uint64 -} - -// Item represents binary heap item -type Item interface { - // ID is a unique item identifier - ID() string - - // Priority returns the Item's priority to sort - Priority() int64 - - // Body is the Item payload - Body() []byte - - // Context is the Item meta information - Context() ([]byte, error) -} |