diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
commit | bb7b6a4d1821b36804f5d5756144ae42ad92b13e (patch) | |
tree | 351ed632680ef9525dbba17ebe20f4118f57ab5a | |
parent | b84a7cb26c184b709f18d3d52925b31d49351c03 (diff) |
Add length limiter for the binary heaps data structure. After max length
reached, insert operation will be blocked until all pending messages
have been fully consumed.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 39 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 4 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 45 |
3 files changed, 76 insertions, 12 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index a8d80fc0..82424331 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -12,15 +12,21 @@ import ( type BinHeap struct { items []Item // find a way to use pointer to the raw data - len uint64 - cond sync.Cond + len uint64 + maxLen uint64 + cond sync.Cond } -func NewBinHeap() *BinHeap { +func NewBinHeap(maxLen uint64) *BinHeap { + if maxLen == 0 { + maxLen = 100_000 + } + return &BinHeap{ - items: make([]Item, 0, 100), - len: 0, - cond: sync.Cond{L: &sync.Mutex{}}, + items: make([]Item, 0, maxLen), + len: 0, + maxLen: maxLen, + cond: sync.Cond{L: &sync.Mutex{}}, } } @@ -74,6 +80,21 @@ func (bh *BinHeap) Len() uint64 { func (bh *BinHeap) Insert(item Item) { bh.cond.L.Lock() + + // check the binary heap len before insertion + if bh.Len() > bh.maxLen { + // unlock the mutex to proceed to get-max + bh.cond.L.Unlock() + + // signal waiting goroutines + for bh.Len() > 0 { + // signal waiting goroutines + bh.cond.Signal() + } + // lock mutex to proceed inserting into the empty slice + bh.cond.L.Lock() + } + bh.items = append(bh.items, item) // add len to the slice @@ -89,9 +110,9 @@ func (bh *BinHeap) Insert(item Item) { func (bh *BinHeap) GetMax() Item { bh.cond.L.Lock() - defer bh.cond.L.Unlock() - for atomic.LoadUint64(&bh.len) == 0 { + // if len == 0, wait for the signal + for bh.Len() == 0 { bh.cond.Wait() } @@ -103,5 +124,7 @@ func (bh *BinHeap) GetMax() Item { // reduce len atomic.AddUint64(&bh.len, ^uint64(0)) + + bh.cond.L.Unlock() return item } diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index b02017b6..dadc3a39 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -37,7 +37,7 @@ func (t Test) Priority() uint64 { func TestBinHeap_Init(t *testing.T) { a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - bh := NewBinHeap() + bh := NewBinHeap(100) for i := 0; i < len(a); i++ { bh.Insert(a[i]) @@ -59,7 +59,7 @@ func TestNewPriorityQueue(t *testing.T) { insertsPerSec := uint64(0) getPerSec := uint64(0) stopCh := make(chan struct{}, 1) - pq := NewBinHeap() + pq := NewBinHeap(100) go func() { tt := time.NewTicker(time.Second) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index df34856e..0f645b12 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,7 +3,10 @@ package jobs import ( "context" "fmt" + "runtime" "sync" + "sync/atomic" + "time" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" @@ -82,16 +85,51 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap() + p.queue = priorityqueue.NewBinHeap(100_000_000) p.log = log return nil } -func (p *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + var rate uint64 + go func() { + tt := time.NewTicker(time.Second * 1) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate)) + atomic.StoreUint64(&rate, 0) + } + } + }() + + go func() { + tt := time.NewTicker(time.Millisecond * 1000) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> goroutines: %d", runtime.NumGoroutine()) + } + } + }() + + go func() { + tt := time.NewTicker(time.Millisecond * 1000) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> curr len: %d", p.queue.Len()) + } + } + }() + + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { // pipeline name (ie test-local, sqs-aws, etc) @@ -167,6 +205,9 @@ func (p *Plugin) Serve() chan error { continue } + // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- + atomic.AddUint64(&rate, 1) + job.Ack() } }() |