diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
commit | bb7b6a4d1821b36804f5d5756144ae42ad92b13e (patch) | |
tree | 351ed632680ef9525dbba17ebe20f4118f57ab5a /pkg | |
parent | b84a7cb26c184b709f18d3d52925b31d49351c03 (diff) |
Add length limiter for the binary heaps data structure. After max length
reached, insert operation will be blocked until all pending messages
have been fully consumed.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 39 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 4 |
2 files changed, 33 insertions, 10 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index a8d80fc0..82424331 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -12,15 +12,21 @@ import ( type BinHeap struct { items []Item // find a way to use pointer to the raw data - len uint64 - cond sync.Cond + len uint64 + maxLen uint64 + cond sync.Cond } -func NewBinHeap() *BinHeap { +func NewBinHeap(maxLen uint64) *BinHeap { + if maxLen == 0 { + maxLen = 100_000 + } + return &BinHeap{ - items: make([]Item, 0, 100), - len: 0, - cond: sync.Cond{L: &sync.Mutex{}}, + items: make([]Item, 0, maxLen), + len: 0, + maxLen: maxLen, + cond: sync.Cond{L: &sync.Mutex{}}, } } @@ -74,6 +80,21 @@ func (bh *BinHeap) Len() uint64 { func (bh *BinHeap) Insert(item Item) { bh.cond.L.Lock() + + // check the binary heap len before insertion + if bh.Len() > bh.maxLen { + // unlock the mutex to proceed to get-max + bh.cond.L.Unlock() + + // signal waiting goroutines + for bh.Len() > 0 { + // signal waiting goroutines + bh.cond.Signal() + } + // lock mutex to proceed inserting into the empty slice + bh.cond.L.Lock() + } + bh.items = append(bh.items, item) // add len to the slice @@ -89,9 +110,9 @@ func (bh *BinHeap) Insert(item Item) { func (bh *BinHeap) GetMax() Item { bh.cond.L.Lock() - defer bh.cond.L.Unlock() - for atomic.LoadUint64(&bh.len) == 0 { + // if len == 0, wait for the signal + for bh.Len() == 0 { bh.cond.Wait() } @@ -103,5 +124,7 @@ func (bh *BinHeap) GetMax() Item { // reduce len atomic.AddUint64(&bh.len, ^uint64(0)) + + bh.cond.L.Unlock() return item } diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index b02017b6..dadc3a39 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -37,7 +37,7 @@ func (t Test) Priority() uint64 { func TestBinHeap_Init(t *testing.T) { a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - bh := NewBinHeap() + bh := NewBinHeap(100) for i := 0; i < len(a); i++ { bh.Insert(a[i]) @@ -59,7 +59,7 @@ func TestNewPriorityQueue(t *testing.T) { insertsPerSec := uint64(0) getPerSec := uint64(0) stopCh := make(chan struct{}, 1) - pq := NewBinHeap() + pq := NewBinHeap(100) go func() { tt := time.NewTicker(time.Second) |