summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 09:05:15 +0300
committerValery Piashchynski <[email protected]>2021-07-08 09:05:15 +0300
commitbb7b6a4d1821b36804f5d5756144ae42ad92b13e (patch)
tree351ed632680ef9525dbba17ebe20f4118f57ab5a /plugins
parentb84a7cb26c184b709f18d3d52925b31d49351c03 (diff)
Add length limiter for the binary heaps data structure. After max length
reached, insert operation will be blocked until all pending messages have been fully consumed. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/plugin.go45
1 files changed, 43 insertions, 2 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index df34856e..0f645b12 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,10 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
+ "time"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
@@ -82,16 +85,51 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue.NewBinHeap(100_000_000)
p.log = log
return nil
}
-func (p *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate))
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> goroutines: %d", runtime.NumGoroutine())
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond * 1000)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> curr len: %d", p.queue.Len())
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
// pipeline name (ie test-local, sqs-aws, etc)
@@ -167,6 +205,9 @@ func (p *Plugin) Serve() chan error {
continue
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
+
job.Ack()
}
}()