summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
committerValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
commitd099e47ab28dd044d34e18347a4c714b8af3d612 (patch)
treee106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/plugin.go
parentec7c049036d31fe030d106db9f0d268ea0296c5f (diff)
SQS driver.
Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go27
1 files changed, 24 insertions, 3 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index c83078c3..ce51df21 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,9 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -14,8 +16,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -172,6 +174,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return errCh
}
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
+ fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
+ fmt.Printf("---> curr len: %d\n", p.queue.Len())
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// start listening
go func() {
for i := uint8(0); i < p.cfg.NumPollers; i++ {
@@ -219,6 +238,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
}
}
}()
@@ -289,7 +310,7 @@ func (p *Plugin) Reset() error {
return nil
}
-func (p *Plugin) Push(j *structs.Job) error {
+func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
// get the pipeline for the job
@@ -320,7 +341,7 @@ func (p *Plugin) Push(j *structs.Job) error {
return nil
}
-func (p *Plugin) PushBatch(j []*structs.Job) error {
+func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {