diff options
author | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
commit | ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch) | |
tree | 0a58b043605ef1d9b09e75b207c236aacb1ed55a /plugins | |
parent | bd0da830ae345e1ed4a67782bf413673beeba037 (diff) |
Update to go 1.17
Add Stat with tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/informer/plugin.go | 34 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 169 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 15 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 25 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 38 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 140 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 22 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 43 | ||||
-rw-r--r-- | plugins/jobs/response_protocol.md | 2 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 31 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 5 |
12 files changed, 370 insertions, 163 deletions
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index 21fd7983..87180be5 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -1,20 +1,30 @@ package informer import ( + "context" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/logger" ) const PluginName = "informer" type Plugin struct { + log logger.Logger + + withJobs map[string]JobsStat withWorkers map[string]Informer available map[string]Availabler } -func (p *Plugin) Init() error { +func (p *Plugin) Init(log logger.Logger) error { p.available = make(map[string]Availabler) p.withWorkers = make(map[string]Informer) + p.withJobs = make(map[string]JobsStat) + + p.log = log return nil } @@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State { return svc.Workers() } +// Jobs provides information about jobs for the registered plugin using jobs +func (p *Plugin) Jobs(name string) []*job.State { + svc, ok := p.withJobs[name] + if !ok { + return nil + } + + st, err := svc.JobsState(context.Background()) + if err != nil { + p.log.Info("jobs stat", "error", err) + // skip errors here + return nil + } + + return st +} + // Collects declares services to be collected. func (p *Plugin) Collects() []interface{} { return []interface{}{ p.CollectPlugins, p.CollectWorkers, + p.CollectJobs, } } @@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) { p.withWorkers[name.Name()] = r } +func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) { + p.withJobs[name.Name()] = j +} + // Name of the service. func (p *Plugin) Name() string { return PluginName diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 3738b619..478d3227 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,6 +1,7 @@ package informer import ( + "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/pkg/state/process" ) @@ -10,7 +11,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { - // Workers is list of workers. + // Workers are list of workers. Workers []*process.State `json:"workers"` } @@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error { func (rpc *rpc) Workers(service string, list *WorkerList) error { workers := rpc.srv.Workers(service) if workers == nil { - list = nil return nil } @@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error { return nil } +func (rpc *rpc) Jobs(service string, out *[]*job.State) error { + *out = rpc.srv.Jobs(service) + return nil +} + // sort.Sort func (w *WorkerList) Len() int { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index b89cdc82..f3b40ae3 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -17,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -51,9 +52,8 @@ type JobConsumer struct { multipleAck bool requeueOnFail bool - delayCache map[string]struct{} - listeners uint32 + delayed *int64 stopCh chan struct{} } @@ -100,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh: make(chan struct{}), // TODO to config retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), priority: pipeCfg.Priority, + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeCfg.RoutingKey, @@ -170,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con consumeID: uuid.NewString(), stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeline.String(routingKey, ""), @@ -232,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -// handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("rabbitmq_handle_item") - select { - case pch := <-j.publishChan: - // return the channel back - defer func() { - j.publishChan <- pch - }() - - // convert - table, err := pack(msg.ID(), msg) - if err != nil { - return errors.E(op, err) - } - - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - // TODO dlx cache channel?? - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } - - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - j.delayCache[tmpQ] = struct{}{} - - return nil - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} - func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil @@ -375,9 +300,14 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ - Queue: q.Name, - Active: int64(q.Messages), + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: q.Name, + Active: int64(q.Messages), + Delayed: *j.delayed, }, nil case <-ctx.Done(): @@ -494,3 +424,80 @@ func (j *JobConsumer) Stop(context.Context) error { }) return nil } + +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + table, err := pack(msg.ID(), msg) + if err != nil { + return errors.E(op, err) + } + + const op = errors.Op("rabbitmq_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + atomic.AddInt64(j.delayed, 1) + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + return nil + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 5990d137..623dcca7 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -3,6 +3,7 @@ package amqp import ( "context" "fmt" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -52,8 +53,8 @@ type Options struct { nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error - + requeueFn func(context.Context, *Item) error + delayed *int64 multipleAsk bool requeue bool } @@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.ack(i.Options.multipleAsk) } func (i *Item) Nack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.nack(false, i.Options.requeue) } // Requeue with the provided delay, handled by the Nack func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } // overwrite the delay i.Options.Delay = delay i.Headers = headers @@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack + item.Options.delayed = j.delayed // requeue func item.Options.requeueFn = j.handleItem diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 32ca4188..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, errN + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry put only when we redialed return cp.t.Put(body, pri, delay, ttr) @@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, nil, errN + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Reserve only when we redialed return cp.ts.Reserve(reserveTimeout) @@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error return id, body, nil } -func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { cp.RLock() defer cp.RUnlock() @@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return errN + return errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Delete only when we redialed return cp.conn.Delete(id) @@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 7e81e6d9..b4d76d38 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "strconv" "strings" "sync/atomic" "time" @@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + // this is not an error, ready in terms of beanstalk means reserved in the tube + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { @@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index d801b7b4..34778642 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -2,7 +2,6 @@ package ephemeral import ( "context" - "sync" "sync/atomic" "time" @@ -14,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -29,14 +29,18 @@ type JobConsumer struct { cfg *Config log logger.Logger eh events.Handler - pipeline sync.Map + pipeline atomic.Value pq priorityqueue.Queue localPrefetch chan *Item // time.sleep goroutines max number goroutines uint64 - stopCh chan struct{} + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } @@ -62,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // initialize a local queue jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - // consume from the queue - go jb.consume() - return jb, nil } @@ -74,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } // initialize a local queue jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - // consume from the queue - go jb.consume() - return jb, nil } @@ -90,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - b, ok := j.pipeline.Load(jb.Options.Pipeline) + _, ok := j.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - err := j.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) @@ -107,53 +105,69 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil +func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: pipe.Name(), + Active: atomic.LoadInt64(j.active), + Delayed: atomic.LoadInt64(j.delayed), + }, nil } func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - const op = errors.Op("ephemeral_register") - if _, ok := j.pipeline.Load(pipeline.Name()); ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) - } - - j.pipeline.Store(pipeline.Name(), true) - + j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == true { - // mark pipeline as turned off - j.pipeline.Store(pipeline, false) - } - // if not true - do not send the EventPipeStopped, because pipe already stopped +func (j *JobConsumer) Pause(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") return } + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop the consumer + j.stopCh <- struct{}{} + j.eh.Push(events.JobEvent{ Event: events.EventPipePaused, - Pipeline: pipeline, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) } -func (j *JobConsumer) Resume(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == false { - // mark pipeline as turned on - j.pipeline.Store(pipeline, true) - } +func (j *JobConsumer) Resume(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } - // if not true - do not send the EventPipeActive, because pipe already active + l := atomic.LoadUint32(&j.listeners) + // listener already active + if l == 1 { + j.log.Warn("listener already in the active state") return } + // resume the consumer on the same channel + j.consume() + + atomic.StoreUint32(&j.listeners, 1) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, - Pipeline: pipeline, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) @@ -172,22 +186,19 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { func (j *JobConsumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") - var pipe string - j.pipeline.Range(func(key, _ interface{}) bool { - pipe = key.(string) - j.pipeline.Delete(key) - return true - }) + + pipe := j.pipeline.Load().(*pipeline.Pipeline) select { // return from the consumer case j.stopCh <- struct{}{}: j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, - Pipeline: pipe, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) + return nil case <-ctx.Done(): @@ -208,6 +219,8 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { go func(jj *Item) { atomic.AddUint64(&j.goroutines, 1) + atomic.AddInt64(j.delayed, 1) + time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired @@ -219,6 +232,9 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return nil } + // increase number of the active jobs + atomic.AddInt64(j.active, 1) + // insert to the local, limited pipeline select { case j.localPrefetch <- msg: @@ -229,21 +245,25 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } func (j *JobConsumer) consume() { - // redirect - for { - select { - case item, ok := <-j.localPrefetch: - if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") + go func() { + // redirect + for { + select { + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set requeue channel + item.Options.requeueFn = j.handleItem + item.Options.active = j.active + item.Options.delayed = j.delayed + + j.pq.Insert(item) + case <-j.stopCh: return } - - // set requeue channel - item.Options.requeueFn = j.handleItem - - j.pq.Insert(item) - case <-j.stopCh: - return } - } + }() } diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 1a61d7e9..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -2,6 +2,7 @@ package ephemeral import ( "context" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -40,6 +41,8 @@ type Options struct { // private requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 } // DelayDuration returns delay duration in a form of time.Duration. @@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } func (i *Item) Nack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } @@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { i.Options.Delay = delay i.Headers = headers + i.atomicallyReduceCount() + err := i.Options.requeueFn(context.Background(), i) if err != nil { return err @@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 4fb684f8..d957b3eb 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -2,6 +2,7 @@ package sqs import ( "context" + "strconv" "sync" "sync/atomic" "time" @@ -11,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" @@ -263,7 +265,44 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { } func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("sqs_state") + attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: j.queueURL, + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameApproximateNumberOfMessages, + types.QueueAttributeNameApproximateNumberOfMessagesDelayed, + types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, + }, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: *j.queueURL, + } + + nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) + if err == nil { + out.Active = int64(nom) + } + + delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) + if err == nil { + out.Delayed = int64(delayed) + } + + nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) + if err == nil { + out.Reserved = int64(nv) + } + + return out, nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -272,7 +311,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") + const op = errors.Op("sqs_run") j.Lock() defer j.Unlock() diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index c89877e3..e195c407 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,7 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap + `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. For example: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 7f9859fb..2750cd8f 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,6 +1,8 @@ package jobs import ( + "context" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -14,7 +16,7 @@ type rpc struct { } func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push") // convert transport entity into domain // how we can do this quickly @@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { } func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push_batch") l := len(j.GetJobs()) @@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { // 2. Pipeline name // 3. Options related to the particular pipeline func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") pipe := &pipeline.Pipeline{} for i := range req.GetPipeline() { @@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error } func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") var destroyed []string //nolint:prealloc for i := 0; i < len(req.GetPipelines()); i++ { @@ -112,6 +114,27 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err return nil } +func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { + const op = errors.Op("rpc_stats") + state, err := r.p.JobsState(context.Background()) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(state); i++ { + resp.Stats = append(resp.Stats, &jobsv1beta.Stat{ + Pipeline: state[i].Pipeline, + Driver: state[i].Driver, + Queue: state[i].Queue, + Active: state[i].Active, + Delayed: state[i].Delayed, + Reserved: state[i].Reserved, + }) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ad371bf8..a7db0f83 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -118,7 +118,8 @@ func (p *Plugin) Serve() chan error { Supervisor: p.cfg.Pool.Supervisor, }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { - errCh <- err + errCh <- errors.E(op, err) + return } p.accessValidator = p.defaultAccessValidator(p.phpPool) @@ -135,7 +136,7 @@ func (p *Plugin) Serve() chan error { default: data, err := ps.Next() if err != nil { - errCh <- err + errCh <- errors.E(op, err) return } p.workersPool.Queue(data) |