summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/priorityqueue/binary_heap.go39
-rw-r--r--pkg/priorityqueue/binary_heap_test.go4
-rw-r--r--plugins/jobs/plugin.go45
3 files changed, 76 insertions, 12 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index a8d80fc0..82424331 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -12,15 +12,21 @@ import (
type BinHeap struct {
items []Item
// find a way to use pointer to the raw data
- len uint64
- cond sync.Cond
+ len uint64
+ maxLen uint64
+ cond sync.Cond
}
-func NewBinHeap() *BinHeap {
+func NewBinHeap(maxLen uint64) *BinHeap {
+ if maxLen == 0 {
+ maxLen = 100_000
+ }
+
return &BinHeap{
- items: make([]Item, 0, 100),
- len: 0,
- cond: sync.Cond{L: &sync.Mutex{}},
+ items: make([]Item, 0, maxLen),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
}
}
@@ -74,6 +80,21 @@ func (bh *BinHeap) Len() uint64 {
func (bh *BinHeap) Insert(item Item) {
bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
bh.items = append(bh.items, item)
// add len to the slice
@@ -89,9 +110,9 @@ func (bh *BinHeap) Insert(item Item) {
func (bh *BinHeap) GetMax() Item {
bh.cond.L.Lock()
- defer bh.cond.L.Unlock()
- for atomic.LoadUint64(&bh.len) == 0 {
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
bh.cond.Wait()
}
@@ -103,5 +124,7 @@ func (bh *BinHeap) GetMax() Item {
// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
return item
}
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index b02017b6..dadc3a39 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -37,7 +37,7 @@ func (t Test) Priority() uint64 {
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
- bh := NewBinHeap()
+ bh := NewBinHeap(100)
for i := 0; i < len(a); i++ {
bh.Insert(a[i])
@@ -59,7 +59,7 @@ func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewBinHeap()
+ pq := NewBinHeap(100)
go func() {
tt := time.NewTicker(time.Second)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index df34856e..0f645b12 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,10 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
+ "time"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
@@ -82,16 +85,51 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue.NewBinHeap(100_000_000)
p.log = log
return nil
}
-func (p *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate))
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> goroutines: %d", runtime.NumGoroutine())
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> curr len: %d", p.queue.Len())
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
// pipeline name (ie test-local, sqs-aws, etc)
@@ -167,6 +205,9 @@ func (p *Plugin) Serve() chan error {
continue
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
+
job.Ack()
}
}()