summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/gzip/plugin.go1
-rw-r--r--plugins/http/plugin.go2
-rw-r--r--plugins/informer/interface.go19
-rw-r--r--plugins/informer/plugin.go36
-rw-r--r--plugins/informer/rpc.go11
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go194
-rw-r--r--plugins/jobs/drivers/amqp/item.go15
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go25
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go47
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go220
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go22
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go65
-rw-r--r--plugins/jobs/metrics.go91
-rw-r--r--plugins/jobs/plugin.go134
-rw-r--r--plugins/jobs/response_protocol.md2
-rw-r--r--plugins/jobs/rpc.go32
-rw-r--r--plugins/memory/pubsub.go34
-rw-r--r--plugins/redis/jobs/config.go34
-rw-r--r--plugins/redis/jobs/consumer.go1
-rw-r--r--plugins/redis/jobs/item.go1
-rw-r--r--plugins/redis/kv/config.go34
-rw-r--r--plugins/redis/kv/kv.go (renamed from plugins/redis/kv.go)2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub/channel.go (renamed from plugins/redis/channel.go)6
-rw-r--r--plugins/redis/pubsub/config.go34
-rw-r--r--plugins/redis/pubsub/pubsub.go (renamed from plugins/redis/pubsub.go)12
-rw-r--r--plugins/reload/plugin.go16
-rw-r--r--plugins/resetter/interface.go16
-rw-r--r--plugins/resetter/plugin.go43
-rw-r--r--plugins/resetter/rpc.go17
-rw-r--r--plugins/server/command.go2
-rw-r--r--plugins/server/plugin.go2
-rw-r--r--plugins/service/plugin.go2
-rw-r--r--plugins/websockets/plugin.go31
34 files changed, 890 insertions, 319 deletions
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go
index a957878c..05f1eb63 100644
--- a/plugins/gzip/plugin.go
+++ b/plugins/gzip/plugin.go
@@ -10,7 +10,6 @@ const PluginName = "gzip"
type Plugin struct{}
-// Init needed for the Endure
func (g *Plugin) Init() error {
return nil
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2ee83384..dc887f87 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -10,7 +10,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index bbc1a048..9277b85b 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,7 +1,10 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
/*
@@ -9,17 +12,23 @@ Informer plugin should not receive any other plugin in the Init or via Collects
Because Availabler implementation should present in every plugin
*/
+// Statistic interfaces ==============
+
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
Workers() []*process.State
}
+// JobsStat interface provide statistic for the jobs plugin
+type JobsStat interface {
+ // JobsState returns slice with the attached drivers information
+ JobsState(ctx context.Context) ([]*job.State, error)
+}
+
+// Statistic interfaces end ============
+
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
type Availabler interface {
// Available method needed to collect all plugins which are available in the runtime.
Available()
}
-
-type JobsStat interface {
- Stat()
-}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index c613af58..87180be5 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -1,20 +1,30 @@
package informer
import (
+ "context"
+
endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "informer"
type Plugin struct {
+ log logger.Logger
+
+ withJobs map[string]JobsStat
withWorkers map[string]Informer
available map[string]Availabler
}
-func (p *Plugin) Init() error {
+func (p *Plugin) Init(log logger.Logger) error {
p.available = make(map[string]Availabler)
p.withWorkers = make(map[string]Informer)
+ p.withJobs = make(map[string]JobsStat)
+
+ p.log = log
return nil
}
@@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State {
return svc.Workers()
}
+// Jobs provides information about jobs for the registered plugin using jobs
+func (p *Plugin) Jobs(name string) []*job.State {
+ svc, ok := p.withJobs[name]
+ if !ok {
+ return nil
+ }
+
+ st, err := svc.JobsState(context.Background())
+ if err != nil {
+ p.log.Info("jobs stat", "error", err)
+ // skip errors here
+ return nil
+ }
+
+ return st
+}
+
// Collects declares services to be collected.
func (p *Plugin) Collects() []interface{} {
return []interface{}{
p.CollectPlugins,
p.CollectWorkers,
+ p.CollectJobs,
}
}
@@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) {
p.withWorkers[name.Name()] = r
}
+func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) {
+ p.withJobs[name.Name()] = j
+}
+
// Name of the service.
func (p *Plugin) Name() string {
return PluginName
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 02254865..478d3227 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,7 +1,8 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
type rpc struct {
@@ -10,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
- // Workers is list of workers.
+ // Workers are list of workers.
Workers []*process.State `json:"workers"`
}
@@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error {
func (rpc *rpc) Workers(service string, list *WorkerList) error {
workers := rpc.srv.Workers(service)
if workers == nil {
- list = nil
return nil
}
@@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+func (rpc *rpc) Jobs(service string, out *[]*job.State) error {
+ *out = rpc.srv.Jobs(service)
+ return nil
+}
+
// sort.Sort
func (w *WorkerList) Len() int {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..95df02ec 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,10 +12,12 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -50,9 +52,8 @@ type JobConsumer struct {
multipleAck bool
requeueOnFail bool
- delayCache map[string]struct{}
-
listeners uint32
+ delayed *int64
stopCh chan struct{}
}
@@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
stopCh: make(chan struct{}),
// TODO to config
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
@@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
consumeID: uuid.NewString(),
stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
@@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- j.delayCache[tmpQ] = struct{}{}
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
@@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-j.publishChan:
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(j.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error {
})
return nil
}
+
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(j.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
import (
"context"
"fmt"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
-
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
multipleAsk bool
requeue bool
}
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
+ item.Options.delayed = j.delayed
// requeue func
item.Options.requeueFn = j.handleItem
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 32ca4188..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, errN
+ return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry put only when we redialed
return cp.t.Put(body, pri, delay, ttr)
@@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, nil, errN
+ return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Reserve only when we redialed
return cp.ts.Reserve(reserveTimeout)
@@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
return id, body, nil
}
-func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()
@@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return errN
+ return errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Delete only when we redialed
return cp.conn.Delete(id)
@@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ stat, err := cp.conn.Stats()
+ if err != nil {
+ errR := cp.checkAndRedial(err)
+ if errR != nil {
+ return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+ } else {
+ return cp.conn.Stats()
+ }
+ }
+
+ return stat, nil
+}
+
func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..6323148b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +215,49 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+ // this is not an error, reserved in beanstalk behaves like an active jobs
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
+}
+
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -260,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -315,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 95ad6ecd..f0992cd6 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,17 +2,18 @@ package ephemeral
import (
"context"
- "sync"
"sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -28,14 +29,18 @@ type JobConsumer struct {
cfg *Config
log logger.Logger
eh events.Handler
- pipeline sync.Map
+ pipeline atomic.Value
pq priorityqueue.Queue
localPrefetch chan *Item
// time.sleep goroutines max number
goroutines uint64
- stopCh chan struct{}
+ delayed *int64
+ active *int64
+
+ listeners uint32
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -46,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
@@ -61,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// initialize a local queue
jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -73,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
// initialize a local queue
jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -89,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ _, ok := j.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
err := j.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
@@ -106,102 +105,70 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("ephemeral_handle_request")
- // handle timeouts
- // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
-
- go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- time.Sleep(jj.Options.DelayDuration())
-
- // send the item after timeout expired
- j.localPrefetch <- jj
-
- atomic.AddUint64(&j.goroutines, ^uint64(0))
- }(msg)
-
- return nil
- }
-
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
- }
-}
-
-func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
- return
- }
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
- }
- }
+func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: pipe.Name(),
+ Active: atomic.LoadInt64(j.active),
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- const op = errors.Op("ephemeral_register")
- if _, ok := j.pipeline.Load(pipeline.Name()); ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
- }
-
- j.pipeline.Store(pipeline.Name(), true)
-
+ j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == true {
- // mark pipeline as turned off
- j.pipeline.Store(pipeline, false)
- }
- // if not true - do not send the EventPipeStopped, because pipe already stopped
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
return
}
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop the consumer
+ j.stopCh <- struct{}{}
+
j.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
- Pipeline: pipeline,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
}
-func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == false {
- // mark pipeline as turned on
- j.pipeline.Store(pipeline, true)
- }
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
- // if not true - do not send the EventPipeActive, because pipe already active
+ l := atomic.LoadUint32(&j.listeners)
+ // listener already active
+ if l == 1 {
+ j.log.Warn("listener already in the active state")
return
}
+ // resume the consumer on the same channel
+ j.consume()
+
+ atomic.StoreUint32(&j.listeners, 1)
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
- Pipeline: pipeline,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
@@ -220,25 +187,88 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
func (j *JobConsumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
- var pipe string
- j.pipeline.Range(func(key, _ interface{}) bool {
- pipe = key.(string)
- j.pipeline.Delete(key)
- return true
- })
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
select {
// return from the consumer
case j.stopCh <- struct{}{}:
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
- Pipeline: pipe,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
+
return nil
case <-ctx.Done():
return errors.E(op, ctx.Err())
}
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutines, 1)
+ atomic.AddInt64(j.delayed, 1)
+
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutines, ^uint64(0))
+ }(msg)
+
+ return nil
+ }
+
+ // increase number of the active jobs
+ atomic.AddInt64(j.active, 1)
+
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (j *JobConsumer) consume() {
+ go func() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+ item.Options.active = j.active
+ item.Options.delayed = j.delayed
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+ }()
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"context"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
// private
requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
func (i *Item) Nack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
i.Options.Delay = delay
i.Headers = headers
+ i.atomicallyReduceCount()
+
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+// atomicallyReduceCount reduces counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+ // noop for the in-memory
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..5d419b51 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,10 +12,12 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
if err != nil {
- return err
+ return nil, errors.E(op, err)
}
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}
- return nil
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()
@@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go
new file mode 100644
index 00000000..61856a10
--- /dev/null
+++ b/plugins/jobs/metrics.go
@@ -0,0 +1,91 @@
+package jobs
+
+import (
+ "sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+ // p - implements Exporter interface (workers)
+ // other - request duration and count
+ return []prometheus.Collector{p.statsExporter}
+}
+
+const (
+ namespace = "rr_jobs"
+)
+
+type statsExporter struct {
+ workers informer.Informer
+ workersMemory uint64
+ jobsOk uint64
+ pushOk uint64
+ jobsErr uint64
+ pushErr uint64
+}
+
+var (
+ worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
+ pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil)
+ pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil)
+ jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil)
+ jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
+)
+
+func newStatsExporter(stats informer.Informer) *statsExporter {
+ return &statsExporter{
+ workers: stats,
+ workersMemory: 0,
+ jobsOk: 0,
+ pushOk: 0,
+ jobsErr: 0,
+ pushErr: 0,
+ }
+}
+
+func (se *statsExporter) metricsCallback(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event { //nolint:exhaustive
+ case events.EventJobOK:
+ atomic.AddUint64(&se.jobsOk, 1)
+ case events.EventPushOK:
+ atomic.AddUint64(&se.pushOk, 1)
+ case events.EventPushError:
+ atomic.AddUint64(&se.pushErr, 1)
+ case events.EventJobError:
+ atomic.AddUint64(&se.jobsErr, 1)
+ }
+ }
+}
+
+func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
+ // send description
+ d <- pushErr
+ d <- pushOk
+ d <- jobsErr
+ d <- jobsOk
+}
+
+func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
+ // get the copy of the processes
+ workers := se.workers.Workers()
+
+ // cumulative RSS memory in bytes
+ var cum uint64
+
+ // collect the memory
+ for i := 0; i < len(workers); i++ {
+ cum += workers[i].MemoryUsage
+ }
+
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
+ ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
+ ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
+ ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5e62c5c5 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -57,7 +59,8 @@ type Plugin struct {
stopCh chan struct{}
// internal payloads pool
- pldPool sync.Pool
+ pldPool sync.Pool
+ statsExporter *statsExporter
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -103,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
p.log = log
+ // metrics
+ p.statsExporter = newStatsExporter(p)
+ p.events.AddListener(p.statsExporter.metricsCallback)
+
return nil
}
@@ -200,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
5. Pipeline name
*/
+ start := time.Now()
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobStart,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: 0,
+ })
+
ctx, err := jb.Context()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
@@ -218,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
resp, err := p.workersPool.Exec(exec)
p.RUnlock()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// RR protocol level error, Nack the job
errNack := jb.Nack()
if errNack != nil {
@@ -235,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.putPayload(exec)
err = jb.Ack()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.log.Error("acknowledge error, job might be missed", "error", err)
continue
}
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
}
// handle the response protocol
err = handleResponse(resp.Body, jb, p.log)
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.putPayload(exec)
errNack := jb.Nack()
if errNack != nil {
@@ -254,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// return payload
p.putPayload(exec)
}
@@ -303,6 +356,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
p.jobConstructors[name.Name()] = c
}
+func (p *Plugin) Workers() []*process.State {
+ p.RLock()
+ wrk := p.workersPool.Workers()
+ p.RUnlock()
+
+ ps := make([]*process.State, len(wrk))
+
+ for i := 0; i < len(wrk); i++ {
+ st, err := process.WorkerProcessState(wrk[i])
+ if err != nil {
+ p.log.Error("jobs workers state", "error", err)
+ return nil
+ }
+
+ ps[i] = st
+ }
+
+ return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+ const op = errors.Op("jobs_plugin_drivers_state")
+ jst := make([]*jobState.State, 0, len(p.consumers))
+ for k := range p.consumers {
+ d := p.consumers[k]
+ newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+ state, err := d.State(newCtx)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+
+ jst = append(jst, state)
+ cancel()
+ }
+ return jst, nil
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
@@ -319,7 +410,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
if err != nil {
return errors.E(op, err)
}
@@ -332,6 +423,7 @@ func (p *Plugin) Reset() error {
func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j.Options.Pipeline)
if !ok {
@@ -357,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error {
err := d.Push(ctx, j)
if err != nil {
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushOK,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return nil
}
@@ -370,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -392,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
err := d.Push(ctx, j[i])
if err != nil {
cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j[i].Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
@@ -536,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
case events.EventPipePaused:
p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
- p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
- p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushOK:
p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushError:
- p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobError:
- p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeActive:
p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeStopped:
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index c89877e3..e195c407 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+ `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
with string key and array of strings as a value.
For example:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..94f903d5 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "context"
+
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
}
func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push")
// convert transport entity into domain
// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
}
func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push_batch")
l := len(j.GetJobs())
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
// 2. Pipeline name
// 3. Options related to the particular pipeline
func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
pipe := &pipeline.Pipeline{}
for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
}
func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
var destroyed []string //nolint:prealloc
for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
return nil
}
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+ const op = errors.Op("rpc_stats")
+ state, err := r.p.JobsState(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(state); i++ {
+ resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+ Pipeline: state[i].Pipeline,
+ Driver: state[i].Driver,
+ Queue: state[i].Queue,
+ Active: state[i].Active,
+ Delayed: state[i].Delayed,
+ Reserved: state[i].Reserved,
+ Ready: state[i].Ready,
+ })
+ }
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index c79f3eb0..fd30eb54 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -1,8 +1,10 @@
package memory
import (
+ "context"
"sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
+ p.RLock()
+ defer p.RUnlock()
+ // push only messages, which topics are subscibed
+ // TODO better???
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
}
return nil, nil
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
new file mode 100644
index 00000000..89d707af
--- /dev/null
+++ b/plugins/redis/jobs/config.go
@@ -0,0 +1,34 @@
+package jobs
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/consumer.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/item.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
new file mode 100644
index 00000000..5b760952
--- /dev/null
+++ b/plugins/redis/kv/config.go
@@ -0,0 +1,34 @@
+package kv
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv/kv.go
index 29f89d46..b41cb86c 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -1,4 +1,4 @@
-package redis
+package kv
import (
"context"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3c62a63f..961182a9 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -9,6 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv"
+ redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub"
)
const PluginName = "redis"
@@ -62,7 +64,7 @@ func (p *Plugin) Available() {}
// KVConstruct provides KV storage implementation over the redis plugin
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
+ st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
@@ -71,5 +73,5 @@ func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
}
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
+ return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/channel.go b/plugins/redis/pubsub/channel.go
index 0cd62d19..a1655ab2 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
@@ -92,6 +92,6 @@ func (r *redisChannel) stop() error {
return nil
}
-func (r *redisChannel) message() *pubsub.Message {
- return <-r.out
+func (r *redisChannel) message() chan *pubsub.Message {
+ return r.out
}
diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go
new file mode 100644
index 00000000..bf8d2fc9
--- /dev/null
+++ b/plugins/redis/pubsub/config.go
@@ -0,0 +1,34 @@
+package pubsub
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 01efc623..c9ad3d58 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
@@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return p.channel.message(), nil
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("redis_driver_next")
+ select {
+ case msg := <-p.channel.message():
+ return msg, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index d3271d6c..a9a5a63c 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -20,12 +20,12 @@ type Plugin struct {
log logger.Logger
watcher *Watcher
services map[string]interface{}
- res resetter.Resetter
+ res *resetter.Plugin
stopc chan struct{}
}
// Init controller service
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error {
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res *resetter.Plugin) error {
const op = errors.Op("reload_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -86,9 +86,9 @@ func (s *Plugin) Serve() chan error {
}
// make a map with unique services
- // so, if we would have a 100 events from http service
- // in map we would see only 1 key and it's config
- treshholdc := make(chan struct {
+ // so, if we would have 100 events from http service
+ // in map we would see only 1 key, and it's config
+ thCh := make(chan struct {
serviceConfig ServiceConfig
service string
}, thresholdChanBuffer)
@@ -98,7 +98,7 @@ func (s *Plugin) Serve() chan error {
go func() {
for e := range s.watcher.Event {
- treshholdc <- struct {
+ thCh <- struct {
serviceConfig ServiceConfig
service string
}{serviceConfig: s.cfg.Services[e.service], service: e.service}
@@ -111,7 +111,7 @@ func (s *Plugin) Serve() chan error {
go func() {
for {
select {
- case cfg := <-treshholdc:
+ case cfg := <-thCh:
// logic is following:
// restart
timer.Stop()
@@ -124,7 +124,7 @@ func (s *Plugin) Serve() chan error {
case <-timer.C:
if len(updated) > 0 {
for name := range updated {
- err := s.res.ResetByName(name)
+ err := s.res.Reset(name)
if err != nil {
timer.Stop()
errCh <- errors.E(op, err)
diff --git a/plugins/resetter/interface.go b/plugins/resetter/interface.go
index 47d8d791..0defcaba 100644
--- a/plugins/resetter/interface.go
+++ b/plugins/resetter/interface.go
@@ -1,17 +1,7 @@
package resetter
-// If plugin implements Resettable interface, than it state can be resetted without reload in runtime via RPC/HTTP
-type Resettable interface {
- // Reset reload all plugins
- Reset() error
-}
-
-// Resetter interface is the Resetter plugin main interface
+// Resetter interface
type Resetter interface {
- // Reset all registered plugins
- ResetAll() error
- // Reset by plugin name
- ResetByName(string) error
- // GetAll registered plugins
- GetAll() []string
+ // Reset reload plugin
+ Reset() error
}
diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go
index 4feb692a..b2fe59af 100644
--- a/plugins/resetter/plugin.go
+++ b/plugins/resetter/plugin.go
@@ -3,61 +3,32 @@ package resetter
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "resetter"
type Plugin struct {
- registry map[string]Resettable
- log logger.Logger
+ registry map[string]Resetter
}
-func (p *Plugin) ResetAll() error {
- const op = errors.Op("resetter_plugin_reset_all")
- for name := range p.registry {
- err := p.registry[name].Reset()
- if err != nil {
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-func (p *Plugin) ResetByName(plugin string) error {
- const op = errors.Op("resetter_plugin_reset_by_name")
- if plugin, ok := p.registry[plugin]; ok {
- return plugin.Reset()
- }
- return errors.E(op, errors.Errorf("can't find plugin: %s", plugin))
-}
-
-func (p *Plugin) GetAll() []string {
- all := make([]string, 0, len(p.registry))
- for name := range p.registry {
- all = append(all, name)
- }
- return all
-}
-
-func (p *Plugin) Init(log logger.Logger) error {
- p.registry = make(map[string]Resettable)
- p.log = log
+func (p *Plugin) Init() error {
+ p.registry = make(map[string]Resetter)
return nil
}
// Reset named service.
func (p *Plugin) Reset(name string) error {
+ const op = errors.Op("resetter_plugin_reset_by_name")
svc, ok := p.registry[name]
if !ok {
- return errors.E("no such service", errors.Str(name))
+ return errors.E(op, errors.Errorf("no such service: %s", name))
}
return svc.Reset()
}
// RegisterTarget resettable service.
-func (p *Plugin) RegisterTarget(name endure.Named, r Resettable) error {
+func (p *Plugin) RegisterTarget(name endure.Named, r Resetter) error {
p.registry[name.Name()] = r
return nil
}
@@ -80,5 +51,5 @@ func (p *Plugin) Available() {
// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
- return &rpc{srv: p, log: p.log}
+ return &rpc{srv: p}
}
diff --git a/plugins/resetter/rpc.go b/plugins/resetter/rpc.go
index 69c955b0..79171b5c 100644
--- a/plugins/resetter/rpc.go
+++ b/plugins/resetter/rpc.go
@@ -1,30 +1,29 @@
package resetter
-import "github.com/spiral/roadrunner/v2/plugins/logger"
+import "github.com/spiral/errors"
type rpc struct {
srv *Plugin
- log logger.Logger
}
// List all resettable plugins.
func (rpc *rpc) List(_ bool, list *[]string) error {
- rpc.log.Debug("started List method")
*list = make([]string, 0)
for name := range rpc.srv.registry {
*list = append(*list, name)
}
- rpc.log.Debug("services list", "services", *list)
-
- rpc.log.Debug("finished List method")
return nil
}
// Reset named plugin.
func (rpc *rpc) Reset(service string, done *bool) error {
- rpc.log.Debug("started Reset method for the service", "service", service)
- defer rpc.log.Debug("finished Reset method for the service", "service", service)
+ const op = errors.Op("resetter_rpc_reset")
+ err := rpc.srv.Reset(service)
+ if err != nil {
+ *done = false
+ return errors.E(op, err)
+ }
*done = true
- return rpc.srv.Reset(service)
+ return nil
}
diff --git a/plugins/server/command.go b/plugins/server/command.go
index e0b61896..b8bc1395 100644
--- a/plugins/server/command.go
+++ b/plugins/server/command.go
@@ -29,5 +29,5 @@ func (server *Plugin) scanCommand(cmd []string) error {
return nil
}
}
- return errors.E(errors.Str("scan failed, possible path not found"), op)
+ return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 1694cdf1..16e3bd8c 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -97,7 +97,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// try to find a path here
err := server.scanCommand(cmdArgs)
if err != nil {
- server.log.Info("scan command", "error", err)
+ server.log.Info("scan command", "reason", err)
}
return func() *exec.Cmd {
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 28b84443..3bd0f956 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,7 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 2df23f11..395b056f 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -13,7 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -58,6 +58,10 @@ type Plugin struct {
// server which produces commands to the pool
server server.Server
+ // stop receiving messages
+ cancel context.CancelFunc
+ ctx context.Context
+
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
@@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.log = log
p.broadcaster = b
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.ctx = ctx
+ p.cancel = cancel
return nil
}
@@ -118,7 +126,8 @@ func (p *Plugin) Serve() chan error {
Supervisor: p.cfg.Pool.Supervisor,
}, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
+ return
}
p.accessValidator = p.defaultAccessValidator(p.phpPool)
@@ -129,17 +138,17 @@ func (p *Plugin) Serve() chan error {
// we need here only Reader part of the interface
go func(ps pubsub.Reader) {
for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
+ data, err := ps.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
- p.workersPool.Queue(data)
+
+ errCh <- errors.E(op, err)
+ return
}
+
+ p.workersPool.Queue(data)
}
}(p.subReader)
@@ -149,6 +158,8 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
+ // cancel context
+ p.cancel()
p.Lock()
if p.phpPool == nil {
p.Unlock()