diff options
25 files changed, 1283 insertions, 238 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index fcf2c611..0224de69 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -23,7 +23,7 @@ jobs: fail-fast: true matrix: php: [ "7.4", "8.0" ] - go: [ "1.16" ] + go: [ "1.17" ] os: [ 'ubuntu-latest' ] steps: - name: Set up Go ${{ matrix.go }} @@ -1,6 +1,6 @@ module github.com/spiral/roadrunner/v2 -go 1.16 +go 1.17 require ( github.com/Shopify/toxiproxy v2.1.4+incompatible @@ -46,3 +46,51 @@ require ( google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) + +require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/andybalholm/brotli v1.0.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.2.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/go-ole/go-ole v1.2.5 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/magiconair/properties v1.8.5 // indirect + github.com/mattn/go-colorable v0.1.8 // indirect + github.com/mattn/go-isatty v0.0.12 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/mitchellh/mapstructure v1.4.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pelletier/go-toml v1.9.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.26.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect + github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c // indirect + github.com/spf13/afero v1.6.0 // indirect + github.com/spf13/cast v1.3.1 // indirect + github.com/spf13/jwalterweatherman v1.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/subosito/gotenv v1.2.0 // indirect + github.com/tklauser/numcpus v0.2.3 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.26.0 // indirect + github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect + github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect + go.uber.org/atomic v1.7.0 // indirect + golang.org/x/text v0.3.6 // indirect + golang.org/x/tools v0.1.2 // indirect + google.golang.org/appengine v1.6.7 // indirect + gopkg.in/ini.v1 v1.62.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index b20e4242..3eb0714f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -// Destroy all underlying stack (but let them to complete the task). +// Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { sp.ww.Destroy(ctx) } @@ -250,36 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { switch { case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + w.State().Set(worker.StateInvalid) + return nil, err case errors.Is(errors.SoftJob, err): - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // TODO suspicious logic, redesign - err = sp.ww.Allocate() - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) - } + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop w.State().Set(worker.StateInvalid) - err = w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) } - } else { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Release(w) + + return nil, err } - } - w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - // kill worker instead of stop, because worker here might be in the broken state (network) which leads us - // to the error - errS := w.Kill() - if errS != nil { - return nil, errors.E(op, err, errS) - } + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) - return nil, errors.E(op, err) + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, errors.E(op, err) + } } } diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go index e5b142b5..d90118c3 100644 --- a/pkg/state/job/state.go +++ b/pkg/state/job/state.go @@ -1,6 +1,17 @@ package job +// State represents job's state type State struct { - Queue string + // Pipeline name + Pipeline string + // Driver name + Driver string + // Queue name (tube for the beanstalk) + Queue string + // Active jobs which are consumed from the driver but not handled by the PHP worker yet Active int64 + // Delayed jobs + Delayed int64 + // Reserved jobs which are in the driver but not consumed yet + Reserved int64 } diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 51093978..7fb65a92 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) { // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: + // Stop Pop operations v.Lock() defer v.Unlock() @@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) { 2. Violated Get <-> Release operation (how ??) */ for i := uint64(0); i < v.len; i++ { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. + */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states + // skip good states, put worker back case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO v.workers <- wrk continue + /* + Bad states are here. + */ default: // kill the current worker (just to be sure it's dead) - _ = wrk.Kill() - // replace with the new one + if wrk != nil { + _ = wrk.Kill() + } + // replace with the new one and return from the loop + // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker + // But this case will be handled in the worker_watcher::Get v.workers <- w return } @@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { } */ - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + if atomic.LoadUint64(&v.destroy) == 1 { return nil, errors.E(errors.WatcherStopped) } diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index 21fd7983..87180be5 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -1,20 +1,30 @@ package informer import ( + "context" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/logger" ) const PluginName = "informer" type Plugin struct { + log logger.Logger + + withJobs map[string]JobsStat withWorkers map[string]Informer available map[string]Availabler } -func (p *Plugin) Init() error { +func (p *Plugin) Init(log logger.Logger) error { p.available = make(map[string]Availabler) p.withWorkers = make(map[string]Informer) + p.withJobs = make(map[string]JobsStat) + + p.log = log return nil } @@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State { return svc.Workers() } +// Jobs provides information about jobs for the registered plugin using jobs +func (p *Plugin) Jobs(name string) []*job.State { + svc, ok := p.withJobs[name] + if !ok { + return nil + } + + st, err := svc.JobsState(context.Background()) + if err != nil { + p.log.Info("jobs stat", "error", err) + // skip errors here + return nil + } + + return st +} + // Collects declares services to be collected. func (p *Plugin) Collects() []interface{} { return []interface{}{ p.CollectPlugins, p.CollectWorkers, + p.CollectJobs, } } @@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) { p.withWorkers[name.Name()] = r } +func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) { + p.withJobs[name.Name()] = j +} + // Name of the service. func (p *Plugin) Name() string { return PluginName diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 3738b619..478d3227 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,6 +1,7 @@ package informer import ( + "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/pkg/state/process" ) @@ -10,7 +11,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { - // Workers is list of workers. + // Workers are list of workers. Workers []*process.State `json:"workers"` } @@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error { func (rpc *rpc) Workers(service string, list *WorkerList) error { workers := rpc.srv.Workers(service) if workers == nil { - list = nil return nil } @@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error { return nil } +func (rpc *rpc) Jobs(service string, out *[]*job.State) error { + *out = rpc.srv.Jobs(service) + return nil +} + // sort.Sort func (w *WorkerList) Len() int { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index b89cdc82..f3b40ae3 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -17,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -51,9 +52,8 @@ type JobConsumer struct { multipleAck bool requeueOnFail bool - delayCache map[string]struct{} - listeners uint32 + delayed *int64 stopCh chan struct{} } @@ -100,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh: make(chan struct{}), // TODO to config retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), priority: pipeCfg.Priority, + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeCfg.RoutingKey, @@ -170,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con consumeID: uuid.NewString(), stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeline.String(routingKey, ""), @@ -232,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -// handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("rabbitmq_handle_item") - select { - case pch := <-j.publishChan: - // return the channel back - defer func() { - j.publishChan <- pch - }() - - // convert - table, err := pack(msg.ID(), msg) - if err != nil { - return errors.E(op, err) - } - - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - // TODO dlx cache channel?? - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } - - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - j.delayCache[tmpQ] = struct{}{} - - return nil - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} - func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil @@ -375,9 +300,14 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ - Queue: q.Name, - Active: int64(q.Messages), + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: q.Name, + Active: int64(q.Messages), + Delayed: *j.delayed, }, nil case <-ctx.Done(): @@ -494,3 +424,80 @@ func (j *JobConsumer) Stop(context.Context) error { }) return nil } + +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + table, err := pack(msg.ID(), msg) + if err != nil { + return errors.E(op, err) + } + + const op = errors.Op("rabbitmq_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + atomic.AddInt64(j.delayed, 1) + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + return nil + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 5990d137..623dcca7 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -3,6 +3,7 @@ package amqp import ( "context" "fmt" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -52,8 +53,8 @@ type Options struct { nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error - + requeueFn func(context.Context, *Item) error + delayed *int64 multipleAsk bool requeue bool } @@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.ack(i.Options.multipleAsk) } func (i *Item) Nack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.nack(false, i.Options.requeue) } // Requeue with the provided delay, handled by the Nack func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } // overwrite the delay i.Options.Delay = delay i.Headers = headers @@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack + item.Options.delayed = j.delayed // requeue func item.Options.requeueFn = j.handleItem diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 32ca4188..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, errN + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry put only when we redialed return cp.t.Put(body, pri, delay, ttr) @@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return 0, nil, errN + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Reserve only when we redialed return cp.ts.Reserve(reserveTimeout) @@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error return id, body, nil } -func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { cp.RLock() defer cp.RUnlock() @@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { - return errN + return errors.Errorf("err: %s\nerr redial: %s", err, errN) } else { // retry Delete only when we redialed return cp.conn.Delete(id) @@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 7e81e6d9..b4d76d38 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "strconv" "strings" "sync/atomic" "time" @@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + // this is not an error, ready in terms of beanstalk means reserved in the tube + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { @@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index d801b7b4..34778642 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -2,7 +2,6 @@ package ephemeral import ( "context" - "sync" "sync/atomic" "time" @@ -14,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -29,14 +29,18 @@ type JobConsumer struct { cfg *Config log logger.Logger eh events.Handler - pipeline sync.Map + pipeline atomic.Value pq priorityqueue.Queue localPrefetch chan *Item // time.sleep goroutines max number goroutines uint64 - stopCh chan struct{} + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } @@ -62,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // initialize a local queue jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - // consume from the queue - go jb.consume() - return jb, nil } @@ -74,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand pq: pq, eh: eh, goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), stopCh: make(chan struct{}, 1), } // initialize a local queue jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - // consume from the queue - go jb.consume() - return jb, nil } @@ -90,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - b, ok := j.pipeline.Load(jb.Options.Pipeline) + _, ok := j.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - err := j.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) @@ -107,53 +105,69 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil +func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: pipe.Name(), + Active: atomic.LoadInt64(j.active), + Delayed: atomic.LoadInt64(j.delayed), + }, nil } func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - const op = errors.Op("ephemeral_register") - if _, ok := j.pipeline.Load(pipeline.Name()); ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) - } - - j.pipeline.Store(pipeline.Name(), true) - + j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == true { - // mark pipeline as turned off - j.pipeline.Store(pipeline, false) - } - // if not true - do not send the EventPipeStopped, because pipe already stopped +func (j *JobConsumer) Pause(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") return } + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop the consumer + j.stopCh <- struct{}{} + j.eh.Push(events.JobEvent{ Event: events.EventPipePaused, - Pipeline: pipeline, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) } -func (j *JobConsumer) Resume(_ context.Context, pipeline string) { - if q, ok := j.pipeline.Load(pipeline); ok { - if q == false { - // mark pipeline as turned on - j.pipeline.Store(pipeline, true) - } +func (j *JobConsumer) Resume(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } - // if not true - do not send the EventPipeActive, because pipe already active + l := atomic.LoadUint32(&j.listeners) + // listener already active + if l == 1 { + j.log.Warn("listener already in the active state") return } + // resume the consumer on the same channel + j.consume() + + atomic.StoreUint32(&j.listeners, 1) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, - Pipeline: pipeline, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) @@ -172,22 +186,19 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { func (j *JobConsumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") - var pipe string - j.pipeline.Range(func(key, _ interface{}) bool { - pipe = key.(string) - j.pipeline.Delete(key) - return true - }) + + pipe := j.pipeline.Load().(*pipeline.Pipeline) select { // return from the consumer case j.stopCh <- struct{}{}: j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, - Pipeline: pipe, + Pipeline: pipe.Name(), Start: time.Now(), Elapsed: 0, }) + return nil case <-ctx.Done(): @@ -208,6 +219,8 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { go func(jj *Item) { atomic.AddUint64(&j.goroutines, 1) + atomic.AddInt64(j.delayed, 1) + time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired @@ -219,6 +232,9 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return nil } + // increase number of the active jobs + atomic.AddInt64(j.active, 1) + // insert to the local, limited pipeline select { case j.localPrefetch <- msg: @@ -229,21 +245,25 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } func (j *JobConsumer) consume() { - // redirect - for { - select { - case item, ok := <-j.localPrefetch: - if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") + go func() { + // redirect + for { + select { + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set requeue channel + item.Options.requeueFn = j.handleItem + item.Options.active = j.active + item.Options.delayed = j.delayed + + j.pq.Insert(item) + case <-j.stopCh: return } - - // set requeue channel - item.Options.requeueFn = j.handleItem - - j.pq.Insert(item) - case <-j.stopCh: - return } - } + }() } diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 1a61d7e9..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -2,6 +2,7 @@ package ephemeral import ( "context" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -40,6 +41,8 @@ type Options struct { // private requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 } // DelayDuration returns delay duration in a form of time.Duration. @@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } func (i *Item) Nack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } @@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { i.Options.Delay = delay i.Headers = headers + i.atomicallyReduceCount() + err := i.Options.requeueFn(context.Background(), i) if err != nil { return err @@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 4fb684f8..d957b3eb 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -2,6 +2,7 @@ package sqs import ( "context" + "strconv" "sync" "sync/atomic" "time" @@ -11,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" @@ -263,7 +265,44 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { } func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("sqs_state") + attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: j.queueURL, + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameApproximateNumberOfMessages, + types.QueueAttributeNameApproximateNumberOfMessagesDelayed, + types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, + }, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: *j.queueURL, + } + + nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) + if err == nil { + out.Active = int64(nom) + } + + delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) + if err == nil { + out.Delayed = int64(delayed) + } + + nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) + if err == nil { + out.Reserved = int64(nv) + } + + return out, nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -272,7 +311,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") + const op = errors.Op("sqs_run") j.Lock() defer j.Unlock() diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index c89877e3..e195c407 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,7 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap + `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. For example: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 7f9859fb..2750cd8f 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,6 +1,8 @@ package jobs import ( + "context" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -14,7 +16,7 @@ type rpc struct { } func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push") // convert transport entity into domain // how we can do this quickly @@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { } func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push_batch") l := len(j.GetJobs()) @@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { // 2. Pipeline name // 3. Options related to the particular pipeline func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") pipe := &pipeline.Pipeline{} for i := range req.GetPipeline() { @@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error } func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") var destroyed []string //nolint:prealloc for i := 0; i < len(req.GetPipelines()); i++ { @@ -112,6 +114,27 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err return nil } +func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { + const op = errors.Op("rpc_stats") + state, err := r.p.JobsState(context.Background()) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(state); i++ { + resp.Stats = append(resp.Stats, &jobsv1beta.Stat{ + Pipeline: state[i].Pipeline, + Driver: state[i].Driver, + Queue: state[i].Queue, + Active: state[i].Active, + Delayed: state[i].Delayed, + Reserved: state[i].Reserved, + }) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ad371bf8..a7db0f83 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -118,7 +118,8 @@ func (p *Plugin) Serve() chan error { Supervisor: p.cfg.Pool.Supervisor, }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { - errCh <- err + errCh <- errors.E(op, err) + return } p.accessValidator = p.defaultAccessValidator(p.phpPool) @@ -135,7 +136,7 @@ func (p *Plugin) Serve() chan error { default: data, err := ps.Next() if err != nil { - errCh <- err + errCh <- errors.E(op, err) return } p.workersPool.Queue(data) diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index 6a6f59af..d83a401a 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -440,6 +440,141 @@ func (x *HeaderValue) GetValue() []string { return nil } +type Stats struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Stats []*Stat `protobuf:"bytes,1,rep,name=Stats,proto3" json:"Stats,omitempty"` +} + +func (x *Stats) Reset() { + *x = Stats{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Stats) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Stats) ProtoMessage() {} + +func (x *Stats) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Stats.ProtoReflect.Descriptor instead. +func (*Stats) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{8} +} + +func (x *Stats) GetStats() []*Stat { + if x != nil { + return x.Stats + } + return nil +} + +// Stats used as a response for the Stats RPC call +type Stat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pipeline string `protobuf:"bytes,1,opt,name=pipeline,proto3" json:"pipeline,omitempty"` + Driver string `protobuf:"bytes,2,opt,name=driver,proto3" json:"driver,omitempty"` + Queue string `protobuf:"bytes,3,opt,name=queue,proto3" json:"queue,omitempty"` + Active int64 `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"` + Delayed int64 `protobuf:"varint,5,opt,name=delayed,proto3" json:"delayed,omitempty"` + Reserved int64 `protobuf:"varint,6,opt,name=reserved,proto3" json:"reserved,omitempty"` +} + +func (x *Stat) Reset() { + *x = Stat{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Stat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Stat) ProtoMessage() {} + +func (x *Stat) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Stat.ProtoReflect.Descriptor instead. +func (*Stat) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{9} +} + +func (x *Stat) GetPipeline() string { + if x != nil { + return x.Pipeline + } + return "" +} + +func (x *Stat) GetDriver() string { + if x != nil { + return x.Driver + } + return "" +} + +func (x *Stat) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *Stat) GetActive() int64 { + if x != nil { + return x.Active + } + return 0 +} + +func (x *Stat) GetDelayed() int64 { + if x != nil { + return x.Delayed + } + return 0 +} + +func (x *Stat) GetReserved() int64 { + if x != nil { + return x.Reserved + } + return 0 +} + var File_jobs_proto protoreflect.FileDescriptor var file_jobs_proto_rawDesc = []byte{ @@ -488,8 +623,21 @@ var file_jobs_proto_rawDesc = []byte{ 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, - 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x65, 0x22, 0x30, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x27, 0x0a, 0x05, 0x53, + 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6a, 0x6f, 0x62, + 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x52, 0x05, 0x53, + 0x74, 0x61, 0x74, 0x73, 0x22, 0x9e, 0x01, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x1a, 0x0a, + 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x72, 0x69, + 0x76, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x72, 0x69, 0x76, 0x65, + 0x72, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76, + 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x64, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, + 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -504,7 +652,7 @@ func file_jobs_proto_rawDescGZIP() []byte { return file_jobs_proto_rawDescData } -var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_jobs_proto_goTypes = []interface{}{ (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest @@ -514,21 +662,24 @@ var file_jobs_proto_goTypes = []interface{}{ (*Job)(nil), // 5: jobs.v1beta.Job (*Options)(nil), // 6: jobs.v1beta.Options (*HeaderValue)(nil), // 7: jobs.v1beta.HeaderValue - nil, // 8: jobs.v1beta.DeclareRequest.PipelineEntry - nil, // 9: jobs.v1beta.Job.HeadersEntry + (*Stats)(nil), // 8: jobs.v1beta.Stats + (*Stat)(nil), // 9: jobs.v1beta.Stat + nil, // 10: jobs.v1beta.DeclareRequest.PipelineEntry + nil, // 11: jobs.v1beta.Job.HeadersEntry } var file_jobs_proto_depIdxs = []int32{ - 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job - 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job - 8, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry - 9, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry - 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options - 7, // 5: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job + 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job + 10, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry + 11, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry + 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 9, // 5: jobs.v1beta.Stats.Stats:type_name -> jobs.v1beta.Stat + 7, // 6: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_jobs_proto_init() } @@ -633,6 +784,30 @@ func file_jobs_proto_init() { return nil } } + file_jobs_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Stats); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Stat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -640,7 +815,7 @@ func file_jobs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_jobs_proto_rawDesc, NumEnums: 0, - NumMessages: 10, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 68d2ed97..5d1f2dfe 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -43,3 +43,17 @@ message Options { message HeaderValue { repeated string value = 1; } + +message Stats { + repeated Stat Stats = 1; +} + +// Stats used as a response for the Stats RPC call +message Stat { + string pipeline = 1; + string driver = 2; + string queue = 3; + int64 active = 4; + int64 delayed = 5; + int64 reserved = 6; +} diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index c7041cc9..3dcc5c2c 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -279,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 5) @@ -291,6 +294,8 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { @@ -395,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) time.Sleep(time.Second * 4) @@ -404,6 +412,7 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index a268ebb8..9c314494 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -8,6 +8,7 @@ import ( "testing" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +19,7 @@ const ( pause string = "jobs.Pause" destroy string = "jobs.Destroy" resume string = "jobs.Resume" + stat string = "jobs.Stat" ) func resumePipes(pipes ...string) func(t *testing.T) { @@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) { er := &jobsv1beta.Empty{} err = client.Call(push, req, er) - assert.Error(t, err) + assert.NoError(t, err) } } @@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) { } } +func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: delay, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + func pushToPipeErr(pipeline string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") @@ -183,3 +209,23 @@ func deleteProxy(name string, t *testing.T) { _ = resp.Body.Close() } } + +func stats(t *testing.T, state *jobState.State) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + st := &jobsv1beta.Stats{} + er := &jobsv1beta.Empty{} + + err = client.Call(stat, er, st) + require.NoError(t, err) + require.NotNil(t, st) + + state.Queue = st.Stats[0].Queue + state.Pipeline = st.Stats[0].Pipeline + state.Driver = st.Stats[0].Driver + state.Active = st.Stats[0].Active + state.Delayed = st.Stats[0].Delayed + state.Reserved = st.Stats[0].Reserved +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 7096a467..4c1f600a 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -305,44 +306,64 @@ func TestAMQPJobsError(t *testing.T) { wg.Wait() } -func declareAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "amqp", - "name": "test-3", - "routing-key": "test-3", - "queue": "default", - "exchange-type": "direct", - "exchange": "amqp.default", - "prefetch": "100", - "priority": "3", - "exclusive": "true", - "multiple_ask": "true", - "requeue_on_fail": "true", - }} + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) } -func TestAMQPNoGlobalSection(t *testing.T) { +func TestAMQPStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "amqp/.rr-no-global.yaml", + Path: "amqp/.rr-amqp-declare.yaml", Prefix: "rr", } + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -355,6 +376,110 @@ func TestAMQPNoGlobalSection(t *testing.T) { t.Fatal(err) } - _, err = cont.Serve() - require.Error(t, err) + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + stats(t, out) + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) } diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index aebe9da1..2f56acab 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -312,6 +313,133 @@ func TestBeanstalkJobsError(t *testing.T) { wg.Wait() } +func TestBeanstalkStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBeanstalkPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 3) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) + + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(1), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func TestBeanstalkNoGlobalSection(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 0a882556..3f296c83 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -129,6 +130,8 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -201,8 +204,6 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() - - time.Sleep(time.Second * 5) } func TestEphemeralPauseResume(t *testing.T) { @@ -224,7 +225,9 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -290,14 +293,15 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) + t.Run("ephemeralResume", resumePipes("test-local")) t.Run("ephemeralPause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) t.Run("ephemeralResume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) stopCh <- struct{}{} + time.Sleep(time.Second) wg.Wait() } @@ -319,6 +323,8 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -394,6 +400,131 @@ func TestEphemeralJobsError(t *testing.T) { wg.Wait() } +func TestEphemeralStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + + time.Sleep(time.Second) + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(1)) + assert.Equal(t, out.Delayed, int64(1)) + assert.Equal(t, out.Reserved, int64(0)) + + time.Sleep(time.Second) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(0)) + assert.Equal(t, out.Delayed, int64(0)) + assert.Equal(t, out.Reserved, int64(0)) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareEphemeralPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index cb1f4486..e5a92916 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -342,6 +343,133 @@ func TestSQSNoGlobalSection(t *testing.T) { require.Error(t, err) } +func TestSQSStat(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareSQSPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) |