summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 09:05:15 +0300
committerValery Piashchynski <[email protected]>2021-07-08 09:05:15 +0300
commitbb7b6a4d1821b36804f5d5756144ae42ad92b13e (patch)
tree351ed632680ef9525dbba17ebe20f4118f57ab5a
parentb84a7cb26c184b709f18d3d52925b31d49351c03 (diff)
Add length limiter for the binary heaps data structure. After max length
reached, insert operation will be blocked until all pending messages have been fully consumed. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/priorityqueue/binary_heap.go39
-rw-r--r--pkg/priorityqueue/binary_heap_test.go4
-rw-r--r--plugins/jobs/plugin.go45
3 files changed, 76 insertions, 12 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index a8d80fc0..82424331 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -12,15 +12,21 @@ import (
type BinHeap struct {
items []Item
// find a way to use pointer to the raw data
- len uint64
- cond sync.Cond
+ len uint64
+ maxLen uint64
+ cond sync.Cond
}
-func NewBinHeap() *BinHeap {
+func NewBinHeap(maxLen uint64) *BinHeap {
+ if maxLen == 0 {
+ maxLen = 100_000
+ }
+
return &BinHeap{
- items: make([]Item, 0, 100),
- len: 0,
- cond: sync.Cond{L: &sync.Mutex{}},
+ items: make([]Item, 0, maxLen),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
}
}
@@ -74,6 +80,21 @@ func (bh *BinHeap) Len() uint64 {
func (bh *BinHeap) Insert(item Item) {
bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
bh.items = append(bh.items, item)
// add len to the slice
@@ -89,9 +110,9 @@ func (bh *BinHeap) Insert(item Item) {
func (bh *BinHeap) GetMax() Item {
bh.cond.L.Lock()
- defer bh.cond.L.Unlock()
- for atomic.LoadUint64(&bh.len) == 0 {
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
bh.cond.Wait()
}
@@ -103,5 +124,7 @@ func (bh *BinHeap) GetMax() Item {
// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
return item
}
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index b02017b6..dadc3a39 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -37,7 +37,7 @@ func (t Test) Priority() uint64 {
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
- bh := NewBinHeap()
+ bh := NewBinHeap(100)
for i := 0; i < len(a); i++ {
bh.Insert(a[i])
@@ -59,7 +59,7 @@ func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewBinHeap()
+ pq := NewBinHeap(100)
go func() {
tt := time.NewTicker(time.Second)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index df34856e..0f645b12 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,10 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
+ "time"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
@@ -82,16 +85,51 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue.NewBinHeap(100_000_000)
p.log = log
return nil
}
-func (p *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate))
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> goroutines: %d", runtime.NumGoroutine())
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> curr len: %d", p.queue.Len())
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
// pipeline name (ie test-local, sqs-aws, etc)
@@ -167,6 +205,9 @@ func (p *Plugin) Serve() chan error {
continue
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
+
job.Ack()
}
}()