diff options
Diffstat (limited to 'pkg/priority_queue')
-rw-r--r-- | pkg/priority_queue/binary_heap.go | 71 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap_test.go | 21 | ||||
-rw-r--r-- | pkg/priority_queue/interface.go | 4 | ||||
-rw-r--r-- | pkg/priority_queue/pq.go | 26 | ||||
-rw-r--r-- | pkg/priority_queue/pq_test.go | 65 |
5 files changed, 167 insertions, 20 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go index 7e034e82..00c73869 100644 --- a/pkg/priority_queue/binary_heap.go +++ b/pkg/priority_queue/binary_heap.go @@ -14,38 +14,73 @@ func (bh *BinHeap) Init(items []PQItem) { arraySize := len(items) - 1 for i := arraySize/2 - 1; i >= 0; i-- { - bh.shiftDown(items, i, arraySize) + bh.fixDown(i, arraySize) } for i := arraySize - 1; i >= 1; i-- { items[0], items[i] = items[i], items[0] - bh.shiftDown(items, 0, i-1) + bh.fixDown(0, i-1) } } -func (bh *BinHeap) shiftDown(numbers []PQItem, k, n int) { - // k << 1 is equal to k*2 - for k<<1 <= n { - j := k << 1 +func (bh *BinHeap) fixUp() { + k := len(*bh) - 1 + p := (k - 1) >> 1 // k-1 / 2 - if j < n && numbers[j].Priority() < numbers[j+1].Priority() { - j++ + for k > 0 { + cur, par := (*bh)[k], (*bh)[p] + + if cur.Priority() < par.Priority() { + bh.swap(k, p) + k = p + p = (k - 1) >> 1 + } else { + return } + } +} - if !(numbers[k].Priority() < numbers[j].Priority()) { - break +func (bh *BinHeap) swap(i, j int) { + (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i] +} + +func (bh *BinHeap) fixDown(curr, end int) { + cOneIdx := curr*2 + 1 + for cOneIdx <= end { + cTwoIdx := -1 + if curr*2+2 <= end { + cTwoIdx = curr*2 + 2 } - numbers[k], numbers[j] = numbers[j], numbers[k] - k = j + idxToSwap := cOneIdx + if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() { + idxToSwap = cTwoIdx + } + if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() { + bh.swap(curr, idxToSwap) + curr = idxToSwap + cOneIdx = curr*2 + 1 + } else { + return + } } } -func (bh *BinHeap) fix() {} -func (bh *BinHeap) Push(_ PQItem) {} +func (bh *BinHeap) Insert(item PQItem) { + *bh = append(*bh, item) + bh.fixUp() +} + +func (bh *BinHeap) GetMax() PQItem { + l := len(*bh) + if l == 0 { + return nil + } + + bh.swap(0, l-1) -func (bh *BinHeap) Pop() PQItem { - bh.fix() - // get min - return nil + item := (*bh)[l-1] + *bh = (*bh)[0 : l-1] + bh.fixDown(0, l-2) + return item } diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go index a2b0e7e8..1b510be3 100644 --- a/pkg/priority_queue/binary_heap_test.go +++ b/pkg/priority_queue/binary_heap_test.go @@ -29,6 +29,27 @@ func TestBinHeap_Init(t *testing.T) { require.Equal(t, expected, a) } +func TestBinHeap_Init2(t *testing.T) { + a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap() + + for i := 0; i < len(a); i++ { + bh.Insert(a[i]) + } + + expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + res := make([]PQItem, 0, 12) + + for i := 0; i < 11; i++ { + item := bh.GetMax() + res = append(res, item) + } + + require.Equal(t, expected, res) +} + func BenchmarkBinHeap_Init(b *testing.B) { a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} bh := NewBinHeap() diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 45430486..3cc1d575 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,8 +1,8 @@ package priorityqueue type Queue interface { - Push(item PQItem) - Pop() PQItem + Insert(item PQItem) + GetMax() PQItem } type PQItem interface { diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go new file mode 100644 index 00000000..1b33cb92 --- /dev/null +++ b/pkg/priority_queue/pq.go @@ -0,0 +1,26 @@ +package priorityqueue + +import "sync" + +type PQ struct { + sync.RWMutex + bh *BinHeap +} + +func NewPriorityQueue() *PQ { + return &PQ{ + bh: NewBinHeap(), + } +} + +func (p *PQ) Insert(item PQItem) { + p.Lock() + p.bh.Insert(item) + p.Unlock() +} + +func (p *PQ) Get() PQItem { + p.Lock() + defer p.Unlock() + return p.bh.GetMax() +} diff --git a/pkg/priority_queue/pq_test.go b/pkg/priority_queue/pq_test.go new file mode 100644 index 00000000..cdec10f5 --- /dev/null +++ b/pkg/priority_queue/pq_test.go @@ -0,0 +1,65 @@ +package priorityqueue + +import ( + "fmt" + "math/rand" + "sync/atomic" + "testing" + "time" +) + +func TestNewPriorityQueue(t *testing.T) { + insertsPerSec := uint64(0) + getPerSec := uint64(0) + stopCh := make(chan struct{}, 1) + pq := NewPriorityQueue() + + go func() { + tt := time.NewTicker(time.Second) + + for { + select { + case <-tt.C: + fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) + fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) + atomic.StoreUint64(&getPerSec, 0) + atomic.StoreUint64(&insertsPerSec, 0) + case <-stopCh: + tt.Stop() + return + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + it := pq.Get() + if it == nil { + continue + } + atomic.AddUint64(&getPerSec, 1) + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + pq.Insert(Test(rand.Int())) //nolint:gosec + atomic.AddUint64(&insertsPerSec, 1) + } + } + }() + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + stopCh <- struct{}{} + stopCh <- struct{}{} +} |