diff options
author | Valery Piashchynski <[email protected]> | 2021-07-11 10:11:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-11 10:11:22 +0300 |
commit | 589f759cc2411319adbca2ece0dbe212407d1eba (patch) | |
tree | 2fbea4a6033d65813c41fd80f6339a524b46c9b2 /pkg/priority_queue | |
parent | cb2665d93ad7abe1ab30508ff0e2bd4d0bc379ea (diff) |
Update informer interface to return slice of pointers (do not over-copy
the Stat structure).
Make amqp Push concurrent safe.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/priority_queue')
-rw-r--r-- | pkg/priority_queue/binary_heap.go | 126 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap_test.go | 124 | ||||
-rw-r--r-- | pkg/priority_queue/interface.go | 28 |
3 files changed, 278 insertions, 0 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go new file mode 100644 index 00000000..e47dd2c8 --- /dev/null +++ b/pkg/priority_queue/binary_heap.go @@ -0,0 +1,126 @@ +/* +binary heap (min-heap) algorithm used as a core for the priority queue +*/ + +package priorityqueue + +import ( + "sync" + "sync/atomic" +) + +type BinHeap struct { + items []Item + // find a way to use pointer to the raw data + len uint64 + maxLen uint64 + cond sync.Cond +} + +func NewBinHeap(maxLen uint64) *BinHeap { + return &BinHeap{ + items: make([]Item, 0, 1000), + len: 0, + maxLen: maxLen, + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (bh *BinHeap) fixUp() { + k := bh.len - 1 + p := (k - 1) >> 1 // k-1 / 2 + + for k > 0 { + cur, par := (bh.items)[k], (bh.items)[p] + + if cur.Priority() < par.Priority() { + bh.swap(k, p) + k = p + p = (k - 1) >> 1 + } else { + return + } + } +} + +func (bh *BinHeap) swap(i, j uint64) { + (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] +} + +func (bh *BinHeap) fixDown(curr, end int) { + cOneIdx := (curr << 1) + 1 + for cOneIdx <= end { + cTwoIdx := -1 + if (curr<<1)+2 <= end { + cTwoIdx = (curr << 1) + 2 + } + + idxToSwap := cOneIdx + // oh my, so unsafe + if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { + idxToSwap = cTwoIdx + } + if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { + bh.swap(uint64(curr), uint64(idxToSwap)) + curr = idxToSwap + cOneIdx = (curr << 1) + 1 + } else { + return + } + } +} + +func (bh *BinHeap) Len() uint64 { + return atomic.LoadUint64(&bh.len) +} + +func (bh *BinHeap) Insert(item Item) { + bh.cond.L.Lock() + + // check the binary heap len before insertion + if bh.Len() > bh.maxLen { + // unlock the mutex to proceed to get-max + bh.cond.L.Unlock() + + // signal waiting goroutines + for bh.Len() > 0 { + // signal waiting goroutines + bh.cond.Signal() + } + // lock mutex to proceed inserting into the empty slice + bh.cond.L.Lock() + } + + bh.items = append(bh.items, item) + + // add len to the slice + atomic.AddUint64(&bh.len, 1) + + // fix binary heap up + bh.fixUp() + bh.cond.L.Unlock() + + // signal the goroutine on wait + bh.cond.Signal() +} + +func (bh *BinHeap) GetMax() Item { + bh.cond.L.Lock() + + // if len == 0, wait for the signal + for bh.Len() == 0 { + bh.cond.Wait() + } + + bh.swap(0, bh.len-1) + + item := (bh.items)[int(bh.len)-1] + bh.items = (bh).items[0 : int(bh.len)-1] + bh.fixDown(0, int(bh.len-2)) + + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) + + bh.cond.L.Unlock() + return item +} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go new file mode 100644 index 00000000..06d0735c --- /dev/null +++ b/pkg/priority_queue/binary_heap_test.go @@ -0,0 +1,124 @@ +package priorityqueue + +import ( + "fmt" + "math/rand" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type Test int + +func (t Test) Ack() error { + return nil +} + +func (t Test) Nack() error { + return nil +} + +func (t Test) Body() []byte { + return nil +} + +func (t Test) Context() ([]byte, error) { + return nil, nil +} + +func (t Test) ID() string { + return "none" +} + +func (t Test) Priority() uint64 { + return uint64(t) +} + +func TestBinHeap_Init(t *testing.T) { + a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap(12) + + for i := 0; i < len(a); i++ { + bh.Insert(a[i]) + } + + expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + res := make([]Item, 0, 12) + + for i := 0; i < 11; i++ { + item := bh.GetMax() + res = append(res, item) + } + + require.Equal(t, expected, res) +} + +func TestNewPriorityQueue(t *testing.T) { + insertsPerSec := uint64(0) + getPerSec := uint64(0) + stopCh := make(chan struct{}, 1) + pq := NewBinHeap(1000) + + go func() { + tt3 := time.NewTicker(time.Millisecond * 10) + for { + select { + case <-tt3.C: + require.Less(t, pq.Len(), uint64(1002)) + case <-stopCh: + return + } + } + }() + + go func() { + tt := time.NewTicker(time.Second) + + for { + select { + case <-tt.C: + fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) + atomic.StoreUint64(&insertsPerSec, 0) + fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) + atomic.StoreUint64(&getPerSec, 0) + case <-stopCh: + tt.Stop() + return + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + pq.GetMax() + atomic.AddUint64(&getPerSec, 1) + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + pq.Insert(Test(rand.Int())) //nolint:gosec + atomic.AddUint64(&insertsPerSec, 1) + } + } + }() + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + stopCh <- struct{}{} + stopCh <- struct{}{} + stopCh <- struct{}{} +} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go new file mode 100644 index 00000000..8278dc8d --- /dev/null +++ b/pkg/priority_queue/interface.go @@ -0,0 +1,28 @@ +package priorityqueue + +type Queue interface { + Insert(item Item) + GetMax() Item + Len() uint64 +} + +// Item represents binary heap item +type Item interface { + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() uint64 + + // Body is the Item payload + Body() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing + Ack() error + + // Nack - discard the Item + Nack() error +} |