diff options
Diffstat (limited to 'plugins')
34 files changed, 890 insertions, 319 deletions
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go index a957878c..05f1eb63 100644 --- a/plugins/gzip/plugin.go +++ b/plugins/gzip/plugin.go @@ -10,7 +10,6 @@ const PluginName = "gzip" type Plugin struct{} -// Init needed for the Endure func (g *Plugin) Init() error { return nil } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 2ee83384..dc887f87 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -10,7 +10,7 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/pkg/worker" handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index bbc1a048..9277b85b 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -1,7 +1,10 @@ package informer import ( - "github.com/spiral/roadrunner/v2/pkg/process" + "context" + + "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" ) /* @@ -9,17 +12,23 @@ Informer plugin should not receive any other plugin in the Init or via Collects Because Availabler implementation should present in every plugin */ +// Statistic interfaces ============== + // Informer used to get workers from particular plugin or set of plugins type Informer interface { Workers() []*process.State } +// JobsStat interface provide statistic for the jobs plugin +type JobsStat interface { + // JobsState returns slice with the attached drivers information + JobsState(ctx context.Context) ([]*job.State, error) +} + +// Statistic interfaces end ============ + // Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime type Availabler interface { // Available method needed to collect all plugins which are available in the runtime. Available() } - -type JobsStat interface { - Stat() -} diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index c613af58..87180be5 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -1,20 +1,30 @@ package informer import ( + "context" + endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/logger" ) const PluginName = "informer" type Plugin struct { + log logger.Logger + + withJobs map[string]JobsStat withWorkers map[string]Informer available map[string]Availabler } -func (p *Plugin) Init() error { +func (p *Plugin) Init(log logger.Logger) error { p.available = make(map[string]Availabler) p.withWorkers = make(map[string]Informer) + p.withJobs = make(map[string]JobsStat) + + p.log = log return nil } @@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State { return svc.Workers() } +// Jobs provides information about jobs for the registered plugin using jobs +func (p *Plugin) Jobs(name string) []*job.State { + svc, ok := p.withJobs[name] + if !ok { + return nil + } + + st, err := svc.JobsState(context.Background()) + if err != nil { + p.log.Info("jobs stat", "error", err) + // skip errors here + return nil + } + + return st +} + // Collects declares services to be collected. func (p *Plugin) Collects() []interface{} { return []interface{}{ p.CollectPlugins, p.CollectWorkers, + p.CollectJobs, } } @@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) { p.withWorkers[name.Name()] = r } +func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) { + p.withJobs[name.Name()] = j +} + // Name of the service. func (p *Plugin) Name() string { return PluginName diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 02254865..478d3227 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,7 +1,8 @@ package informer import ( - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" ) type rpc struct { @@ -10,7 +11,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { - // Workers is list of workers. + // Workers are list of workers. Workers []*process.State `json:"workers"` } @@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error { func (rpc *rpc) Workers(service string, list *WorkerList) error { workers := rpc.srv.Workers(service) if workers == nil { - list = nil return nil } @@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error { return nil } +func (rpc *rpc) Jobs(service string, out *[]*job.State) error { + *out = rpc.srv.Jobs(service) + return nil +} + // sort.Sort func (w *WorkerList) Len() int { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 429953e1..95df02ec 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -12,10 +12,12 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -50,9 +52,8 @@ type JobConsumer struct { multipleAck bool requeueOnFail bool - delayCache map[string]struct{} - listeners uint32 + delayed *int64 stopCh chan struct{} } @@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh: make(chan struct{}), // TODO to config retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), priority: pipeCfg.Priority, + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeCfg.RoutingKey, @@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con consumeID: uuid.NewString(), stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeline.String(routingKey, ""), @@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -// handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("rabbitmq_handle_item") - select { - case pch := <-j.publishChan: - // return the channel back - defer func() { - j.publishChan <- pch - }() - - // convert - table, err := pack(msg.ID(), msg) - if err != nil { - return errors.E(op, err) - } - - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - // TODO dlx cache channel?? - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } - - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - j.delayCache[tmpQ] = struct{}{} - - return nil - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} - func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil @@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("amqp_driver_state") + select { + case pch := <-j.publishChan: + defer func() { + j.publishChan <- pch + }() + + q, err := pch.QueueInspect(j.queue) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: q.Name, + Active: int64(q.Messages), + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), + }, nil + + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } +} + func (j *JobConsumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error { }) return nil } + +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + table, err := pack(msg.ID(), msg) + if err != nil { + return errors.E(op, err) + } + + const op = errors.Op("rabbitmq_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + atomic.AddInt64(j.delayed, 1) + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + return nil + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 5990d137..623dcca7 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -3,6 +3,7 @@ package amqp import ( "context" "fmt" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -52,8 +53,8 @@ type Options struct { nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error - + requeueFn func(context.Context, *Item) error + delayed *int64 multipleAsk bool requeue bool } @@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.ack(i.Options.multipleAsk) } func (i *Item) Nack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.nack(false, i.Options.requeue) } // Requeue with the provided delay, handled by the Nack func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } // overwrite the delay i.Options.Delay = delay i.Headers = headers @@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack + item.Options.delayed = j.delayed // requeue func item.Options.requeueFn = j.handleItem diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 32ca4188..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, errN + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry put only when we redialed return cp.t.Put(body, pri, delay, ttr) @@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, nil, errN + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Reserve only when we redialed return cp.ts.Reserve(reserveTimeout) @@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error return id, body, nil } -func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { cp.RLock() defer cp.RUnlock() @@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return errN + return errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Delete only when we redialed return cp.conn.Delete(id) @@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index eaf99be1..6323148b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "strconv" "strings" "sync/atomic" "time" @@ -10,6 +11,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -213,12 +215,49 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { return nil } -func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { // register the pipeline j.pipeline.Store(p) return nil } +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + Ready: ready(atomic.LoadUint32(&j.listeners)), + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil +} + func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -260,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -315,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { Start: time.Now(), }) } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 95ad6ecd..f0992cd6 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -2,17 +2,18 @@ package ephemeral import ( "context" - "sync" "sync/atomic" "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -28,14 +29,18 @@ type JobConsumer struct { cfg *Config log logger.Logger eh events.Handler - pipeline sync.Map + pipeline atomic.Value pq priorityqueue.Queue localPrefetch chan *Item // time.sleep goroutines max number goroutines uint64 - stopCh chan struct{} + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -46,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } @@ -61,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // initialize a local queue jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - // consume from the queue - go jb.consume() - return jb, nil } @@ -73,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } // initialize a local queue jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - // consume from the queue - go jb.consume() - return jb, nil } @@ -89,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - b, ok := j.pipeline.Load(jb.Options.Pipeline) + _, ok := j.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - err := j.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) @@ -106,102 +105,70 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&j.goroutines, 1) - time.Sleep(jj.Options.DelayDuration()) - - // send the item after timeout expired - j.localPrefetch <- jj - - atomic.AddUint64(&j.goroutines, ^uint64(0)) - }(msg) - - return nil - } - - // insert to the local, limited pipeline - select { - case j.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) - } -} - -func (j *JobConsumer) consume() { - // redirect - for { - select { - case item, ok := <-j.localPrefetch: - if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") - return - } - - // set requeue channel - item.Options.requeueFn = j.handleItem - - j.pq.Insert(item) - case <-j.stopCh: - return - } - } +func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: pipe.Name(), + Active: atomic.LoadInt64(j.active), + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), + }, nil } func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - const op = errors.Op("ephemeral_register") - if _, ok := j.pipeline.Load(pipeline.Name()); ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) - } - - j.pipeline.Store(pipeline.Name(), true) - + j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == true { - // mark pipeline as turned off - j.pipeline.Store(pipeline, false) - } - // if not true - do not send the EventPipeStopped, because pipe already stopped +func (j *JobConsumer) Pause(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") return } + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop the consumer + j.stopCh <- struct{}{} + j.eh.Push(events.JobEvent{ Event: events.EventPipePaused, - Pipeline: pipeline, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) } -func (j *JobConsumer) Resume(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == false { - // mark pipeline as turned on - j.pipeline.Store(pipeline, true) - } +func (j *JobConsumer) Resume(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } - // if not true - do not send the EventPipeActive, because pipe already active + l := atomic.LoadUint32(&j.listeners) + // listener already active + if l == 1 { + j.log.Warn("listener already in the active state") return } + // resume the consumer on the same channel + j.consume() + + atomic.StoreUint32(&j.listeners, 1) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, - Pipeline: pipeline, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) @@ -220,25 +187,88 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { func (j *JobConsumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") - var pipe string - j.pipeline.Range(func(key, _ interface{}) bool { - pipe = key.(string) - j.pipeline.Delete(key) - return true - }) + + pipe := j.pipeline.Load().(*pipeline.Pipeline) select { // return from the consumer case j.stopCh <- struct{}{}: j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, - Pipeline: pipe, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) + return nil case <-ctx.Done(): return errors.E(op, ctx.Err()) } } + +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { + return errors.E(op, errors.Str("max concurrency number reached")) + } + + go func(jj *Item) { + atomic.AddUint64(&j.goroutines, 1) + atomic.AddInt64(j.delayed, 1) + + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + j.localPrefetch <- jj + + atomic.AddUint64(&j.goroutines, ^uint64(0)) + }(msg) + + return nil + } + + // increase number of the active jobs + atomic.AddInt64(j.active, 1) + + // insert to the local, limited pipeline + select { + case j.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + } +} + +func (j *JobConsumer) consume() { + go func() { + // redirect + for { + select { + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set requeue channel + item.Options.requeueFn = j.handleItem + item.Options.active = j.active + item.Options.delayed = j.delayed + + j.pq.Insert(item) + case <-j.stopCh: + return + } + } + }() +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 1a61d7e9..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -2,6 +2,7 @@ package ephemeral import ( "context" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -40,6 +41,8 @@ type Options struct { // private requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 } // DelayDuration returns delay duration in a form of time.Duration. @@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } func (i *Item) Nack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } @@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { i.Options.Delay = delay i.Headers = headers + i.atomicallyReduceCount() + err := i.Options.requeueFn(context.Background(), i) if err != nil { return err @@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 9ce37543..5d419b51 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -2,6 +2,7 @@ package sqs import ( "context" + "strconv" "sync" "sync/atomic" "time" @@ -11,10 +12,12 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("sqs_state") + attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: j.queueURL, + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameApproximateNumberOfMessages, + types.QueueAttributeNameApproximateNumberOfMessagesDelayed, + types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, + }, + }) + if err != nil { - return err + return nil, errors.E(op, err) } - _, err = j.client.SendMessage(ctx, d) - if err != nil { - return err + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: *j.queueURL, + Ready: ready(atomic.LoadUint32(&j.listeners)), } - return nil + nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) + if err == nil { + out.Active = int64(nom) + } + + delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) + if err == nil { + out.Delayed = int64(delayed) + } + + nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) + if err == nil { + out.Reserved = int64(nv) + } + + return out, nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") + const op = errors.Op("sqs_run") j.Lock() defer j.Unlock() @@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { Start: time.Now(), }) } + +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(j.queueURL) + if err != nil { + return err + } + _, err = j.client.SendMessage(ctx, d) + if err != nil { + return err + } + + return nil +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go new file mode 100644 index 00000000..61856a10 --- /dev/null +++ b/plugins/jobs/metrics.go @@ -0,0 +1,91 @@ +package jobs + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/informer" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p.statsExporter} +} + +const ( + namespace = "rr_jobs" +) + +type statsExporter struct { + workers informer.Informer + workersMemory uint64 + jobsOk uint64 + pushOk uint64 + jobsErr uint64 + pushErr uint64 +} + +var ( + worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil) + pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil) + pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil) + jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil) + jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil) +) + +func newStatsExporter(stats informer.Informer) *statsExporter { + return &statsExporter{ + workers: stats, + workersMemory: 0, + jobsOk: 0, + pushOk: 0, + jobsErr: 0, + pushErr: 0, + } +} + +func (se *statsExporter) metricsCallback(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { //nolint:exhaustive + case events.EventJobOK: + atomic.AddUint64(&se.jobsOk, 1) + case events.EventPushOK: + atomic.AddUint64(&se.pushOk, 1) + case events.EventPushError: + atomic.AddUint64(&se.pushErr, 1) + case events.EventJobError: + atomic.AddUint64(&se.jobsErr, 1) + } + } +} + +func (se *statsExporter) Describe(d chan<- *prometheus.Desc) { + // send description + d <- pushErr + d <- pushOk + d <- jobsErr + d <- jobsOk +} + +func (se *statsExporter) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := se.workers.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum)) + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk))) + ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr))) + ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk))) + ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr))) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 7707cb8a..5e62c5c5 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -13,6 +13,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -57,7 +59,8 @@ type Plugin struct { stopCh chan struct{} // internal payloads pool - pldPool sync.Pool + pldPool sync.Pool + statsExporter *statsExporter } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -103,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log + // metrics + p.statsExporter = newStatsExporter(p) + p.events.AddListener(p.statsExporter.metricsCallback) + return nil } @@ -200,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit 5. Pipeline name */ + start := time.Now() + p.events.Push(events.JobEvent{ + Event: events.EventJobStart, + ID: jb.ID(), + Start: start, + Elapsed: 0, + }) + ctx, err := jb.Context() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) @@ -218,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { @@ -235,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.putPayload(exec) err = jb.Ack() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.log.Error("acknowledge error, job might be missed", "error", err) continue } + + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) } // handle the response protocol err = handleResponse(resp.Body, jb, p.log) if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.putPayload(exec) errNack := jb.Nack() if errNack != nil { @@ -254,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // return payload p.putPayload(exec) } @@ -303,6 +356,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { p.jobConstructors[name.Name()] = c } +func (p *Plugin) Workers() []*process.State { + p.RLock() + wrk := p.workersPool.Workers() + p.RUnlock() + + ps := make([]*process.State, len(wrk)) + + for i := 0; i < len(wrk); i++ { + st, err := process.WorkerProcessState(wrk[i]) + if err != nil { + p.log.Error("jobs workers state", "error", err) + return nil + } + + ps[i] = st + } + + return ps +} + +func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { + const op = errors.Op("jobs_plugin_drivers_state") + jst := make([]*jobState.State, 0, len(p.consumers)) + for k := range p.consumers { + d := p.consumers[k] + newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) + state, err := d.State(newCtx) + if err != nil { + cancel() + return nil, errors.E(op, err) + } + + jst = append(jst, state) + cancel() + } + return jst, nil +} + func (p *Plugin) Available() {} func (p *Plugin) Name() string { @@ -319,7 +410,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) if err != nil { return errors.E(op, err) } @@ -332,6 +423,7 @@ func (p *Plugin) Reset() error { func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { @@ -357,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error { err := d.Push(ctx, j) if err != nil { - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushOK, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return nil } @@ -370,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -392,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error { err := d.Push(ctx, j[i]) if err != nil { cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j[i].Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } @@ -536,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) { case events.EventPipePaused: p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: - p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: - p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushOK: p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushError: - p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobError: - p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeActive: p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeStopped: diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index c89877e3..e195c407 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,7 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap + `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. For example: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 7f9859fb..94f903d5 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,6 +1,8 @@ package jobs import ( + "context" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -14,7 +16,7 @@ type rpc struct { } func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push") // convert transport entity into domain // how we can do this quickly @@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { } func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push_batch") l := len(j.GetJobs()) @@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { // 2. Pipeline name // 3. Options related to the particular pipeline func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") pipe := &pipeline.Pipeline{} for i := range req.GetPipeline() { @@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error } func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") var destroyed []string //nolint:prealloc for i := 0; i < len(req.GetPipelines()); i++ { @@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err return nil } +func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { + const op = errors.Op("rpc_stats") + state, err := r.p.JobsState(context.Background()) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(state); i++ { + resp.Stats = append(resp.Stats, &jobsv1beta.Stat{ + Pipeline: state[i].Pipeline, + Driver: state[i].Driver, + Queue: state[i].Queue, + Active: state[i].Active, + Delayed: state[i].Delayed, + Reserved: state[i].Reserved, + Ready: state[i].Ready, + }) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index c79f3eb0..fd30eb54 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -1,8 +1,10 @@ package memory import ( + "context" "sync" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } } -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - msg := <-p.pushCh - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("pubsub_memory") + select { + case msg := <-p.pushCh: + if msg == nil { + return nil, nil + } - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil + p.RLock() + defer p.RUnlock() + // push only messages, which topics are subscibed + // TODO better??? + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil + } + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) } return nil, nil diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go new file mode 100644 index 00000000..89d707af --- /dev/null +++ b/plugins/redis/jobs/config.go @@ -0,0 +1,34 @@ +package jobs + +import "time" + +type Config struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + if s.Addrs == nil { + s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage + } +} diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go new file mode 100644 index 00000000..415ac457 --- /dev/null +++ b/plugins/redis/jobs/consumer.go @@ -0,0 +1 @@ +package jobs diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go new file mode 100644 index 00000000..415ac457 --- /dev/null +++ b/plugins/redis/jobs/item.go @@ -0,0 +1 @@ +package jobs diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go new file mode 100644 index 00000000..5b760952 --- /dev/null +++ b/plugins/redis/kv/config.go @@ -0,0 +1,34 @@ +package kv + +import "time" + +type Config struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + if s.Addrs == nil { + s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage + } +} diff --git a/plugins/redis/kv.go b/plugins/redis/kv/kv.go index 29f89d46..b41cb86c 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv/kv.go @@ -1,4 +1,4 @@ -package redis +package kv import ( "context" diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 3c62a63f..961182a9 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -9,6 +9,8 @@ import ( "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv" + redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub" ) const PluginName = "redis" @@ -62,7 +64,7 @@ func (p *Plugin) Available() {} // KVConstruct provides KV storage implementation over the redis plugin func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("redis_plugin_provide") - st, err := NewRedisDriver(p.log, key, p.cfgPlugin) + st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin) if err != nil { return nil, errors.E(op, err) } @@ -71,5 +73,5 @@ func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { } func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) + return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) } diff --git a/plugins/redis/channel.go b/plugins/redis/pubsub/channel.go index 0cd62d19..a1655ab2 100644 --- a/plugins/redis/channel.go +++ b/plugins/redis/pubsub/channel.go @@ -1,4 +1,4 @@ -package redis +package pubsub import ( "context" @@ -92,6 +92,6 @@ func (r *redisChannel) stop() error { return nil } -func (r *redisChannel) message() *pubsub.Message { - return <-r.out +func (r *redisChannel) message() chan *pubsub.Message { + return r.out } diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go new file mode 100644 index 00000000..bf8d2fc9 --- /dev/null +++ b/plugins/redis/pubsub/config.go @@ -0,0 +1,34 @@ +package pubsub + +import "time" + +type Config struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + if s.Addrs == nil { + s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage + } +} diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub/pubsub.go index 01efc623..c9ad3d58 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub/pubsub.go @@ -1,4 +1,4 @@ -package redis +package pubsub import ( "context" @@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return p.channel.message(), nil +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("redis_driver_next") + select { + case msg := <-p.channel.message(): + return msg, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } } diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go index d3271d6c..a9a5a63c 100644 --- a/plugins/reload/plugin.go +++ b/plugins/reload/plugin.go @@ -20,12 +20,12 @@ type Plugin struct { log logger.Logger watcher *Watcher services map[string]interface{} - res resetter.Resetter + res *resetter.Plugin stopc chan struct{} } // Init controller service -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error { +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res *resetter.Plugin) error { const op = errors.Op("reload_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -86,9 +86,9 @@ func (s *Plugin) Serve() chan error { } // make a map with unique services - // so, if we would have a 100 events from http service - // in map we would see only 1 key and it's config - treshholdc := make(chan struct { + // so, if we would have 100 events from http service + // in map we would see only 1 key, and it's config + thCh := make(chan struct { serviceConfig ServiceConfig service string }, thresholdChanBuffer) @@ -98,7 +98,7 @@ func (s *Plugin) Serve() chan error { go func() { for e := range s.watcher.Event { - treshholdc <- struct { + thCh <- struct { serviceConfig ServiceConfig service string }{serviceConfig: s.cfg.Services[e.service], service: e.service} @@ -111,7 +111,7 @@ func (s *Plugin) Serve() chan error { go func() { for { select { - case cfg := <-treshholdc: + case cfg := <-thCh: // logic is following: // restart timer.Stop() @@ -124,7 +124,7 @@ func (s *Plugin) Serve() chan error { case <-timer.C: if len(updated) > 0 { for name := range updated { - err := s.res.ResetByName(name) + err := s.res.Reset(name) if err != nil { timer.Stop() errCh <- errors.E(op, err) diff --git a/plugins/resetter/interface.go b/plugins/resetter/interface.go index 47d8d791..0defcaba 100644 --- a/plugins/resetter/interface.go +++ b/plugins/resetter/interface.go @@ -1,17 +1,7 @@ package resetter -// If plugin implements Resettable interface, than it state can be resetted without reload in runtime via RPC/HTTP -type Resettable interface { - // Reset reload all plugins - Reset() error -} - -// Resetter interface is the Resetter plugin main interface +// Resetter interface type Resetter interface { - // Reset all registered plugins - ResetAll() error - // Reset by plugin name - ResetByName(string) error - // GetAll registered plugins - GetAll() []string + // Reset reload plugin + Reset() error } diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go index 4feb692a..b2fe59af 100644 --- a/plugins/resetter/plugin.go +++ b/plugins/resetter/plugin.go @@ -3,61 +3,32 @@ package resetter import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" ) const PluginName = "resetter" type Plugin struct { - registry map[string]Resettable - log logger.Logger + registry map[string]Resetter } -func (p *Plugin) ResetAll() error { - const op = errors.Op("resetter_plugin_reset_all") - for name := range p.registry { - err := p.registry[name].Reset() - if err != nil { - return errors.E(op, err) - } - } - return nil -} - -func (p *Plugin) ResetByName(plugin string) error { - const op = errors.Op("resetter_plugin_reset_by_name") - if plugin, ok := p.registry[plugin]; ok { - return plugin.Reset() - } - return errors.E(op, errors.Errorf("can't find plugin: %s", plugin)) -} - -func (p *Plugin) GetAll() []string { - all := make([]string, 0, len(p.registry)) - for name := range p.registry { - all = append(all, name) - } - return all -} - -func (p *Plugin) Init(log logger.Logger) error { - p.registry = make(map[string]Resettable) - p.log = log +func (p *Plugin) Init() error { + p.registry = make(map[string]Resetter) return nil } // Reset named service. func (p *Plugin) Reset(name string) error { + const op = errors.Op("resetter_plugin_reset_by_name") svc, ok := p.registry[name] if !ok { - return errors.E("no such service", errors.Str(name)) + return errors.E(op, errors.Errorf("no such service: %s", name)) } return svc.Reset() } // RegisterTarget resettable service. -func (p *Plugin) RegisterTarget(name endure.Named, r Resettable) error { +func (p *Plugin) RegisterTarget(name endure.Named, r Resetter) error { p.registry[name.Name()] = r return nil } @@ -80,5 +51,5 @@ func (p *Plugin) Available() { // RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, log: p.log} + return &rpc{srv: p} } diff --git a/plugins/resetter/rpc.go b/plugins/resetter/rpc.go index 69c955b0..79171b5c 100644 --- a/plugins/resetter/rpc.go +++ b/plugins/resetter/rpc.go @@ -1,30 +1,29 @@ package resetter -import "github.com/spiral/roadrunner/v2/plugins/logger" +import "github.com/spiral/errors" type rpc struct { srv *Plugin - log logger.Logger } // List all resettable plugins. func (rpc *rpc) List(_ bool, list *[]string) error { - rpc.log.Debug("started List method") *list = make([]string, 0) for name := range rpc.srv.registry { *list = append(*list, name) } - rpc.log.Debug("services list", "services", *list) - - rpc.log.Debug("finished List method") return nil } // Reset named plugin. func (rpc *rpc) Reset(service string, done *bool) error { - rpc.log.Debug("started Reset method for the service", "service", service) - defer rpc.log.Debug("finished Reset method for the service", "service", service) + const op = errors.Op("resetter_rpc_reset") + err := rpc.srv.Reset(service) + if err != nil { + *done = false + return errors.E(op, err) + } *done = true - return rpc.srv.Reset(service) + return nil } diff --git a/plugins/server/command.go b/plugins/server/command.go index e0b61896..b8bc1395 100644 --- a/plugins/server/command.go +++ b/plugins/server/command.go @@ -29,5 +29,5 @@ func (server *Plugin) scanCommand(cmd []string) error { return nil } } - return errors.E(errors.Str("scan failed, possible path not found"), op) + return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 1694cdf1..16e3bd8c 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -97,7 +97,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { // try to find a path here err := server.scanCommand(cmdArgs) if err != nil { - server.log.Info("scan command", "error", err) + server.log.Info("scan command", "reason", err) } return func() *exec.Cmd { diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go index 28b84443..3bd0f956 100644 --- a/plugins/service/plugin.go +++ b/plugins/service/plugin.go @@ -4,7 +4,7 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 2df23f11..395b056f 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -13,7 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" @@ -58,6 +58,10 @@ type Plugin struct { // server which produces commands to the pool server server.Server + // stop receiving messages + cancel context.CancelFunc + ctx context.Context + // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn } @@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.log = log p.broadcaster = b + + ctx, cancel := context.WithCancel(context.Background()) + p.ctx = ctx + p.cancel = cancel return nil } @@ -118,7 +126,8 @@ func (p *Plugin) Serve() chan error { Supervisor: p.cfg.Pool.Supervisor, }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { - errCh <- err + errCh <- errors.E(op, err) + return } p.accessValidator = p.defaultAccessValidator(p.phpPool) @@ -129,17 +138,17 @@ func (p *Plugin) Serve() chan error { // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { - select { - case <-p.serveExit: - return - default: - data, err := ps.Next() - if err != nil { - errCh <- err + data, err := ps.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - p.workersPool.Queue(data) + + errCh <- errors.E(op, err) + return } + + p.workersPool.Queue(data) } }(p.subReader) @@ -149,6 +158,8 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() + // cancel context + p.cancel() p.Lock() if p.phpPool == nil { p.Unlock() |