summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-17 19:55:15 +0300
committerValery Piashchynski <[email protected]>2021-08-17 19:55:15 +0300
commitab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch)
tree0a58b043605ef1d9b09e75b207c236aacb1ed55a /plugins
parentbd0da830ae345e1ed4a67782bf413673beeba037 (diff)
Update to go 1.17
Add Stat with tests Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/informer/plugin.go34
-rw-r--r--plugins/informer/rpc.go9
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go169
-rw-r--r--plugins/jobs/drivers/amqp/item.go15
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go25
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go38
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go140
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go22
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
-rw-r--r--plugins/jobs/response_protocol.md2
-rw-r--r--plugins/jobs/rpc.go31
-rw-r--r--plugins/websockets/plugin.go5
12 files changed, 370 insertions, 163 deletions
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 21fd7983..87180be5 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -1,20 +1,30 @@
package informer
import (
+ "context"
+
endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/pkg/state/process"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "informer"
type Plugin struct {
+ log logger.Logger
+
+ withJobs map[string]JobsStat
withWorkers map[string]Informer
available map[string]Availabler
}
-func (p *Plugin) Init() error {
+func (p *Plugin) Init(log logger.Logger) error {
p.available = make(map[string]Availabler)
p.withWorkers = make(map[string]Informer)
+ p.withJobs = make(map[string]JobsStat)
+
+ p.log = log
return nil
}
@@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State {
return svc.Workers()
}
+// Jobs provides information about jobs for the registered plugin using jobs
+func (p *Plugin) Jobs(name string) []*job.State {
+ svc, ok := p.withJobs[name]
+ if !ok {
+ return nil
+ }
+
+ st, err := svc.JobsState(context.Background())
+ if err != nil {
+ p.log.Info("jobs stat", "error", err)
+ // skip errors here
+ return nil
+ }
+
+ return st
+}
+
// Collects declares services to be collected.
func (p *Plugin) Collects() []interface{} {
return []interface{}{
p.CollectPlugins,
p.CollectWorkers,
+ p.CollectJobs,
}
}
@@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) {
p.withWorkers[name.Name()] = r
}
+func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) {
+ p.withJobs[name.Name()] = j
+}
+
// Name of the service.
func (p *Plugin) Name() string {
return PluginName
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 3738b619..478d3227 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,6 +1,7 @@
package informer
import (
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/pkg/state/process"
)
@@ -10,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
- // Workers is list of workers.
+ // Workers are list of workers.
Workers []*process.State `json:"workers"`
}
@@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error {
func (rpc *rpc) Workers(service string, list *WorkerList) error {
workers := rpc.srv.Workers(service)
if workers == nil {
- list = nil
return nil
}
@@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+func (rpc *rpc) Jobs(service string, out *[]*job.State) error {
+ *out = rpc.srv.Jobs(service)
+ return nil
+}
+
// sort.Sort
func (w *WorkerList) Len() int {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index b89cdc82..f3b40ae3 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -17,6 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -51,9 +52,8 @@ type JobConsumer struct {
multipleAck bool
requeueOnFail bool
- delayCache map[string]struct{}
-
listeners uint32
+ delayed *int64
stopCh chan struct{}
}
@@ -100,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
stopCh: make(chan struct{}),
// TODO to config
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
@@ -170,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
consumeID: uuid.NewString(),
stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
@@ -232,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- j.delayCache[tmpQ] = struct{}{}
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
@@ -375,9 +300,14 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
return &jobState.State{
- Queue: q.Name,
- Active: int64(q.Messages),
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: *j.delayed,
}, nil
case <-ctx.Done():
@@ -494,3 +424,80 @@ func (j *JobConsumer) Stop(context.Context) error {
})
return nil
}
+
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(j.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
import (
"context"
"fmt"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
-
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
multipleAsk bool
requeue bool
}
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
+ item.Options.delayed = j.delayed
// requeue func
item.Options.requeueFn = j.handleItem
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 32ca4188..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, errN
+ return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry put only when we redialed
return cp.t.Put(body, pri, delay, ttr)
@@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, nil, errN
+ return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Reserve only when we redialed
return cp.ts.Reserve(reserveTimeout)
@@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
return id, body, nil
}
-func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()
@@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return errN
+ return errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Delete only when we redialed
return cp.conn.Delete(id)
@@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ stat, err := cp.conn.Stats()
+ if err != nil {
+ errR := cp.checkAndRedial(err)
+ if errR != nil {
+ return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+ } else {
+ return cp.conn.Stats()
+ }
+ }
+
+ return stat, nil
+}
+
func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 7e81e6d9..b4d76d38 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ // this is not an error, ready in terms of beanstalk means reserved in the tube
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+ // this is not an error, reserved in beanstalk behaves like an active jobs
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
@@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index d801b7b4..34778642 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,7 +2,6 @@ package ephemeral
import (
"context"
- "sync"
"sync/atomic"
"time"
@@ -14,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -29,14 +29,18 @@ type JobConsumer struct {
cfg *Config
log logger.Logger
eh events.Handler
- pipeline sync.Map
+ pipeline atomic.Value
pq priorityqueue.Queue
localPrefetch chan *Item
// time.sleep goroutines max number
goroutines uint64
- stopCh chan struct{}
+ delayed *int64
+ active *int64
+
+ listeners uint32
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -47,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
@@ -62,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// initialize a local queue
jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -74,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
// initialize a local queue
jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -90,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ _, ok := j.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
err := j.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
@@ -107,53 +105,69 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: pipe.Name(),
+ Active: atomic.LoadInt64(j.active),
+ Delayed: atomic.LoadInt64(j.delayed),
+ }, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- const op = errors.Op("ephemeral_register")
- if _, ok := j.pipeline.Load(pipeline.Name()); ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
- }
-
- j.pipeline.Store(pipeline.Name(), true)
-
+ j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == true {
- // mark pipeline as turned off
- j.pipeline.Store(pipeline, false)
- }
- // if not true - do not send the EventPipeStopped, because pipe already stopped
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
return
}
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop the consumer
+ j.stopCh <- struct{}{}
+
j.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
- Pipeline: pipeline,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
}
-func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == false {
- // mark pipeline as turned on
- j.pipeline.Store(pipeline, true)
- }
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
- // if not true - do not send the EventPipeActive, because pipe already active
+ l := atomic.LoadUint32(&j.listeners)
+ // listener already active
+ if l == 1 {
+ j.log.Warn("listener already in the active state")
return
}
+ // resume the consumer on the same channel
+ j.consume()
+
+ atomic.StoreUint32(&j.listeners, 1)
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
- Pipeline: pipeline,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
@@ -172,22 +186,19 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
func (j *JobConsumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
- var pipe string
- j.pipeline.Range(func(key, _ interface{}) bool {
- pipe = key.(string)
- j.pipeline.Delete(key)
- return true
- })
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
select {
// return from the consumer
case j.stopCh <- struct{}{}:
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
- Pipeline: pipe,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
+
return nil
case <-ctx.Done():
@@ -208,6 +219,8 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
go func(jj *Item) {
atomic.AddUint64(&j.goroutines, 1)
+ atomic.AddInt64(j.delayed, 1)
+
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
@@ -219,6 +232,9 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
return nil
}
+ // increase number of the active jobs
+ atomic.AddInt64(j.active, 1)
+
// insert to the local, limited pipeline
select {
case j.localPrefetch <- msg:
@@ -229,21 +245,25 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ go func() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+ item.Options.active = j.active
+ item.Options.delayed = j.delayed
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
return
}
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
}
- }
+ }()
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"context"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
// private
requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
func (i *Item) Nack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
i.Options.Delay = delay
i.Headers = headers
+ i.atomicallyReduceCount()
+
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+// atomicallyReduceCount reduces counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+ // noop for the in-memory
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 4fb684f8..d957b3eb 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
@@ -263,7 +265,44 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
}
func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ }
+
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -272,7 +311,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index c89877e3..e195c407 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+ `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
with string key and array of strings as a value.
For example:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..2750cd8f 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "context"
+
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
}
func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push")
// convert transport entity into domain
// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
}
func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push_batch")
l := len(j.GetJobs())
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
// 2. Pipeline name
// 3. Options related to the particular pipeline
func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
pipe := &pipeline.Pipeline{}
for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
}
func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
var destroyed []string //nolint:prealloc
for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,27 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
return nil
}
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+ const op = errors.Op("rpc_stats")
+ state, err := r.p.JobsState(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(state); i++ {
+ resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+ Pipeline: state[i].Pipeline,
+ Driver: state[i].Driver,
+ Queue: state[i].Queue,
+ Active: state[i].Active,
+ Delayed: state[i].Delayed,
+ Reserved: state[i].Reserved,
+ })
+ }
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ad371bf8..a7db0f83 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -118,7 +118,8 @@ func (p *Plugin) Serve() chan error {
Supervisor: p.cfg.Pool.Supervisor,
}, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
+ return
}
p.accessValidator = p.defaultAccessValidator(p.phpPool)
@@ -135,7 +136,7 @@ func (p *Plugin) Serve() chan error {
default:
data, err := ps.Next()
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
return
}
p.workersPool.Queue(data)