summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linux.yml2
-rw-r--r--go.mod50
-rwxr-xr-xpkg/pool/static_pool.go58
-rw-r--r--pkg/state/job/state.go13
-rw-r--r--pkg/worker_watcher/container/channel/vec.go19
-rw-r--r--plugins/informer/plugin.go34
-rw-r--r--plugins/informer/rpc.go9
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go169
-rw-r--r--plugins/jobs/drivers/amqp/item.go15
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go25
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go38
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go140
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go22
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
-rw-r--r--plugins/jobs/response_protocol.md2
-rw-r--r--plugins/jobs/rpc.go31
-rw-r--r--plugins/websockets/plugin.go5
-rw-r--r--proto/jobs/v1beta/jobs.pb.go209
-rw-r--r--proto/jobs/v1beta/jobs.proto14
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go9
-rw-r--r--tests/plugins/jobs/helpers.go48
-rw-r--r--tests/plugins/jobs/jobs_amqp_test.go171
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go128
-rw-r--r--tests/plugins/jobs/jobs_ephemeral_test.go139
-rw-r--r--tests/plugins/jobs/jobs_sqs_test.go128
25 files changed, 1283 insertions, 238 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index fcf2c611..0224de69 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -23,7 +23,7 @@ jobs:
fail-fast: true
matrix:
php: [ "7.4", "8.0" ]
- go: [ "1.16" ]
+ go: [ "1.17" ]
os: [ 'ubuntu-latest' ]
steps:
- name: Set up Go ${{ matrix.go }}
diff --git a/go.mod b/go.mod
index 49731f8e..79cd6bc8 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
module github.com/spiral/roadrunner/v2
-go 1.16
+go 1.17
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
@@ -46,3 +46,51 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
+
+require (
+ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
+ github.com/andybalholm/brotli v1.0.2 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.0 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/ini v1.2.0 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.3.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.1 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/fsnotify/fsnotify v1.4.9 // indirect
+ github.com/go-ole/go-ole v1.2.5 // indirect
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/hashicorp/hcl v1.0.0 // indirect
+ github.com/magiconair/properties v1.8.5 // indirect
+ github.com/mattn/go-colorable v0.1.8 // indirect
+ github.com/mattn/go-isatty v0.0.12 // indirect
+ github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+ github.com/mitchellh/mapstructure v1.4.1 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
+ github.com/pelletier/go-toml v1.9.3 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/prometheus/client_model v0.2.0 // indirect
+ github.com/prometheus/common v0.26.0 // indirect
+ github.com/prometheus/procfs v0.6.0 // indirect
+ github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c // indirect
+ github.com/spf13/afero v1.6.0 // indirect
+ github.com/spf13/cast v1.3.1 // indirect
+ github.com/spf13/jwalterweatherman v1.1.0 // indirect
+ github.com/spf13/pflag v1.0.5 // indirect
+ github.com/subosito/gotenv v1.2.0 // indirect
+ github.com/tklauser/numcpus v0.2.3 // indirect
+ github.com/valyala/bytebufferpool v1.0.0 // indirect
+ github.com/valyala/fasthttp v1.26.0 // indirect
+ github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect
+ github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
+ go.uber.org/atomic v1.7.0 // indirect
+ golang.org/x/text v0.3.6 // indirect
+ golang.org/x/tools v0.1.2 // indirect
+ google.golang.org/appengine v1.6.7 // indirect
+ gopkg.in/ini.v1 v1.62.0 // indirect
+ gopkg.in/yaml.v2 v2.4.0 // indirect
+ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
+)
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index b20e4242..3eb0714f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them to complete the task).
+// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
}
@@ -250,36 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
switch {
case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
case errors.Is(errors.SoftJob, err):
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // TODO suspicious logic, redesign
- err = sp.ww.Allocate()
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
- }
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
w.State().Set(worker.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- } else {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Release(w)
+
+ return nil, err
}
- }
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- // kill worker instead of stop, because worker here might be in the broken state (network) which leads us
- // to the error
- errS := w.Kill()
- if errS != nil {
- return nil, errors.E(op, err, errS)
- }
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
- return nil, errors.E(op, err)
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
}
}
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
index e5b142b5..d90118c3 100644
--- a/pkg/state/job/state.go
+++ b/pkg/state/job/state.go
@@ -1,6 +1,17 @@
package job
+// State represents job's state
type State struct {
- Queue string
+ // Pipeline name
+ Pipeline string
+ // Driver name
+ Driver string
+ // Queue name (tube for the beanstalk)
+ Queue string
+ // Active jobs which are consumed from the driver but not handled by the PHP worker yet
Active int64
+ // Delayed jobs
+ Delayed int64
+ // Reserved jobs which are in the driver but not consumed yet
+ Reserved int64
}
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 51093978..7fb65a92 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
+ // Stop Pop operations
v.Lock()
defer v.Unlock()
@@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) {
2. Violated Get <-> Release operation (how ??)
*/
for i := uint64(0); i < v.len; i++ {
+ /*
+ We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
+ */
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states
+ // skip good states, put worker back
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), channel behave
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
v.workers <- wrk
continue
+ /*
+ Bad states are here.
+ */
default:
// kill the current worker (just to be sure it's dead)
- _ = wrk.Kill()
- // replace with the new one
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
v.workers <- w
return
}
@@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
}
*/
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ if atomic.LoadUint64(&v.destroy) == 1 {
return nil, errors.E(errors.WatcherStopped)
}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 21fd7983..87180be5 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -1,20 +1,30 @@
package informer
import (
+ "context"
+
endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/pkg/state/process"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "informer"
type Plugin struct {
+ log logger.Logger
+
+ withJobs map[string]JobsStat
withWorkers map[string]Informer
available map[string]Availabler
}
-func (p *Plugin) Init() error {
+func (p *Plugin) Init(log logger.Logger) error {
p.available = make(map[string]Availabler)
p.withWorkers = make(map[string]Informer)
+ p.withJobs = make(map[string]JobsStat)
+
+ p.log = log
return nil
}
@@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State {
return svc.Workers()
}
+// Jobs provides information about jobs for the registered plugin using jobs
+func (p *Plugin) Jobs(name string) []*job.State {
+ svc, ok := p.withJobs[name]
+ if !ok {
+ return nil
+ }
+
+ st, err := svc.JobsState(context.Background())
+ if err != nil {
+ p.log.Info("jobs stat", "error", err)
+ // skip errors here
+ return nil
+ }
+
+ return st
+}
+
// Collects declares services to be collected.
func (p *Plugin) Collects() []interface{} {
return []interface{}{
p.CollectPlugins,
p.CollectWorkers,
+ p.CollectJobs,
}
}
@@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) {
p.withWorkers[name.Name()] = r
}
+func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) {
+ p.withJobs[name.Name()] = j
+}
+
// Name of the service.
func (p *Plugin) Name() string {
return PluginName
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 3738b619..478d3227 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,6 +1,7 @@
package informer
import (
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/pkg/state/process"
)
@@ -10,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
- // Workers is list of workers.
+ // Workers are list of workers.
Workers []*process.State `json:"workers"`
}
@@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error {
func (rpc *rpc) Workers(service string, list *WorkerList) error {
workers := rpc.srv.Workers(service)
if workers == nil {
- list = nil
return nil
}
@@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+func (rpc *rpc) Jobs(service string, out *[]*job.State) error {
+ *out = rpc.srv.Jobs(service)
+ return nil
+}
+
// sort.Sort
func (w *WorkerList) Len() int {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index b89cdc82..f3b40ae3 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -17,6 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -51,9 +52,8 @@ type JobConsumer struct {
multipleAck bool
requeueOnFail bool
- delayCache map[string]struct{}
-
listeners uint32
+ delayed *int64
stopCh chan struct{}
}
@@ -100,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
stopCh: make(chan struct{}),
// TODO to config
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
@@ -170,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
consumeID: uuid.NewString(),
stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
@@ -232,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- j.delayCache[tmpQ] = struct{}{}
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
@@ -375,9 +300,14 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
return &jobState.State{
- Queue: q.Name,
- Active: int64(q.Messages),
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: *j.delayed,
}, nil
case <-ctx.Done():
@@ -494,3 +424,80 @@ func (j *JobConsumer) Stop(context.Context) error {
})
return nil
}
+
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(j.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
import (
"context"
"fmt"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
-
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
multipleAsk bool
requeue bool
}
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
+ item.Options.delayed = j.delayed
// requeue func
item.Options.requeueFn = j.handleItem
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 32ca4188..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, errN
+ return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry put only when we redialed
return cp.t.Put(body, pri, delay, ttr)
@@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, nil, errN
+ return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Reserve only when we redialed
return cp.ts.Reserve(reserveTimeout)
@@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
return id, body, nil
}
-func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()
@@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return errN
+ return errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Delete only when we redialed
return cp.conn.Delete(id)
@@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ stat, err := cp.conn.Stats()
+ if err != nil {
+ errR := cp.checkAndRedial(err)
+ if errR != nil {
+ return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+ } else {
+ return cp.conn.Stats()
+ }
+ }
+
+ return stat, nil
+}
+
func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 7e81e6d9..b4d76d38 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ // this is not an error, ready in terms of beanstalk means reserved in the tube
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+ // this is not an error, reserved in beanstalk behaves like an active jobs
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
@@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index d801b7b4..34778642 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,7 +2,6 @@ package ephemeral
import (
"context"
- "sync"
"sync/atomic"
"time"
@@ -14,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -29,14 +29,18 @@ type JobConsumer struct {
cfg *Config
log logger.Logger
eh events.Handler
- pipeline sync.Map
+ pipeline atomic.Value
pq priorityqueue.Queue
localPrefetch chan *Item
// time.sleep goroutines max number
goroutines uint64
- stopCh chan struct{}
+ delayed *int64
+ active *int64
+
+ listeners uint32
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -47,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
@@ -62,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// initialize a local queue
jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -74,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
// initialize a local queue
jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -90,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ _, ok := j.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
err := j.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
@@ -107,53 +105,69 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: pipe.Name(),
+ Active: atomic.LoadInt64(j.active),
+ Delayed: atomic.LoadInt64(j.delayed),
+ }, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- const op = errors.Op("ephemeral_register")
- if _, ok := j.pipeline.Load(pipeline.Name()); ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
- }
-
- j.pipeline.Store(pipeline.Name(), true)
-
+ j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == true {
- // mark pipeline as turned off
- j.pipeline.Store(pipeline, false)
- }
- // if not true - do not send the EventPipeStopped, because pipe already stopped
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
return
}
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop the consumer
+ j.stopCh <- struct{}{}
+
j.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
- Pipeline: pipeline,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
}
-func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == false {
- // mark pipeline as turned on
- j.pipeline.Store(pipeline, true)
- }
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
- // if not true - do not send the EventPipeActive, because pipe already active
+ l := atomic.LoadUint32(&j.listeners)
+ // listener already active
+ if l == 1 {
+ j.log.Warn("listener already in the active state")
return
}
+ // resume the consumer on the same channel
+ j.consume()
+
+ atomic.StoreUint32(&j.listeners, 1)
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
- Pipeline: pipeline,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
@@ -172,22 +186,19 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
func (j *JobConsumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
- var pipe string
- j.pipeline.Range(func(key, _ interface{}) bool {
- pipe = key.(string)
- j.pipeline.Delete(key)
- return true
- })
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
select {
// return from the consumer
case j.stopCh <- struct{}{}:
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
- Pipeline: pipe,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
+
return nil
case <-ctx.Done():
@@ -208,6 +219,8 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
go func(jj *Item) {
atomic.AddUint64(&j.goroutines, 1)
+ atomic.AddInt64(j.delayed, 1)
+
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
@@ -219,6 +232,9 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
return nil
}
+ // increase number of the active jobs
+ atomic.AddInt64(j.active, 1)
+
// insert to the local, limited pipeline
select {
case j.localPrefetch <- msg:
@@ -229,21 +245,25 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ go func() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+ item.Options.active = j.active
+ item.Options.delayed = j.delayed
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
return
}
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
}
- }
+ }()
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"context"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
// private
requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
func (i *Item) Nack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
i.Options.Delay = delay
i.Headers = headers
+ i.atomicallyReduceCount()
+
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+// atomicallyReduceCount reduces counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+ // noop for the in-memory
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 4fb684f8..d957b3eb 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
@@ -263,7 +265,44 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
}
func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ }
+
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -272,7 +311,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index c89877e3..e195c407 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+ `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
with string key and array of strings as a value.
For example:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..2750cd8f 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "context"
+
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
}
func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push")
// convert transport entity into domain
// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
}
func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push_batch")
l := len(j.GetJobs())
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
// 2. Pipeline name
// 3. Options related to the particular pipeline
func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
pipe := &pipeline.Pipeline{}
for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
}
func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
var destroyed []string //nolint:prealloc
for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,27 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
return nil
}
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+ const op = errors.Op("rpc_stats")
+ state, err := r.p.JobsState(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(state); i++ {
+ resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+ Pipeline: state[i].Pipeline,
+ Driver: state[i].Driver,
+ Queue: state[i].Queue,
+ Active: state[i].Active,
+ Delayed: state[i].Delayed,
+ Reserved: state[i].Reserved,
+ })
+ }
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ad371bf8..a7db0f83 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -118,7 +118,8 @@ func (p *Plugin) Serve() chan error {
Supervisor: p.cfg.Pool.Supervisor,
}, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
+ return
}
p.accessValidator = p.defaultAccessValidator(p.phpPool)
@@ -135,7 +136,7 @@ func (p *Plugin) Serve() chan error {
default:
data, err := ps.Next()
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
return
}
p.workersPool.Queue(data)
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index 6a6f59af..d83a401a 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -440,6 +440,141 @@ func (x *HeaderValue) GetValue() []string {
return nil
}
+type Stats struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Stats []*Stat `protobuf:"bytes,1,rep,name=Stats,proto3" json:"Stats,omitempty"`
+}
+
+func (x *Stats) Reset() {
+ *x = Stats{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[8]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Stats) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Stats) ProtoMessage() {}
+
+func (x *Stats) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[8]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Stats.ProtoReflect.Descriptor instead.
+func (*Stats) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{8}
+}
+
+func (x *Stats) GetStats() []*Stat {
+ if x != nil {
+ return x.Stats
+ }
+ return nil
+}
+
+// Stats used as a response for the Stats RPC call
+type Stat struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Pipeline string `protobuf:"bytes,1,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
+ Driver string `protobuf:"bytes,2,opt,name=driver,proto3" json:"driver,omitempty"`
+ Queue string `protobuf:"bytes,3,opt,name=queue,proto3" json:"queue,omitempty"`
+ Active int64 `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"`
+ Delayed int64 `protobuf:"varint,5,opt,name=delayed,proto3" json:"delayed,omitempty"`
+ Reserved int64 `protobuf:"varint,6,opt,name=reserved,proto3" json:"reserved,omitempty"`
+}
+
+func (x *Stat) Reset() {
+ *x = Stat{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[9]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Stat) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Stat) ProtoMessage() {}
+
+func (x *Stat) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[9]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Stat.ProtoReflect.Descriptor instead.
+func (*Stat) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{9}
+}
+
+func (x *Stat) GetPipeline() string {
+ if x != nil {
+ return x.Pipeline
+ }
+ return ""
+}
+
+func (x *Stat) GetDriver() string {
+ if x != nil {
+ return x.Driver
+ }
+ return ""
+}
+
+func (x *Stat) GetQueue() string {
+ if x != nil {
+ return x.Queue
+ }
+ return ""
+}
+
+func (x *Stat) GetActive() int64 {
+ if x != nil {
+ return x.Active
+ }
+ return 0
+}
+
+func (x *Stat) GetDelayed() int64 {
+ if x != nil {
+ return x.Delayed
+ }
+ return 0
+}
+
+func (x *Stat) GetReserved() int64 {
+ if x != nil {
+ return x.Reserved
+ }
+ return 0
+}
+
var File_jobs_proto protoreflect.FileDescriptor
var file_jobs_proto_rawDesc = []byte{
@@ -488,8 +623,21 @@ var file_jobs_proto_rawDesc = []byte{
0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x22, 0x23, 0x0a,
0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62,
- 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x75, 0x65, 0x22, 0x30, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x27, 0x0a, 0x05, 0x53,
+ 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6a, 0x6f, 0x62,
+ 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x52, 0x05, 0x53,
+ 0x74, 0x61, 0x74, 0x73, 0x22, 0x9e, 0x01, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x1a, 0x0a,
+ 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x72, 0x69,
+ 0x76, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x72, 0x69, 0x76, 0x65,
+ 0x72, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76,
+ 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x12,
+ 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x64, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73,
+ 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -504,7 +652,7 @@ func file_jobs_proto_rawDescGZIP() []byte {
return file_jobs_proto_rawDescData
}
-var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_jobs_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
(*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
@@ -514,21 +662,24 @@ var file_jobs_proto_goTypes = []interface{}{
(*Job)(nil), // 5: jobs.v1beta.Job
(*Options)(nil), // 6: jobs.v1beta.Options
(*HeaderValue)(nil), // 7: jobs.v1beta.HeaderValue
- nil, // 8: jobs.v1beta.DeclareRequest.PipelineEntry
- nil, // 9: jobs.v1beta.Job.HeadersEntry
+ (*Stats)(nil), // 8: jobs.v1beta.Stats
+ (*Stat)(nil), // 9: jobs.v1beta.Stat
+ nil, // 10: jobs.v1beta.DeclareRequest.PipelineEntry
+ nil, // 11: jobs.v1beta.Job.HeadersEntry
}
var file_jobs_proto_depIdxs = []int32{
- 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
- 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
- 8, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry
- 9, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
- 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
- 7, // 5: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
- 6, // [6:6] is the sub-list for method output_type
- 6, // [6:6] is the sub-list for method input_type
- 6, // [6:6] is the sub-list for extension type_name
- 6, // [6:6] is the sub-list for extension extendee
- 0, // [0:6] is the sub-list for field type_name
+ 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
+ 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
+ 10, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry
+ 11, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
+ 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
+ 9, // 5: jobs.v1beta.Stats.Stats:type_name -> jobs.v1beta.Stat
+ 7, // 6: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
+ 7, // [7:7] is the sub-list for method output_type
+ 7, // [7:7] is the sub-list for method input_type
+ 7, // [7:7] is the sub-list for extension type_name
+ 7, // [7:7] is the sub-list for extension extendee
+ 0, // [0:7] is the sub-list for field type_name
}
func init() { file_jobs_proto_init() }
@@ -633,6 +784,30 @@ func file_jobs_proto_init() {
return nil
}
}
+ file_jobs_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Stats); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Stat); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -640,7 +815,7 @@ func file_jobs_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_jobs_proto_rawDesc,
NumEnums: 0,
- NumMessages: 10,
+ NumMessages: 12,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 68d2ed97..5d1f2dfe 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -43,3 +43,17 @@ message Options {
message HeaderValue {
repeated string value = 1;
}
+
+message Stats {
+ repeated Stat Stats = 1;
+}
+
+// Stats used as a response for the Stats RPC call
+message Stat {
+ string pipeline = 1;
+ string driver = 2;
+ string queue = 3;
+ int64 active = 4;
+ int64 delayed = 5;
+ int64 reserved = 6;
+}
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index c7041cc9..3dcc5c2c 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -279,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
time.Sleep(time.Second * 5)
@@ -291,6 +294,8 @@ func TestBroadcastSameSubscriber(t *testing.T) {
t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
@@ -395,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
time.Sleep(time.Second * 4)
@@ -404,6 +412,7 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
time.Sleep(time.Second * 5)
t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go
index a268ebb8..9c314494 100644
--- a/tests/plugins/jobs/helpers.go
+++ b/tests/plugins/jobs/helpers.go
@@ -8,6 +8,7 @@ import (
"testing"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -18,6 +19,7 @@ const (
pause string = "jobs.Pause"
destroy string = "jobs.Destroy"
resume string = "jobs.Resume"
+ stat string = "jobs.Stat"
)
func resumePipes(pipes ...string) func(t *testing.T) {
@@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) {
er := &jobsv1beta.Empty{}
err = client.Call(push, req, er)
- assert.Error(t, err)
+ assert.NoError(t, err)
}
}
@@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) {
}
}
+func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
+ Job: "some/php/namespace",
+ Id: "1",
+ Payload: `{"hello":"world"}`,
+ Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
+ Options: &jobsv1beta.Options{
+ Priority: 1,
+ Pipeline: pipeline,
+ Delay: delay,
+ },
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call(push, req, er)
+ assert.NoError(t, err)
+ }
+}
+
func pushToPipeErr(pipeline string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
@@ -183,3 +209,23 @@ func deleteProxy(name string, t *testing.T) {
_ = resp.Body.Close()
}
}
+
+func stats(t *testing.T, state *jobState.State) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ st := &jobsv1beta.Stats{}
+ er := &jobsv1beta.Empty{}
+
+ err = client.Call(stat, er, st)
+ require.NoError(t, err)
+ require.NotNil(t, st)
+
+ state.Queue = st.Stats[0].Queue
+ state.Pipeline = st.Stats[0].Pipeline
+ state.Driver = st.Stats[0].Driver
+ state.Active = st.Stats[0].Active
+ state.Delayed = st.Stats[0].Delayed
+ state.Reserved = st.Stats[0].Reserved
+}
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index 7096a467..4c1f600a 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -305,44 +306,64 @@ func TestAMQPJobsError(t *testing.T) {
wg.Wait()
}
-func declareAMQPPipe(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
+func TestAMQPNoGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "amqp",
- "name": "test-3",
- "routing-key": "test-3",
- "queue": "default",
- "exchange-type": "direct",
- "exchange": "amqp.default",
- "prefetch": "100",
- "priority": "3",
- "exclusive": "true",
- "multiple_ask": "true",
- "requeue_on_fail": "true",
- }}
+ cfg := &config.Viper{
+ Path: "amqp/.rr-no-global.yaml",
+ Prefix: "rr",
+ }
- er := &jobsv1beta.Empty{}
- err = client.Call("jobs.Declare", pipe, er)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
}
-func TestAMQPNoGlobalSection(t *testing.T) {
+func TestAMQPStats(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "amqp/.rr-no-global.yaml",
+ Path: "amqp/.rr-amqp-declare.yaml",
Prefix: "rr",
}
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes()
+
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -355,6 +376,110 @@ func TestAMQPNoGlobalSection(t *testing.T) {
t.Fatal(err)
}
- _, err = cont.Serve()
- require.Error(t, err)
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareAMQPPipeline", declareAMQPPipe)
+ t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
+
+ out := &jobState.State{}
+ stats(t, out)
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "amqp")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "amqp")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func declareAMQPPipe(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
+ "driver": "amqp",
+ "name": "test-3",
+ "routing-key": "test-3",
+ "queue": "default",
+ "exchange-type": "direct",
+ "exchange": "amqp.default",
+ "prefetch": "100",
+ "priority": "3",
+ "exclusive": "true",
+ "multiple_ask": "true",
+ "requeue_on_fail": "true",
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Declare", pipe, er)
+ assert.NoError(t, err)
}
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index aebe9da1..2f56acab 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -312,6 +313,133 @@ func TestBeanstalkJobsError(t *testing.T) {
wg.Wait()
}
+func TestBeanstalkStats(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-beanstalk-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclarePipeline", declareBeanstalkPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PausePipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second * 3)
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+
+ out := &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "beanstalk")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(1), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "beanstalk")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func TestBeanstalkNoGlobalSection(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go
index 0a882556..3f296c83 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_ephemeral_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -129,6 +130,8 @@ func TestEphemeralDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -201,8 +204,6 @@ func TestEphemeralDeclare(t *testing.T) {
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
-
- time.Sleep(time.Second * 5)
}
func TestEphemeralPauseResume(t *testing.T) {
@@ -224,7 +225,9 @@ func TestEphemeralPauseResume(t *testing.T) {
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
+
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -290,14 +293,15 @@ func TestEphemeralPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
+ t.Run("ephemeralResume", resumePipes("test-local"))
t.Run("ephemeralPause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
t.Run("ephemeralResume", resumePipes("test-local"))
t.Run("pushToEnabledPipe", pushToPipe("test-local"))
-
time.Sleep(time.Second * 1)
stopCh <- struct{}{}
+ time.Sleep(time.Second)
wg.Wait()
}
@@ -319,6 +323,8 @@ func TestEphemeralJobsError(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -394,6 +400,131 @@ func TestEphemeralJobsError(t *testing.T) {
wg.Wait()
}
+func TestEphemeralStats(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+
+ t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+
+ time.Sleep(time.Second)
+ out := &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Queue, "test-3")
+
+ assert.Equal(t, out.Active, int64(1))
+ assert.Equal(t, out.Delayed, int64(1))
+ assert.Equal(t, out.Reserved, int64(0))
+
+ time.Sleep(time.Second)
+ t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Queue, "test-3")
+
+ assert.Equal(t, out.Active, int64(0))
+ assert.Equal(t, out.Delayed, int64(0))
+ assert.Equal(t, out.Reserved, int64(0))
+
+ t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func declareEphemeralPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index cb1f4486..e5a92916 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -342,6 +343,133 @@ func TestSQSNoGlobalSection(t *testing.T) {
require.Error(t, err)
}
+func TestSQSStat(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "sqs/.rr-sqs-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &sqs.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareSQSPipeline", declareSQSPipe)
+ t.Run("ConsumeSQSPipeline", resumePipes("test-3"))
+ t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseSQSPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+
+ t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5))
+ t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+
+ out := &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "sqs")
+ assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default")
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ stats(t, out)
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "sqs")
+ assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func declareSQSPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)