diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 09:05:15 +0300 |
commit | bb7b6a4d1821b36804f5d5756144ae42ad92b13e (patch) | |
tree | 351ed632680ef9525dbba17ebe20f4118f57ab5a /plugins | |
parent | b84a7cb26c184b709f18d3d52925b31d49351c03 (diff) |
Add length limiter for the binary heaps data structure. After max length
reached, insert operation will be blocked until all pending messages
have been fully consumed.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/plugin.go | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index df34856e..0f645b12 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,7 +3,10 @@ package jobs import ( "context" "fmt" + "runtime" "sync" + "sync/atomic" + "time" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" @@ -82,16 +85,51 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap() + p.queue = priorityqueue.NewBinHeap(100_000_000) p.log = log return nil } -func (p *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + var rate uint64 + go func() { + tt := time.NewTicker(time.Second * 1) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate)) + atomic.StoreUint64(&rate, 0) + } + } + }() + + go func() { + tt := time.NewTicker(time.Millisecond * 1000) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> goroutines: %d", runtime.NumGoroutine()) + } + } + }() + + go func() { + tt := time.NewTicker(time.Millisecond * 1000) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> curr len: %d", p.queue.Len()) + } + } + }() + + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { // pipeline name (ie test-local, sqs-aws, etc) @@ -167,6 +205,9 @@ func (p *Plugin) Serve() chan error { continue } + // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- + atomic.AddUint64(&rate, 1) + job.Ack() } }() |