Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go        | 194
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go            | 15
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 25
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go   | 47
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go   | 220
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go       | 22
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go         | 65
-rw-r--r-- | plugins/jobs/metrics.go                      | 91
-rw-r--r-- | plugins/jobs/plugin.go                       | 134
-rw-r--r-- | plugins/jobs/response_protocol.md            | 2
-rw-r--r-- | plugins/jobs/rpc.go                          | 32
11 files changed, 642 insertions, 205 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..95df02ec 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,10 +12,12 @@ import (
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/utils"
 )
 
 type JobConsumer struct {
@@ -50,9 +52,8 @@ type JobConsumer struct {
 	multipleAck   bool
 	requeueOnFail bool
 
-	delayCache map[string]struct{}
-
 	listeners uint32
+	delayed   *int64
 
 	stopCh chan struct{}
 }
@@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
 		stopCh:    make(chan struct{}),
 		// TODO to config
 		retryTimeout: time.Minute * 5,
-		delayCache:   make(map[string]struct{}, 100),
 		priority:     pipeCfg.Priority,
+		delayed:      utils.Int64(0),
 
 		publishChan: make(chan *amqp.Channel, 1),
 		routingKey:  pipeCfg.RoutingKey,
@@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
 		consumeID:    uuid.NewString(),
 		stopCh:       make(chan struct{}),
 		retryTimeout: time.Minute * 5,
-		delayCache:   make(map[string]struct{}, 100),
+		delayed:      utils.Int64(0),
 
 		publishChan: make(chan *amqp.Channel, 1),
 		routingKey:  pipeline.String(routingKey, ""),
@@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
 	return nil
 }
 
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
-	const op = errors.Op("rabbitmq_handle_item")
-	select {
-	case pch := <-j.publishChan:
-		// return the channel back
-		defer func() {
-			j.publishChan <- pch
-		}()
-
-		// convert
-		table, err := pack(msg.ID(), msg)
-		if err != nil {
-			return errors.E(op, err)
-		}
-
-		const op = errors.Op("amqp_handle_item")
-		// handle timeouts
-		if msg.Options.DelayDuration() > 0 {
-			// TODO declare separate method for this if condition
-			// TODO dlx cache channel??
-			delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
-			tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-			_, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
-				dlx:           j.exchangeName,
-				dlxRoutingKey: j.routingKey,
-				dlxTTL:        delayMs,
-				dlxExpires:    delayMs * 2,
-			})
-			if err != nil {
-				return errors.E(op, err)
-			}
-
-			err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
-			if err != nil {
-				return errors.E(op, err)
-			}
-
-			// insert to the local, limited pipeline
-			err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
-				Headers:      table,
-				ContentType:  contentType,
-				Timestamp:    time.Now().UTC(),
-				DeliveryMode: amqp.Persistent,
-				Body:         msg.Body(),
-			})
-
-			if err != nil {
-				return errors.E(op, err)
-			}
-
-			j.delayCache[tmpQ] = struct{}{}
-
-			return nil
-		}
-
-		// insert to the local, limited pipeline
-		err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
-			Headers:      table,
-			ContentType:  contentType,
-			Timestamp:    time.Now(),
-			DeliveryMode: amqp.Persistent,
-			Body:         msg.Body(),
-		})
-
-		if err != nil {
-			return errors.E(op, err)
-		}
-
-		return nil
-	case <-ctx.Done():
-		return errors.E(op, errors.TimeOut, ctx.Err())
-	}
-}
-
 func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
 	j.pipeline.Store(p)
 	return nil
@@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
 	return nil
 }
 
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+	const op = errors.Op("amqp_driver_state")
+	select {
+	case pch := <-j.publishChan:
+		defer func() {
+			j.publishChan <- pch
+		}()
+
+		q, err := pch.QueueInspect(j.queue)
+		if err != nil {
+			return nil, errors.E(op, err)
+		}
+
+		pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+		return &jobState.State{
+			Pipeline: pipe.Name(),
+			Driver:   pipe.Driver(),
+			Queue:    q.Name,
+			Active:   int64(q.Messages),
+			Delayed:  atomic.LoadInt64(j.delayed),
+			Ready:    ready(atomic.LoadUint32(&j.listeners)),
+		}, nil
+
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.TimeOut, ctx.Err())
+	}
+}
+
 func (j *JobConsumer) Pause(_ context.Context, p string) {
 	pipe := j.pipeline.Load().(*pipeline.Pipeline)
 	if pipe.Name() != p {
@@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error {
 	})
 	return nil
 }
+
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+	const op = errors.Op("rabbitmq_handle_item")
+	select {
+	case pch := <-j.publishChan:
+		// return the channel back
+		defer func() {
+			j.publishChan <- pch
+		}()
+
+		// convert
+		table, err := pack(msg.ID(), msg)
+		if err != nil {
+			return errors.E(op, err)
+		}
+
+		const op = errors.Op("rabbitmq_handle_item")
+		// handle timeouts
+		if msg.Options.DelayDuration() > 0 {
+			atomic.AddInt64(j.delayed, 1)
+			// TODO declare separate method for this if condition
+			// TODO dlx cache channel??
+			delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+			tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+			_, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+				dlx:           j.exchangeName,
+				dlxRoutingKey: j.routingKey,
+				dlxTTL:        delayMs,
+				dlxExpires:    delayMs * 2,
+			})
+			if err != nil {
+				atomic.AddInt64(j.delayed, ^int64(0))
+				return errors.E(op, err)
+			}
+
+			err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+			if err != nil {
+				atomic.AddInt64(j.delayed, ^int64(0))
+				return errors.E(op, err)
+			}
+
+			// insert to the local, limited pipeline
+			err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+				Headers:      table,
+				ContentType:  contentType,
+				Timestamp:    time.Now().UTC(),
+				DeliveryMode: amqp.Persistent,
+				Body:         msg.Body(),
+			})
+
+			if err != nil {
+				atomic.AddInt64(j.delayed, ^int64(0))
+				return errors.E(op, err)
+			}
+
+			return nil
+		}
+
+		// insert to the local, limited pipeline
+		err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+			Headers:      table,
+			ContentType:  contentType,
+			Timestamp:    time.Now(),
+			DeliveryMode: amqp.Persistent,
+			Body:         msg.Body(),
+		})
+
+		if err != nil {
+			return errors.E(op, err)
+		}
+
+		return nil
+	case <-ctx.Done():
+		return errors.E(op, errors.TimeOut, ctx.Err())
+	}
+}
+
+func ready(r uint32) bool {
+	return r > 0
+}
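A note on the delayed path in `handleItem`: it relies on RabbitMQ dead-lettering rather than a scheduler. The job is published into a per-delay helper queue whose messages expire after `delayMs` and are dead-lettered back into the work exchange. Below is a minimal standalone sketch of that declaration, assuming the driver's `dlx`, `dlxRoutingKey`, `dlxTTL` and `dlxExpires` constants map to the standard `x-*` arguments (an assumption; the constants are defined elsewhere in the package), using `github.com/streadway/amqp`:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/streadway/amqp"
    )

    // declareDelayQueue declares a helper queue whose messages sit for delayMs
    // and are then dead-lettered back to the real exchange/routing key.
    func declareDelayQueue(ch *amqp.Channel, exchange, routingKey, queue string, delayMs int64) (string, error) {
    	// same naming scheme as the driver: delayed-<ms>.<exchange>.<queue>
    	tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, exchange, queue)
    	_, err := ch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
    		"x-dead-letter-exchange":    exchange,    // where expired messages go (assumed dlx)
    		"x-dead-letter-routing-key": routingKey,  // re-routed to the work queue (assumed dlxRoutingKey)
    		"x-message-ttl":             delayMs,     // the delay itself (assumed dlxTTL)
    		"x-expires":                 delayMs * 2, // drop the idle helper queue (assumed dlxExpires)
    	})
    	if err != nil {
    		return "", err
    	}
    	// bind the helper queue to itself as the routing key, like the driver does
    	return tmpQ, ch.QueueBind(tmpQ, tmpQ, exchange, false, nil)
    }

    func main() {
    	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer conn.Close()

    	ch, err := conn.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	q, err := declareDelayQueue(ch, "amqp.default", "jobs", "jobs-queue", 5000)
    	log.Println(q, err)
    }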
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
 import (
 	"context"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
 	nack func(multiply bool, requeue bool) error
 
 	// requeueFn used as a pointer to the push function
-	requeueFn func(context.Context, *Item) error
-
+	requeueFn   func(context.Context, *Item) error
+	delayed     *int64
 	multipleAsk bool
 	requeue     bool
 }
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
 }
 
 func (i *Item) Ack() error {
+	if i.Options.Delay > 0 {
+		atomic.AddInt64(i.Options.delayed, ^int64(0))
+	}
 	return i.Options.ack(i.Options.multipleAsk)
 }
 
 func (i *Item) Nack() error {
+	if i.Options.Delay > 0 {
+		atomic.AddInt64(i.Options.delayed, ^int64(0))
+	}
 	return i.Options.nack(false, i.Options.requeue)
 }
 
 // Requeue with the provided delay, handled by the Nack
 func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+	if i.Options.Delay > 0 {
+		atomic.AddInt64(i.Options.delayed, ^int64(0))
+	}
 	// overwrite the delay
 	i.Options.Delay = delay
 	i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
 
 	item.Options.ack = d.Ack
 	item.Options.nack = d.Nack
+	item.Options.delayed = j.delayed
 
 	// requeue func
 	item.Options.requeueFn = j.handleItem
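The counter updates above lean on one idiom throughout: `atomic.AddInt64(p, ^int64(0))`. `^int64(0)` flips all bits of zero, which is -1 in two's complement, so the add is a lock-free decrement; the unsigned variant `^uint32(0)` does the same for counters that have no negative literal. A self-contained illustration:

    // Demonstrates the decrement idiom used by the drivers above.
    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    func main() {
    	delayed := int64(0)
    	atomic.AddInt64(&delayed, 1)            // a delayed job was published
    	atomic.AddInt64(&delayed, ^int64(0))    // the job was acked: ^0 == -1
    	fmt.Println(atomic.LoadInt64(&delayed)) // 0
    }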
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 32ca4188..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
 		// errN contains both, err and internal checkAndRedial error
 		errN := cp.checkAndRedial(err)
 		if errN != nil {
-			return 0, errN
+			return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
 		} else {
 			// retry put only when we redialed
 			return cp.t.Put(body, pri, delay, ttr)
@@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
 		// errN contains both, err and internal checkAndRedial error
 		errN := cp.checkAndRedial(err)
 		if errN != nil {
-			return 0, nil, errN
+			return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
 		} else {
 			// retry Reserve only when we redialed
 			return cp.ts.Reserve(reserveTimeout)
@@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
 	return id, body, nil
 }
 
-func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
 	cp.RLock()
 	defer cp.RUnlock()
@@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
 		// errN contains both, err and internal checkAndRedial error
 		errN := cp.checkAndRedial(err)
 		if errN != nil {
-			return errN
+			return errors.Errorf("err: %s\nerr redial: %s", err, errN)
 		} else {
 			// retry Delete only when we redialed
 			return cp.conn.Delete(id)
@@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
 	return nil
 }
 
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+	cp.RLock()
+	defer cp.RUnlock()
+
+	stat, err := cp.conn.Stats()
+	if err != nil {
+		errR := cp.checkAndRedial(err)
+		if errR != nil {
+			return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+		} else {
+			return cp.conn.Stats()
+		}
+	}
+
+	return stat, nil
+}
+
 func (cp *ConnPool) redial() error {
 	const op = errors.Op("connection_pool_redial")
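The `ConnPool` changes make a failed redial surface both errors instead of only the redial's: the broken call's error and the redial error are folded into one message, and the original operation is retried only when the redial succeeded. A minimal sketch of that control flow, with hypothetical `do`/`redial` functions and the standard library's `fmt.Errorf` standing in for `spiral/errors.Errorf`:

    package main

    import (
    	"errors"
    	"fmt"
    )

    func doWithRedial(do func() error, redial func() error) error {
    	err := do()
    	if err == nil {
    		return nil
    	}
    	if errR := redial(); errR != nil {
    		// surface both the original failure and the redial failure
    		return fmt.Errorf("err: %w\nerr redial: %v", err, errR)
    	}
    	// retry only when the redial succeeded
    	return do()
    }

    func main() {
    	calls := 0
    	do := func() error {
    		calls++
    		if calls == 1 {
    			return errors.New("EOF") // first attempt hits a dead connection
    		}
    		return nil
    	}
    	fmt.Println(doWithRedial(do, func() error { return nil })) // <nil>
    }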
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..6323148b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
 import (
 	"bytes"
 	"context"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -10,6 +11,7 @@ import (
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +215,49 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
 	return nil
 }
 
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
 	// register the pipeline
 	j.pipeline.Store(p)
 	return nil
 }
 
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+	const op = errors.Op("beanstalk_state")
+	stat, err := j.pool.Stats(ctx)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+	out := &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver:   pipe.Driver(),
+		Queue:    j.tName,
+		Ready:    ready(atomic.LoadUint32(&j.listeners)),
+	}
+
+	// set stat, skip errors (replace with 0)
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+	if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+		out.Active = int64(v)
+	}
+
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+	if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+		// this is not an error, reserved in beanstalk behaves like an active jobs
+		out.Reserved = int64(v)
+	}
+
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+	if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+		out.Delayed = int64(v)
+	}
+
+	return out, nil
+}
+
 func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
 	const op = errors.Op("beanstalk_run")
 	// check if the pipeline registered
@@ -260,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
 	return nil
 }
 
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
 	// load atomic value
 	pipe := j.pipeline.Load().(*pipeline.Pipeline)
 	if pipe.Name() != p {
@@ -315,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
 		Start:    time.Now(),
 	})
 }
+
+func ready(r uint32) bool {
+	return r > 0
+}
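`State` turns three textual counters from beanstalkd's `stats-tube` response into the numeric state struct, and deliberately ignores parse failures, leaving the field at its zero value. The same mapping against a canned stats map:

    // Sketch of the "skip errors, default to zero" stat parsing in State above,
    // with a canned stats map standing in for a live beanstalkd connection.
    package main

    import (
    	"fmt"
    	"strconv"
    )

    func main() {
    	stat := map[string]string{
    		"current-jobs-ready":    "3",
    		"current-jobs-reserved": "1",
    		"current-jobs-delayed":  "oops", // malformed values simply stay 0
    	}

    	var active, reserved, delayed int64
    	if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
    		active = int64(v)
    	}
    	if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
    		reserved = int64(v)
    	}
    	if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
    		delayed = int64(v)
    	}
    	fmt.Println(active, reserved, delayed) // 3 1 0
    }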
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 95ad6ecd..f0992cd6 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,17 +2,18 @@ package ephemeral
 
 import (
 	"context"
-	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/utils"
 )
 
 const (
@@ -28,14 +29,18 @@ type JobConsumer struct {
 	cfg           *Config
 	log           logger.Logger
 	eh            events.Handler
-	pipeline      sync.Map
+	pipeline      atomic.Value
 	pq            priorityqueue.Queue
 	localPrefetch chan *Item
 
 	// time.sleep goroutines max number
 	goroutines uint64
 
-	stopCh chan struct{}
+	delayed *int64
+	active  *int64
+
+	listeners uint32
+	stopCh    chan struct{}
 }
 
@@ -46,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
 		pq:         pq,
 		eh:         eh,
 		goroutines: 0,
+		active:     utils.Int64(0),
+		delayed:    utils.Int64(0),
 		stopCh:     make(chan struct{}, 1),
 	}
 
@@ -61,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
 	// initialize a local queue
 	jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
 
-	// consume from the queue
-	go jb.consume()
-
 	return jb, nil
 }
 
@@ -73,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
 		pq:         pq,
 		eh:         eh,
 		goroutines: 0,
+		active:     utils.Int64(0),
+		delayed:    utils.Int64(0),
 		stopCh:     make(chan struct{}, 1),
 	}
 
 	// initialize a local queue
 	jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
 
-	// consume from the queue
-	go jb.consume()
-
 	return jb, nil
 }
 
@@ -89,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
 	const op = errors.Op("ephemeral_push")
 
 	// check if the pipeline registered
-	b, ok := j.pipeline.Load(jb.Options.Pipeline)
+	_, ok := j.pipeline.Load().(*pipeline.Pipeline)
 	if !ok {
 		return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
 	}
 
-	if !b.(bool) {
-		return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
-	}
-
 	err := j.handleItem(ctx, fromJob(jb))
 	if err != nil {
 		return errors.E(op, err)
@@ -106,102 +105,70 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
 	return nil
 }
 
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
-	const op = errors.Op("ephemeral_handle_request")
-	// handle timeouts
-	// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
-	// goroutines here. We should limit goroutines here.
-	if msg.Options.Delay > 0 {
-		// if we have 1000 goroutines waiting on the delay - reject 1001
-		if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
-			return errors.E(op, errors.Str("max concurrency number reached"))
-		}
-
-		go func(jj *Item) {
-			atomic.AddUint64(&j.goroutines, 1)
-			time.Sleep(jj.Options.DelayDuration())
-
-			// send the item after timeout expired
-			j.localPrefetch <- jj
-
-			atomic.AddUint64(&j.goroutines, ^uint64(0))
-		}(msg)
-
-		return nil
-	}
-
-	// insert to the local, limited pipeline
-	select {
-	case j.localPrefetch <- msg:
-		return nil
-	case <-ctx.Done():
-		return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
-	}
-}
-
-func (j *JobConsumer) consume() {
-	// redirect
-	for {
-		select {
-		case item, ok := <-j.localPrefetch:
-			if !ok {
-				j.log.Warn("ephemeral local prefetch queue was closed")
-				return
-			}
-
-			// set requeue channel
-			item.Options.requeueFn = j.handleItem
-
-			j.pq.Insert(item)
-		case <-j.stopCh:
-			return
-		}
-	}
+func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	return &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver:   pipe.Driver(),
+		Queue:    pipe.Name(),
+		Active:   atomic.LoadInt64(j.active),
+		Delayed:  atomic.LoadInt64(j.delayed),
+		Ready:    ready(atomic.LoadUint32(&j.listeners)),
+	}, nil
 }
 
 func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
-	const op = errors.Op("ephemeral_register")
-	if _, ok := j.pipeline.Load(pipeline.Name()); ok {
-		return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
-	}
-
-	j.pipeline.Store(pipeline.Name(), true)
-
+	j.pipeline.Store(pipeline)
 	return nil
 }
 
-func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
-	if q, ok := j.pipeline.Load(pipeline); ok {
-		if q == true {
-			// mark pipeline as turned off
-			j.pipeline.Store(pipeline, false)
-		}
-		// if not true - do not send the EventPipeStopped, because pipe already stopped
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		j.log.Error("no such pipeline", "requested pause on: ", p)
+	}
+
+	l := atomic.LoadUint32(&j.listeners)
+	// no active listeners
+	if l == 0 {
+		j.log.Warn("no active listeners, nothing to pause")
 		return
 	}
 
+	atomic.AddUint32(&j.listeners, ^uint32(0))
+
+	// stop the consumer
+	j.stopCh <- struct{}{}
+
 	j.eh.Push(events.JobEvent{
 		Event:    events.EventPipePaused,
-		Pipeline: pipeline,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
 		Start:    time.Now(),
 		Elapsed:  0,
 	})
 }
 
-func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
-	if q, ok := j.pipeline.Load(pipeline); ok {
-		if q == false {
-			// mark pipeline as turned on
-			j.pipeline.Store(pipeline, true)
-		}
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		j.log.Error("no such pipeline", "requested resume on: ", p)
+	}
 
-		// if not true - do not send the EventPipeActive, because pipe already active
+	l := atomic.LoadUint32(&j.listeners)
+	// listener already active
+	if l == 1 {
+		j.log.Warn("listener already in the active state")
 		return
 	}
 
+	// resume the consumer on the same channel
+	j.consume()
+
+	atomic.StoreUint32(&j.listeners, 1)
 	j.eh.Push(events.JobEvent{
 		Event:    events.EventPipeActive,
-		Pipeline: pipeline,
+		Pipeline: pipe.Name(),
 		Start:    time.Now(),
 		Elapsed:  0,
 	})
@@ -220,25 +187,88 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
 
 func (j *JobConsumer) Stop(ctx context.Context) error {
 	const op = errors.Op("ephemeral_plugin_stop")
-	var pipe string
 
-	j.pipeline.Range(func(key, _ interface{}) bool {
-		pipe = key.(string)
-		j.pipeline.Delete(key)
-		return true
-	})
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
 
 	select {
 	// return from the consumer
 	case j.stopCh <- struct{}{}:
 		j.eh.Push(events.JobEvent{
 			Event:    events.EventPipeStopped,
-			Pipeline: pipe,
+			Pipeline: pipe.Name(),
 			Start:    time.Now(),
 			Elapsed:  0,
 		})
+		return nil
 	case <-ctx.Done():
 		return errors.E(op, ctx.Err())
 	}
 }
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+	const op = errors.Op("ephemeral_handle_request")
+	// handle timeouts
+	// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+	// goroutines here. We should limit goroutines here.
+	if msg.Options.Delay > 0 {
+		// if we have 1000 goroutines waiting on the delay - reject 1001
+		if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+			return errors.E(op, errors.Str("max concurrency number reached"))
+		}
+
+		go func(jj *Item) {
+			atomic.AddUint64(&j.goroutines, 1)
+			atomic.AddInt64(j.delayed, 1)
+
+			time.Sleep(jj.Options.DelayDuration())
+
+			// send the item after timeout expired
+			j.localPrefetch <- jj
+
+			atomic.AddUint64(&j.goroutines, ^uint64(0))
+		}(msg)
+
+		return nil
+	}
+
+	// increase number of the active jobs
+	atomic.AddInt64(j.active, 1)
+
+	// insert to the local, limited pipeline
+	select {
+	case j.localPrefetch <- msg:
+		return nil
+	case <-ctx.Done():
+		return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+	}
+}
+
+func (j *JobConsumer) consume() {
+	go func() {
+		// redirect
+		for {
+			select {
+			case item, ok := <-j.localPrefetch:
+				if !ok {
+					j.log.Warn("ephemeral local prefetch queue was closed")
+					return
+				}
+
+				// set requeue channel
+				item.Options.requeueFn = j.handleItem
+				item.Options.active = j.active
+				item.Options.delayed = j.delayed
+
+				j.pq.Insert(item)
+			case <-j.stopCh:
+				return
+			}
+		}
+	}()
+}
+
+func ready(r uint32) bool {
+	return r > 0
+}
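The rewrite above replaces the old boolean `sync.Map` with a single `atomic.Value` holding the pipeline, plus a `listeners` counter: `Pause` decrements the counter and signals `stopCh`, `Resume` restarts the `consume` goroutine and sets the counter back to one. A compact model of that pause/resume machinery (hypothetical `consumer` type, standard library only):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    	"time"
    )

    type consumer struct {
    	in        chan int
    	stopCh    chan struct{}
    	listeners uint32
    }

    func (c *consumer) Resume() {
    	if atomic.LoadUint32(&c.listeners) == 1 {
    		return // already active
    	}
    	atomic.StoreUint32(&c.listeners, 1)
    	go func() {
    		for {
    			select {
    			case v := <-c.in:
    				fmt.Println("consumed", v)
    			case <-c.stopCh:
    				return
    			}
    		}
    	}()
    }

    func (c *consumer) Pause() {
    	if atomic.LoadUint32(&c.listeners) == 0 {
    		return // nothing to pause
    	}
    	atomic.AddUint32(&c.listeners, ^uint32(0)) // decrement
    	c.stopCh <- struct{}{}                     // stop the goroutine
    }

    func main() {
    	c := &consumer{in: make(chan int, 4), stopCh: make(chan struct{})}
    	c.Resume()
    	c.in <- 1
    	time.Sleep(10 * time.Millisecond)
    	c.Pause()
    }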
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
 
 import (
 	"context"
+	"sync/atomic"
 	"time"
 
 	json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
 
 	// private
 	requeueFn func(context.Context, *Item) error
+	active    *int64
+	delayed   *int64
 }
 
 // DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
 }
 
 func (i *Item) Ack() error {
-	// noop for the in-memory
+	i.atomicallyReduceCount()
 	return nil
 }
 
 func (i *Item) Nack() error {
-	// noop for the in-memory
+	i.atomicallyReduceCount()
 	return nil
 }
 
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
 	i.Options.Delay = delay
 	i.Headers = headers
 
+	i.atomicallyReduceCount()
+
 	err := i.Options.requeueFn(context.Background(), i)
 	if err != nil {
 		return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
 	return nil
 }
 
+// atomicallyReduceCount reduces counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+	// if job was delayed, reduce number of the delayed jobs
+	if i.Options.Delay > 0 {
+		atomic.AddInt64(i.Options.delayed, ^int64(0))
+		return
+	}
+
+	// otherwise, reduce number of the active jobs
+	atomic.AddInt64(i.Options.active, ^int64(0))
+	// noop for the in-memory
+}
+
 func fromJob(job *job.Job) *Item {
 	return &Item{
 		Job: job.Job,
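Since the in-memory driver has no broker to query, its `State` is computed from counters owned by the consumer and decremented by the items themselves on `Ack`, `Nack` and `Requeue`; the items only hold pointers to those counters. A reduced model of that wiring:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    type item struct {
    	delay           int64
    	active, delayed *int64 // pointers into the consumer's counters
    }

    // ack settles the item and decrements whichever gauge it was counted in.
    func (i *item) ack() {
    	if i.delay > 0 {
    		atomic.AddInt64(i.delayed, ^int64(0))
    		return
    	}
    	atomic.AddInt64(i.active, ^int64(0))
    }

    func main() {
    	var active, delayed int64 = 1, 1
    	(&item{delay: 0, active: &active, delayed: &delayed}).ack()
    	(&item{delay: 5, active: &active, delayed: &delayed}).ack()
    	fmt.Println(active, delayed) // 0 0
    }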
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..5d419b51 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -11,10 +12,12 @@ import (
 	"github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/service/sqs"
+	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
 	"github.com/google/uuid"
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
 	cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
 	return nil
 }
 
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
-	d, err := msg.pack(j.queueURL)
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+	const op = errors.Op("sqs_state")
+	attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+		QueueUrl: j.queueURL,
+		AttributeNames: []types.QueueAttributeName{
+			types.QueueAttributeNameApproximateNumberOfMessages,
+			types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+			types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+		},
+	})
+
 	if err != nil {
-		return err
+		return nil, errors.E(op, err)
 	}
-	_, err = j.client.SendMessage(ctx, d)
-	if err != nil {
-		return err
+
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+	out := &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver:   pipe.Driver(),
+		Queue:    *j.queueURL,
+		Ready:    ready(atomic.LoadUint32(&j.listeners)),
 	}
-	return nil
+
+	nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+	if err == nil {
+		out.Active = int64(nom)
+	}
+
+	delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+	if err == nil {
+		out.Delayed = int64(delayed)
+	}
+
+	nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+	if err == nil {
+		out.Reserved = int64(nv)
+	}
+
+	return out, nil
 }
 
 func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
 }
 
 func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
-	const op = errors.Op("rabbit_consume")
+	const op = errors.Op("sqs_run")
 
 	j.Lock()
 	defer j.Unlock()
@@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
 		Start:    time.Now(),
 	})
 }
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+	d, err := msg.pack(j.queueURL)
+	if err != nil {
+		return err
+	}
+	_, err = j.client.SendMessage(ctx, d)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func ready(r uint32) bool {
+	return r > 0
+}
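For SQS, `State` is answered by `GetQueueAttributes` with three approximate counters, all delivered as strings. A sketch of the mapping onto `Active`/`Delayed`/`Reserved`, with a plain map standing in for the AWS response (the values are best-effort approximations by SQS's own definition):

    package main

    import (
    	"fmt"
    	"strconv"
    )

    func main() {
    	attrs := map[string]string{
    		"ApproximateNumberOfMessages":           "10", // -> Active
    		"ApproximateNumberOfMessagesDelayed":    "2",  // -> Delayed
    		"ApproximateNumberOfMessagesNotVisible": "1",  // -> Reserved (in flight)
    	}

    	toInt64 := func(s string) int64 {
    		v, err := strconv.Atoi(s)
    		if err != nil {
    			return 0 // same "skip errors" policy as the driver
    		}
    		return int64(v)
    	}

    	fmt.Println(
    		toInt64(attrs["ApproximateNumberOfMessages"]),
    		toInt64(attrs["ApproximateNumberOfMessagesDelayed"]),
    		toInt64(attrs["ApproximateNumberOfMessagesNotVisible"]),
    	)
    }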
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go
new file mode 100644
index 00000000..61856a10
--- /dev/null
+++ b/plugins/jobs/metrics.go
@@ -0,0 +1,91 @@
+package jobs
+
+import (
+	"sync/atomic"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	"github.com/spiral/roadrunner/v2/plugins/informer"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+	// p - implements Exporter interface (workers)
+	// other - request duration and count
+	return []prometheus.Collector{p.statsExporter}
+}
+
+const (
+	namespace = "rr_jobs"
+)
+
+type statsExporter struct {
+	workers       informer.Informer
+	workersMemory uint64
+	jobsOk        uint64
+	pushOk        uint64
+	jobsErr       uint64
+	pushErr       uint64
+}
+
+var (
+	worker  = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
+	pushOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil)
+	pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil)
+	jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil)
+	jobsOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
+)
+
+func newStatsExporter(stats informer.Informer) *statsExporter {
+	return &statsExporter{
+		workers:       stats,
+		workersMemory: 0,
+		jobsOk:        0,
+		pushOk:        0,
+		jobsErr:       0,
+		pushErr:       0,
+	}
+}
+
+func (se *statsExporter) metricsCallback(event interface{}) {
+	if jev, ok := event.(events.JobEvent); ok {
+		switch jev.Event { //nolint:exhaustive
+		case events.EventJobOK:
+			atomic.AddUint64(&se.jobsOk, 1)
+		case events.EventPushOK:
+			atomic.AddUint64(&se.pushOk, 1)
+		case events.EventPushError:
+			atomic.AddUint64(&se.pushErr, 1)
+		case events.EventJobError:
+			atomic.AddUint64(&se.jobsErr, 1)
+		}
+	}
+}
+
+func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
+	// send description
+	d <- pushErr
+	d <- pushOk
+	d <- jobsErr
+	d <- jobsOk
+}
+
+func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
+	// get the copy of the processes
+	workers := se.workers.Workers()
+
+	// cumulative RSS memory in bytes
+	var cum uint64
+
+	// collect the memory
+	for i := 0; i < len(workers); i++ {
+		cum += workers[i].MemoryUsage
+	}
+
+	// send the values to the prometheus
+	ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
+	// send the values to the prometheus
+	ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
+	ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
+	ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
+	ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
+}
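metrics.go uses the const-metric style of a custom `prometheus.Collector`: counters are bumped atomically from an event callback, `Describe` announces the descriptors, and `Collect` snapshots the atomics into freshly built metrics on every scrape instead of keeping long-lived counter objects. The skeleton of that pattern, assuming only `github.com/prometheus/client_golang`:

    package main

    import (
    	"sync/atomic"

    	"github.com/prometheus/client_golang/prometheus"
    )

    type exporter struct {
    	ok uint64 // bumped from an event callback elsewhere
    }

    var okDesc = prometheus.NewDesc("demo_jobs_ok", "Successfully processed jobs.", nil, nil)

    // Describe announces the descriptors this collector will emit.
    func (e *exporter) Describe(d chan<- *prometheus.Desc) { d <- okDesc }

    // Collect snapshots the atomic into a freshly built const metric.
    func (e *exporter) Collect(ch chan<- prometheus.Metric) {
    	ch <- prometheus.MustNewConstMetric(okDesc, prometheus.GaugeValue,
    		float64(atomic.LoadUint64(&e.ok)))
    }

    func main() {
    	e := &exporter{}
    	prometheus.MustRegister(e) // scraped later via promhttp.Handler()
    	atomic.AddUint64(&e.ok, 1)
    }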
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5e62c5c5 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
 	"github.com/spiral/roadrunner/v2/pkg/payload"
 	"github.com/spiral/roadrunner/v2/pkg/pool"
 	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+	"github.com/spiral/roadrunner/v2/pkg/state/process"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -57,7 +59,8 @@ type Plugin struct {
 	stopCh chan struct{}
 
 	// internal payloads pool
-	pldPool sync.Pool
+	pldPool       sync.Pool
+	statsExporter *statsExporter
 }
 
 func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -103,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
 	p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
 	p.log = log
 
+	// metrics
+	p.statsExporter = newStatsExporter(p)
+	p.events.AddListener(p.statsExporter.metricsCallback)
+
 	return nil
 }
 
@@ -200,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
 			5. Pipeline name
 			*/
 
+			start := time.Now()
+			p.events.Push(events.JobEvent{
+				Event:   events.EventJobStart,
+				ID:      jb.ID(),
+				Start:   start,
+				Elapsed: 0,
+			})
+
 			ctx, err := jb.Context()
 			if err != nil {
+				p.events.Push(events.JobEvent{
+					Event:   events.EventJobError,
+					ID:      jb.ID(),
+					Start:   start,
+					Elapsed: time.Since(start),
+				})
+
 				errNack := jb.Nack()
 				if errNack != nil {
 					p.log.Error("negatively acknowledge failed", "error", errNack)
@@ -218,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
 			resp, err := p.workersPool.Exec(exec)
 			p.RUnlock()
 			if err != nil {
+				p.events.Push(events.JobEvent{
+					Event:   events.EventJobError,
+					ID:      jb.ID(),
+					Start:   start,
+					Elapsed: time.Since(start),
+				})
 				// RR protocol level error, Nack the job
 				errNack := jb.Nack()
 				if errNack != nil {
@@ -235,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
 				p.putPayload(exec)
 				err = jb.Ack()
 				if err != nil {
+					p.events.Push(events.JobEvent{
+						Event:   events.EventJobError,
+						ID:      jb.ID(),
+						Start:   start,
+						Elapsed: time.Since(start),
+					})
 					p.log.Error("acknowledge error, job might be missed", "error", err)
 					continue
 				}
+
+				p.events.Push(events.JobEvent{
+					Event:   events.EventJobOK,
+					ID:      jb.ID(),
+					Start:   start,
+					Elapsed: time.Since(start),
+				})
 			}
 
 			// handle the response protocol
 			err = handleResponse(resp.Body, jb, p.log)
 			if err != nil {
+				p.events.Push(events.JobEvent{
+					Event:   events.EventJobError,
+					ID:      jb.ID(),
+					Start:   start,
+					Elapsed: time.Since(start),
+				})
 				p.putPayload(exec)
 				errNack := jb.Nack()
 				if errNack != nil {
@@ -254,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
 				continue
 			}
 
+			p.events.Push(events.JobEvent{
+				Event:   events.EventJobOK,
+				ID:      jb.ID(),
+				Start:   start,
+				Elapsed: time.Since(start),
+			})
 			// return payload
 			p.putPayload(exec)
 		}
@@ -303,6 +356,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
 	p.jobConstructors[name.Name()] = c
 }
 
+func (p *Plugin) Workers() []*process.State {
+	p.RLock()
+	wrk := p.workersPool.Workers()
+	p.RUnlock()
+
+	ps := make([]*process.State, len(wrk))
+
+	for i := 0; i < len(wrk); i++ {
+		st, err := process.WorkerProcessState(wrk[i])
+		if err != nil {
+			p.log.Error("jobs workers state", "error", err)
+			return nil
+		}
+
+		ps[i] = st
+	}
+
+	return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+	const op = errors.Op("jobs_plugin_drivers_state")
+	jst := make([]*jobState.State, 0, len(p.consumers))
+	for k := range p.consumers {
+		d := p.consumers[k]
+		newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+		state, err := d.State(newCtx)
+		if err != nil {
+			cancel()
+			return nil, errors.E(op, err)
+		}
+
+		jst = append(jst, state)
+		cancel()
+	}
+	return jst, nil
+}
+
 func (p *Plugin) Available() {}
 
 func (p *Plugin) Name() string {
@@ -319,7 +410,7 @@ func (p *Plugin) Reset() error {
 	p.workersPool = nil
 
 	var err error
-	p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+	p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
 	if err != nil {
 		return errors.E(op, err)
 	}
@@ -332,6 +423,7 @@ func (p *Plugin) Reset() error {
 
 func (p *Plugin) Push(j *job.Job) error {
 	const op = errors.Op("jobs_plugin_push")
 
+	start := time.Now()
 	// get the pipeline for the job
 	pipe, ok := p.pipelines.Load(j.Options.Pipeline)
 	if !ok {
@@ -357,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error {
 
 	err := d.Push(ctx, j)
 	if err != nil {
-		cancel()
+		p.events.Push(events.JobEvent{
+			Event:    events.EventPushError,
+			ID:       j.Ident,
+			Pipeline: ppl.Name(),
+			Driver:   ppl.Driver(),
+			Start:    start,
+			Elapsed:  time.Since(start),
+			Error:    err,
+		})
 		return errors.E(op, err)
 	}
 
-	cancel()
+	p.events.Push(events.JobEvent{
+		Event:    events.EventPushOK,
+		ID:       j.Ident,
+		Pipeline: ppl.Name(),
+		Driver:   ppl.Driver(),
+		Start:    start,
+		Elapsed:  time.Since(start),
+		Error:    err,
+	})
 	return nil
 }
 
@@ -370,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
 	const op = errors.Op("jobs_plugin_push")
 
 	for i := 0; i < len(j); i++ {
+		start := time.Now()
 		// get the pipeline for the job
 		pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
 		if !ok {
@@ -392,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
 		err := d.Push(ctx, j[i])
 		if err != nil {
 			cancel()
+			p.events.Push(events.JobEvent{
+				Event:    events.EventPushError,
+				ID:       j[i].Ident,
+				Pipeline: ppl.Name(),
+				Driver:   ppl.Driver(),
+				Start:    start,
+				Elapsed:  time.Since(start),
+				Error:    err,
+			})
 			return errors.E(op, err)
 		}
 
@@ -536,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
 		case events.EventPipePaused:
 			p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventJobStart:
-			p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+			p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventJobOK:
-			p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+			p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventPushOK:
 			p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventPushError:
-			p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+			p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventJobError:
-			p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+			p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventPipeActive:
 			p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
 		case events.EventPipeStopped:
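The `Serve` loop in plugin.go brackets every job with events that share one `Start` timestamp, so a single listener can derive both outcome counts (as the stats exporter does) and durations. That instrumentation shape, reduced to a hypothetical `JobEvent` and `push` callback:

    package main

    import (
    	"fmt"
    	"time"
    )

    type JobEvent struct {
    	Event   string
    	ID      string
    	Start   time.Time
    	Elapsed time.Duration
    }

    // process emits start/ok/error events around the work, all sharing Start.
    func process(id string, push func(JobEvent), work func() error) {
    	start := time.Now()
    	push(JobEvent{Event: "start", ID: id, Start: start})
    	if err := work(); err != nil {
    		push(JobEvent{Event: "error", ID: id, Start: start, Elapsed: time.Since(start)})
    		return
    	}
    	push(JobEvent{Event: "ok", ID: id, Start: start, Elapsed: time.Since(start)})
    }

    func main() {
    	process("job-1",
    		func(e JobEvent) { fmt.Println(e.Event, e.Elapsed) },
    		func() error { time.Sleep(5 * time.Millisecond); return nil })
    }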
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index c89877e3..e195c407 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
 
 - `NO_ERROR`: contains only `type` and empty `data`.
 - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job,
-  `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+  `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
   with string key and array of strings as a value.
 
 For example:
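For reference, an `ERROR` response as described in response_protocol.md serializes to the JSON printed by the sketch below; the Go types are illustrative, and only the field names (`type`, `message`, `requeue`, `delay_seconds`, `headers`) come from the document:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type errData struct {
    	Message      string              `json:"message"`
    	Requeue      bool                `json:"requeue"`
    	DelaySeconds int                 `json:"delay_seconds"`
    	Headers      map[string][]string `json:"headers"`
    }

    type response struct {
    	Type int     `json:"type"` // 0 NO_ERROR, 1 ERROR
    	Data errData `json:"data"`
    }

    func main() {
    	b, _ := json.Marshal(response{
    		Type: 1,
    		Data: errData{
    			Message:      "broken payload",
    			Requeue:      true,
    			DelaySeconds: 10,
    			Headers:      map[string][]string{"attempt": {"2"}},
    		},
    	})
    	fmt.Println(string(b))
    }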
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..94f903d5 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
 package jobs
 
 import (
+	"context"
+
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
 	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
 }
 
 func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
-	const op = errors.Op("jobs_rpc_push")
+	const op = errors.Op("rpc_push")
 
 	// convert transport entity into domain
 	// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
 }
 
 func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
-	const op = errors.Op("jobs_rpc_push")
+	const op = errors.Op("rpc_push_batch")
 
 	l := len(j.GetJobs())
 
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
 // 2. Pipeline name
 // 3. Options related to the particular pipeline
 func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
-	const op = errors.Op("rcp_declare_pipeline")
+	const op = errors.Op("rpc_declare_pipeline")
 	pipe := &pipeline.Pipeline{}
 
 	for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
 }
 
 func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
-	const op = errors.Op("rcp_declare_pipeline")
+	const op = errors.Op("rpc_declare_pipeline")
 
 	var destroyed []string //nolint:prealloc
 	for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
 	return nil
 }
 
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+	const op = errors.Op("rpc_stats")
+	state, err := r.p.JobsState(context.Background())
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	for i := 0; i < len(state); i++ {
+		resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+			Pipeline: state[i].Pipeline,
+			Driver:   state[i].Driver,
+			Queue:    state[i].Queue,
+			Active:   state[i].Active,
+			Delayed:  state[i].Delayed,
+			Reserved: state[i].Reserved,
+			Ready:    state[i].Ready,
+		})
+	}
+
+	return nil
+}
+
 // from converts from transport entity to domain
 func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
 	headers := map[string][]string{}
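`Stat` delegates to `Plugin.JobsState`, which derives a fresh `context.WithTimeout` per driver and cancels it on both the error and the success path so no timer leaks across iterations. That pattern in isolation, with a hypothetical `driver` interface:

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    type driver interface {
    	State(ctx context.Context) (string, error)
    }

    func collect(ctx context.Context, drivers []driver, timeout time.Duration) ([]string, error) {
    	out := make([]string, 0, len(drivers))
    	for _, d := range drivers {
    		newCtx, cancel := context.WithTimeout(ctx, timeout)
    		st, err := d.State(newCtx)
    		if err != nil {
    			cancel()
    			return nil, err
    		}
    		out = append(out, st)
    		cancel() // release the timer before the next iteration
    	}
    	return out, nil
    }

    type fake string

    func (f fake) State(ctx context.Context) (string, error) { return string(f), ctx.Err() }

    func main() {
    	st, err := collect(context.Background(), []driver{fake("amqp"), fake("sqs")}, time.Second)
    	fmt.Println(st, err)
    }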