diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
commit | d099e47ab28dd044d34e18347a4c714b8af3d612 (patch) | |
tree | e106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/plugin.go | |
parent | ec7c049036d31fe030d106db9f0d268ea0296c5f (diff) |
SQS driver.
Fix isssues in the AMQP driver.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c83078c3..ce51df21 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,7 +3,9 @@ package jobs import ( "context" "fmt" + "runtime" "sync" + "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -14,8 +16,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -172,6 +174,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return errCh } + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + var rate uint64 + go func() { + tt := time.NewTicker(time.Second * 1) + for { //nolint:gosimple + select { + case <-tt.C: + fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) + fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) + fmt.Printf("---> curr len: %d\n", p.queue.Len()) + atomic.StoreUint64(&rate, 0) + } + } + }() + + // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- + // start listening go func() { for i := uint8(0); i < p.cfg.NumPollers; i++ { @@ -219,6 +238,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) } + // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- + atomic.AddUint64(&rate, 1) } } }() @@ -289,7 +310,7 @@ func (p *Plugin) Reset() error { return nil } -func (p *Plugin) Push(j *structs.Job) error { +func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") // get the pipeline for the job @@ -320,7 +341,7 @@ func (p *Plugin) Push(j *structs.Job) error { return nil } -func (p *Plugin) PushBatch(j []*structs.Job) error { +func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { |