summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 15:31:30 +0300
committerGitHub <[email protected]>2021-08-31 15:31:30 +0300
commit83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree884dd2991acf12826752632b8321410e7cc923ce /plugins
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/config.go)2
-rw-r--r--plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/consumer.go)166
-rw-r--r--plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/item.go)20
-rw-r--r--plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/listener.go)12
-rw-r--r--plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/rabbit_init.go)18
-rw-r--r--plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/redial.go)74
-rw-r--r--plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go)5
-rw-r--r--plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go)0
-rw-r--r--plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go)0
-rw-r--r--plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go)26
-rw-r--r--plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go)0
-rw-r--r--plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go)2
-rw-r--r--plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go)2
-rw-r--r--plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go)0
-rw-r--r--plugins/boltdb/boltjobs/config.go39
-rw-r--r--plugins/boltdb/boltjobs/consumer.go422
-rw-r--r--plugins/boltdb/boltjobs/item.go229
-rw-r--r--plugins/boltdb/boltjobs/listener.go151
-rw-r--r--plugins/boltdb/boltkv/config.go (renamed from plugins/kv/drivers/boltdb/config.go)2
-rw-r--r--plugins/boltdb/boltkv/driver.go (renamed from plugins/kv/drivers/boltdb/driver.go)15
-rw-r--r--plugins/boltdb/doc/boltjobs.drawio1
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md10
-rw-r--r--plugins/boltdb/plugin.go82
-rw-r--r--plugins/broadcast/plugin.go63
-rw-r--r--plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go)129
-rw-r--r--plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go)0
-rw-r--r--plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go)0
-rw-r--r--plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go)33
-rw-r--r--plugins/jobs/job/job_options.go32
-rw-r--r--plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go)0
-rw-r--r--plugins/jobs/plugin.go18
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go71
-rw-r--r--plugins/kv/plugin.go118
-rw-r--r--plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go)0
-rw-r--r--plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go)0
-rw-r--r--plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go)0
-rw-r--r--plugins/resetter/plugin.go2
-rw-r--r--plugins/server/plugin.go2
-rw-r--r--plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go)0
-rw-r--r--plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go)94
-rw-r--r--plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go)11
-rw-r--r--plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go)36
-rw-r--r--plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go)0
-rw-r--r--plugins/status/plugin.go4
44 files changed, 1337 insertions, 554 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/amqp/amqpjobs/config.go
index 1ec089f1..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/amqp/amqpjobs/config.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
// pipeline rabbitmq info
const (
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 95df02ec..784a102c 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -20,7 +20,11 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+const (
+ pluginName string = "amqp"
+)
+
+type consumer struct {
sync.Mutex
log logger.Logger
pq priorityqueue.Queue
@@ -58,7 +62,7 @@ type JobConsumer struct {
}
// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
globalCfg.InitDefault()
// PARSE CONFIGURATION END -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: e,
@@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer_from_pipeline")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// PARSE CONFIGURATION -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
eh: e,
pq: pq,
@@ -214,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != job.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- err := j.handleItem(ctx, fromJob(job))
+ err := c.handleItem(ctx, fromJob(job))
if err != nil {
return errors.E(op, err)
}
@@ -232,38 +236,38 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_run")
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -275,9 +279,11 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -287,28 +293,28 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
- q, err := pch.QueueInspect(j.queue)
+ q, err := pch.QueueInspect(c.queue)
if err != nil {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: q.Name,
Active: int64(q.Messages),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
case <-ctx.Done():
@@ -316,37 +322,37 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- err := j.consumeChan.Cancel(j.consumeID, true)
+ err := c.consumeChan.Cancel(c.consumeID, true)
if err != nil {
- j.log.Error("cancel publish channel, forcing close", "error", err)
- errCl := j.consumeChan.Close()
+ c.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := c.consumeChan.Close()
if errCl != nil {
- j.log.Error("force close failed", "error", err)
+ c.log.Error("force close failed", "error", err)
return
}
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -354,40 +360,40 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("amqp listener already in the active state")
+ c.log.Warn("amqp listener already in the active state")
return
}
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
- j.log.Error("create channel on rabbitmq connection", "error", err)
+ c.log.Error("create channel on rabbitmq connection", "error", err)
return
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
- j.log.Error("qos set failed", "error", err)
+ c.log.Error("qos set failed", "error", err)
return
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -395,17 +401,17 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
nil,
)
if err != nil {
- j.log.Error("consume operation failed", "error", err)
+ c.log.Error("consume operation failed", "error", err)
return
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
// increase number of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -413,11 +419,13 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Stop(context.Context) error {
- j.stopCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -427,13 +435,13 @@ func (j *JobConsumer) Stop(context.Context) error {
}
// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
// return the channel back
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
// convert
@@ -445,39 +453,39 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
// handle timeouts
if msg.Options.DelayDuration() > 0 {
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddInt64(c.delayed, 1)
// TODO declare separate method for this if condition
// TODO dlx cache channel??
delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue)
_, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
+ dlx: c.exchangeName,
+ dlxRoutingKey: c.routingKey,
dlxTTL: delayMs,
dlxExpires: delayMs * 2,
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil)
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
- Timestamp: time.Now().UTC(),
+ Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
@@ -485,7 +493,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now(),
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/amqp/amqpjobs/item.go
index 623dcca7..66b70a36 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- item, err := j.unpack(d)
+ item, err := c.unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
@@ -156,10 +156,10 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- item.Options.delayed = j.delayed
+ item.Options.delayed = c.delayed
// requeue func
- item.Options.requeueFn = j.handleItem
+ item.Options.requeueFn = c.handleItem
return i, nil
}
@@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
+func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
- multipleAsk: j.multipleAck,
- requeue: j.requeueOnFail,
- requeueFn: j.handleItem,
+ multipleAsk: c.multipleAck,
+ requeue: c.requeueOnFail,
+ requeueFn: c.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
@@ -230,7 +230,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
if _, ok := d.Headers[job.RRPriority]; !ok {
// set pipe's priority
- item.Options.Priority = j.priority
+ item.Options.Priority = c.priority
} else {
item.Options.Priority = d.Headers[job.RRPriority].(int64)
}
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/amqp/amqpjobs/listener.go
index 0b1cd2dc..75c61cad 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/amqp/amqpjobs/listener.go
@@ -1,24 +1,24 @@
-package amqp
+package amqpjobs
import amqp "github.com/rabbitmq/amqp091-go"
-func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
+func (c *consumer) listener(deliv <-chan amqp.Delivery) {
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-deliv:
if !ok {
- j.log.Info("delivery channel closed, leaving the rabbit listener")
+ c.log.Info("delivery channel closed, leaving the rabbit listener")
return
}
- d, err := j.fromDelivery(msg)
+ d, err := c.fromDelivery(msg)
if err != nil {
- j.log.Error("amqp delivery convert", "error", err)
+ c.log.Error("amqp delivery convert", "error", err)
continue
}
// insert job into the main priority queue
- j.pq.Insert(d)
+ c.pq.Insert(d)
}
}
}()
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
index 56ef10c8..fb5f6911 100644
--- a/plugins/jobs/drivers/amqp/rabbit_init.go
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
@@ -1,23 +1,23 @@
-package amqp
+package amqpjobs
import (
"github.com/spiral/errors"
)
-func (j *JobConsumer) initRabbitMQ() error {
+func (c *consumer) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
- channel, err := j.conn.Channel()
+ channel, err := c.conn.Channel()
if err != nil {
return errors.E(op, err)
}
// declare an exchange (idempotent operation)
err = channel.ExchangeDeclare(
- j.exchangeName,
- j.exchangeType,
+ c.exchangeName,
+ c.exchangeType,
true,
false,
false,
@@ -30,10 +30,10 @@ func (j *JobConsumer) initRabbitMQ() error {
// verify or declare a queue
q, err := channel.QueueDeclare(
- j.queue,
+ c.queue,
false,
false,
- j.exclusive,
+ c.exclusive,
false,
nil,
)
@@ -44,8 +44,8 @@ func (j *JobConsumer) initRabbitMQ() error {
// bind queue to the exchange
err = channel.QueueBind(
q.Name,
- j.routingKey,
- j.exchangeName,
+ c.routingKey,
+ c.exchangeName,
false,
nil,
)
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/amqp/amqpjobs/redial.go
index 8dc18b8f..8d21784f 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"time"
@@ -11,70 +11,70 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
+func (c *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
for {
select {
- case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ case err := <-c.conn.NotifyClose(make(chan *amqp.Error)):
if err == nil {
return
}
- j.Lock()
+ c.Lock()
// trash the broken publishing channel
- <-j.publishChan
+ <-c.publishChan
- t := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ t := time.Now().UTC()
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeError,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Error: err,
- Start: time.Now(),
+ Start: time.Now().UTC(),
})
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
- expb.MaxElapsedTime = j.retryTimeout
+ expb.MaxElapsedTime = c.retryTimeout
operation := func() error {
- j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ c.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
- j.conn, dialErr = amqp.Dial(j.connStr)
+ c.conn, dialErr = amqp.Dial(c.connStr)
if dialErr != nil {
return errors.E(op, dialErr)
}
- j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+ c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
- errInit := j.initRabbitMQ()
+ errInit := c.initRabbitMQ()
if errInit != nil {
- j.log.Error("rabbitmq dial", "error", errInit)
+ c.log.Error("rabbitmq dial", "error", errInit)
return errInit
}
// redeclare consume channel
var errConnCh error
- j.consumeChan, errConnCh = j.conn.Channel()
+ c.consumeChan, errConnCh = c.conn.Channel()
if errConnCh != nil {
return errors.E(op, errConnCh)
}
// redeclare publish channel
- pch, errPubCh := j.conn.Channel()
+ pch, errPubCh := c.conn.Channel()
if errPubCh != nil {
return errors.E(op, errPubCh)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -86,23 +86,23 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
}
// put the fresh publishing channel
- j.publishChan <- pch
+ c.publishChan <- pch
// restart listener
- j.listener(deliv)
+ c.listener(deliv)
- j.log.Info("queues and subscribers redeclared successfully")
+ c.log.Info("queues and subscribers redeclared successfully")
return nil
}
retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
- j.Unlock()
- j.log.Error("backoff failed", "error", retryErr)
+ c.Unlock()
+ c.log.Error("backoff failed", "error", retryErr)
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
@@ -110,27 +110,27 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
Elapsed: time.Since(t),
})
- j.Unlock()
+ c.Unlock()
- case <-j.stopCh:
- if j.publishChan != nil {
- pch := <-j.publishChan
+ case <-c.stopCh:
+ if c.publishChan != nil {
+ pch := <-c.publishChan
err := pch.Close()
if err != nil {
- j.log.Error("publish channel close", "error", err)
+ c.log.Error("publish channel close", "error", err)
}
}
- if j.consumeChan != nil {
- err := j.consumeChan.Close()
+ if c.consumeChan != nil {
+ err := c.consumeChan.Close()
if err != nil {
- j.log.Error("consume channel close", "error", err)
+ c.log.Error("consume channel close", "error", err)
}
}
- if j.conn != nil {
- err := j.conn.Close()
+ if c.conn != nil {
+ err := c.conn.Close()
if err != nil {
- j.log.Error("amqp connection close", "error", err)
+ c.log.Error("amqp connection close", "error", err)
}
}
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go
index 624f4405..c4f5f1da 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/amqp/plugin.go
@@ -4,6 +4,7 @@ import (
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -31,10 +32,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+ return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
}
// FromPipeline constructs AMQP driver from pipeline
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
+ return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq)
}
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go
index a8069f5d..a8069f5d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/beanstalk/config.go
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go
index d3241b37..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/beanstalk/connection.go
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 6323148b..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -19,7 +19,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+type consumer struct {
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -43,7 +43,7 @@ type JobConsumer struct {
requeueCh chan *Item
}
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return jc, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered
@@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+func (j *consumer) handleItem(ctx context.Context, item *Item) error {
const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
@@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("beanstalk_state")
stat, err := j.pool.Stats(ctx)
if err != nil {
@@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go
index e43207eb..e43207eb 100644
--- a/plugins/jobs/drivers/beanstalk/encode_test.go
+++ b/plugins/beanstalk/encode_test.go
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go
index f1d7ac76..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
@@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
+func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go
index f1385e70..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/beanstalk/listen.go
@@ -4,7 +4,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen() {
+func (j *consumer) listen() {
for {
select {
case <-j.stopCh:
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go
index 529d1474..529d1474 100644
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ b/plugins/beanstalk/plugin.go
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
new file mode 100644
index 00000000..8cc098c1
--- /dev/null
+++ b/plugins/boltdb/boltjobs/config.go
@@ -0,0 +1,39 @@
+package boltjobs
+
+const (
+ file string = "file"
+ priority string = "priority"
+ prefetch string = "prefetch"
+)
+
+type GlobalCfg struct {
+ // db file permissions
+ Permissions int `mapstructure:"permissions"`
+ // consume timeout
+}
+
+func (c *GlobalCfg) InitDefaults() {
+ if c.Permissions == 0 {
+ c.Permissions = 0777
+ }
+}
+
+type Config struct {
+ File string `mapstructure:"file"`
+ Priority int `mapstructure:"priority"`
+ Prefetch int `mapstructure:"prefetch"`
+}
+
+func (c *Config) InitDefaults() {
+ if c.File == "" {
+ c.File = "rr.db"
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+
+ if c.Prefetch == 0 {
+ c.Prefetch = 1000
+ }
+}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
new file mode 100644
index 00000000..ed0eda61
--- /dev/null
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -0,0 +1,422 @@
+package boltjobs
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+const (
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
+
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
+)
+
+type consumer struct {
+ file string
+ permissions int
+ priority int
+ prefetch int
+
+ db *bolt.DB
+
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
+ listeners uint32
+ active *uint64
+ delayed *uint64
+
+ stopCh chan struct{}
+}
+
+func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg := &Config{}
+ err = cfg.UnmarshalKey(configKey, localCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg.InitDefaults()
+ conf.InitDefaults()
+
+ db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
+ permissions: conf.Permissions,
+ file: localCfg.File,
+ priority: localCfg.Priority,
+ prefetch: localCfg.Prefetch,
+
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 2),
+ }, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
+ file: pipeline.String(file, rrDB),
+ priority: pipeline.Int(priority, 10),
+ prefetch: pipeline.Int(prefetch, 100),
+ permissions: conf.Permissions,
+
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 2),
+ }, nil
+}
+
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
+ const op = errors.Op("boltdb_jobs_push")
+ err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+ // pool with buffers
+ buf := c.get()
+ // encode the job
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ c.put(buf)
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+ c.put(buf)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ err = b.Put(utils.AsBytes(tKey), value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return nil
+ }
+
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ err = b.Put(utils.AsBytes(item.ID()), value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return nil
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
+ return nil
+}
+
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("boltdb_run")
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 1 {
+ c.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
new file mode 100644
index 00000000..837f8c63
--- /dev/null
+++ b/plugins/boltdb/boltjobs/item.go
@@ -0,0 +1,229 @@
+package boltjobs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+ "go.etcd.io/bbolt"
+)
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ db *bbolt.DB
+ active *uint64
+ delayed *uint64
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ const op = errors.Op("boltdb_item_ack")
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ if i.Options.Delay > 0 {
+ atomic.AddUint64(i.Options.delayed, ^uint64(0))
+ } else {
+ atomic.AddUint64(i.Options.active, ^uint64(0))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) Nack() error {
+ const op = errors.Op("boltdb_item_ack")
+ /*
+ steps:
+ 1. begin tx
+ 2. get item by ID from the InQueueBucket (previously put in the listener)
+ 3. put it back to the PushBucket
+ 4. Delete it from the InQueueBucket
+ */
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ v := inQb.Get(utils.AsBytes(i.ID()))
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ err = pushB.Put(utils.AsBytes(i.ID()), v)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ return tx.Commit()
+}
+
+/*
+Requeue algorithm:
+1. Rewrite item headers and delay.
+2. Begin writable transaction on attached to the item db.
+3. Delete item from the InQueueBucket
+4. Handle items with the delay:
+ 4.1. Get DelayBucket
+ 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format
+ 4.3. Put this key with value to the DelayBucket
+5. W/o delay, put the key with value to the PushBucket (requeue)
+*/
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ const op = errors.Op("boltdb_item_requeue")
+ i.Headers = headers
+ i.Options.Delay = delay
+
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ // encode the item
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ val := make([]byte, buf.Len())
+ copy(val, buf.Bytes())
+ buf.Reset()
+
+ if delay > 0 {
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
+
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = delayB.Put(utils.AsBytes(tKey), val)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+ }
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = pushB.Put(utils.AsBytes(i.ID()), val)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
+ i.Options.db = db
+ i.Options.active = active
+ i.Options.delayed = delayed
+}
+
+func (i *Item) rollback(err error, tx *bbolt.Tx) error {
+ errR := tx.Rollback()
+ if errR != nil {
+ return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
+ }
+ return errors.Errorf("transaction commit error: %v", err)
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
new file mode 100644
index 00000000..7c161555
--- /dev/null
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -0,0 +1,151 @@
+package boltjobs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+func (c *consumer) listener() {
+ tt := time.NewTicker(time.Millisecond)
+ defer tt.Stop()
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Info("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := b.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = b.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+ }
+}
+
+func (c *consumer) delayedJobsListener() {
+ tt := time.NewTicker(time.Second)
+ defer tt.Stop()
+
+ // just some 90's
+ loc, err := time.LoadLocation("UTC")
+ if err != nil {
+ c.log.Error("failed to load location, delayed jobs won't work", "error", err)
+ return
+ }
+
+ var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339))
+
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Info("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ cursor := delayB.Cursor()
+ endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339))
+
+ for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() {
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+ }
+ }
+}
+
+func (c *consumer) rollback(err error, tx *bolt.Tx) {
+ errR := tx.Rollback()
+ if errR != nil {
+ c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
+ }
+
+ c.log.Error("transaction commit error, rollback succeed", "error", err)
+}
diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/boltdb/boltkv/config.go
index 0beb209b..56d00674 100644
--- a/plugins/kv/drivers/boltdb/config.go
+++ b/plugins/boltdb/boltkv/config.go
@@ -1,4 +1,4 @@
-package boltdb
+package boltkv
type Config struct {
// File is boltDB file. No need to create it by your own,
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/boltdb/boltkv/driver.go
index 15a5674f..ba1450cd 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -1,4 +1,4 @@
-package boltdb
+package boltkv
import (
"bytes"
@@ -16,6 +16,10 @@ import (
bolt "go.etcd.io/bbolt"
)
+const (
+ RootPluginName string = "kv"
+)
+
type Driver struct {
clearMu sync.RWMutex
// db instance
@@ -24,7 +28,8 @@ type Driver struct {
bucket []byte
log logger.Logger
cfg *Config
- // gc contains key which are contain timeouts
+
+ // gc contains keys with timeouts
gc sync.Map
// default timeout for cache cleanup is 1 minute
timeout time.Duration
@@ -36,6 +41,10 @@ type Driver struct {
func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
+ if !cfgPlugin.Has(RootPluginName) {
+ return nil, errors.E(op, errors.Str("no kv section in the configuration"))
+ }
+
d := &Driver{
log: log,
stop: stop,
@@ -157,7 +166,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
}
// set the value
- val = []byte(i)
+ val = utils.AsBytes(i)
}
return nil
})
diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio
new file mode 100644
index 00000000..7d1f3531
--- /dev/null
+++ b/plugins/boltdb/doc/boltjobs.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-08-31T09:34:11.357Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="KiNZAPNeIcd5kV3EE5lF" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">7V1bc5s4GP01nmkfkgHE9TF2km13up20me1uH2WQbRJsuYATe3/9ijtICiHmIrvBnWmNEAjrfNejT3QCZuv9Hz7crv7CDvImiuTsJ+B6oigKABr5J2o5JC0yUJWkZem7TtpWNNy7/6G0UUpbd66DgkrHEGMvdLfVRhtvNsgOK23Q9/FztdsCe9VRt3CJmIZ7G3ps6z+uE67SVl2SihOfkLtcZUMr2Zk1zHqnDcEKOvi51ARuJmDmYxwm39b7GfKi6csmJrnu9oWz+ZP5aBM2ucDcf0ZP8pX6w3iw/K8/fijw1/VFikYQHrJfjBwyAekh9sMVXuIN9G6K1qmPdxsHRXeVyFHR5wvGW9Iok8YHFIaHFE24CzFpWoVrLz1LHtg//Btdf6llhz/T28UH1/vK0SE9WuBNeAvXrhc1zGBgQ8eF5OlnRO7S0+mYMiDHyU+Lfs+LM5aJFPSXKKyZJjnHi4g6wmtEHotc5yMPhu5T9f4wFbll3i+/9A67ZGRFSvVDV1PROORiVb1F8lzpVWVoX7uRSt0owDvfRsyNyJfS7ymaYsl5gxRl2jqwGO3dsCRF5Ohn6UwhQ9HBoSxQpyR6CTKviV4DEVVbimhFJt4sAMl9n6C3S0e6+/v+E2n5fjdjRKMK/PPKDdH9FsaT8EzcSBXkN078wvW8GfawHw8F5tA2nag9CH38iEpnFKCqmpND9YT8EO3rwWInN71AM6vqp2R6/FxyGZkbWJW8Ba2mneFhiNRHuaSNhW7y9VGcXoGGeqWJ1CvzXHAs7GrFqhZG9uTsalP8hdpVa8RfNP66SPwB41enJPm5nrb0qVUnuSAf0+Q5SaADCzjHwtTKp+pVl6qZGutSZY5LBX25VFlmJv1EdVGcTrW1lfzsxqDCK7VhmtRVdqOywe0uWJGW6c5+RGG3uqhFf3i6qMefFL+yjsYfITqqaBQwoGHcSwPYmZIqQh1mNRGVapX0pUQ050dqHGZNwmM7aG7OubacfG5vX5eTDs2Bfg4hlgxOR2Qachfy5ExiLFk5hyBL1hgDP8PbaPwHPOcKxxc4R14VUOi5yw35bpNJQkTlppFldW3oXaUn1q7jJLKDAvc/OI/vF8GzjfxW/Ju06US75ul4E43lWPKUFE/HmuRMdBnKGp140e4TkbOUdNJa+vYLWbIqPuRCoXwDXiwC1A9rqYvVfKO56tMS0RONKUL1ZaOXoJEkB1QGMXDUmE0AGzZ6bhCiDTESivRBltbBx24jyAXSbZsXATiGNZeOdgitIkUmhOdFiuqQkaLMRvXyZTwHheWXiO6Sv1aI/P15822HdigL+XWPPOB07ldw03/tosW8eAIvgngGr0gHGWz38TRm58m3ZfSvEg14jTwURiMsfLzOh4tE5YX0gqAQ8n2PhxZhO88TmasghZ4RoQ3eIDHSQ5EBwGSlJ5eUQfh1WSgx+5ag8Qw9h9HUc1jDeA59aM9hsJ7j010k7th/RH4QzSvGXkuvQckFE79SbgXJjoYMnluxdANAvamItDIEskQtdGuG6JU2RSgtKF0qw6x9vzkd6cYUWA1NQamgRkACmeVCp+8NfmcReCFnHWihRjCL9IZcsisZYByBFH9OVDrEcowW49G/o2CLNwEa1os7EJkLbnKo2yaaL8R4cV28FxdaMCOVdLf5QnvLdYPBdDQD8rRJ4OwpSyr6FdqPw6rnwrQRn7uZm5qqNbatrdTTtKraaUrCtZOl0m58n0xNc2qkC1q+KTnSBXXfHC4rq4xI4TI41JrGQYvm2bsLhQRnREdFQsq5GNOhypbiS698Hx5KHVKdaE6dWDq1J4G6QDFr+5MvySN0yq8obFXV1dCW3tGQ6ag8S28qc6ALolPEm3ogOJU+jk4xWlmPU0ikFLWhXWmdZh9nWHJ3lUmqVW9YZBnUXtCPZQGCV5TbkwC/t+y2ZQmPlF2qTM6iN+pRF1j1/XsSXbEZ8HslsBrLLhBKYCmcGlgfh9h+n8tQ8smlyOrZVKe32SlyatqrNVReVejyFFveeE3GjcbvoYC9PcM8QKF642Xj3sqPsui0Uh0WxQDzmHQsVx4dVQr0+/JdJDyqYsmxvQpvZ1B/hBdLT+ZFXX/GdWSV0q5KJdll3seN6jocMquXI8xlFc2CYJWFGRiDwnxCe0veT4AMGgfIpkgfq57hAmI1/jJaB2C3t5Jknqh0iKHDKXrVMl8hraj+hqZRgtlH4s+mdt/Rr8hDtYwL37r2CR0D8tc+VV2T1GEyO5pntIS/yiN7YdY5WZauCXERlsVsaFmEvh8ke0o2txtVV5Y04bqritTd41gZSndbx4wCdDfDs/eNx0dFBQZd/SbVRwV0f1Wu7w/q+/cTRagsn5GRTD1vd2tdMtUD36Q14ZsG3e6mM/B83pDjb0mcJ817IAJbVzgMQAQackMX0d8bK7gePCGPHl4hjxi83iVRRJUQ8dZilEGJIpUl3JOtpSsUc7tOahh9uFlGmH6A8T0J2kH82OEqsecJ4nGvj+Udp6XNowlnjPZb1ydqmt+jLXs83EbSYWVFpdbtdB6paA0aIdao/6j4tWAa1EKAxdsVzHOy/Sk+u80jUdDEkEeaHan7V/w8iZ5hmtkCot3SIzpUFTcOn0a4ixfCqVW4uekdV3l7w1tjg152Gf7ssKLeLxblbzHn34WvpsqDAcf+8qKv/hBkX1zM0m0jgiUEqWU5VTiCCoMguxVzRLBA0KJiII2TAg2LIFuOz9JmI4IFgnL+4tMsjOVs+RkWQnYNia3jHyEsQUjvvODt2hoWQjZpHSGsh5AiHgzOUuGwELIk3whhvSEFp6aF7Kt/2C0lI4SleJR6NRiX/RsUQZbQYUuDRgQLBOktoDonrR8WQZbFGXWwVgfp/Uqic8LMJowINq6doBdRLNEQssTMaEbrIKTfsMplwwdFkCVm2JdmjggWCOpKA4K7IwjJYfEfTyY1IMV/4Alu/gc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
new file mode 100644
index 00000000..317aec90
--- /dev/null
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -0,0 +1,10 @@
+### Job lifecycle
+
+There are several boltdb buckets:
+
+1. `PushBucket` - used for pushed jobs via RPC.
+2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and
+get into the `InQueueBucket` waiting to acknowledgement.
+3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
+
+``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
new file mode 100644
index 00000000..683b26f1
--- /dev/null
+++ b/plugins/boltdb/plugin.go
@@ -0,0 +1,82 @@
+package boltdb
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "boltdb"
+)
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+
+ drivers uint
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.stop = make(chan struct{})
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (p *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (p *Plugin) Stop() error {
+ if p.drivers > 0 {
+ for i := uint(0); i < p.drivers; i++ {
+ // send close signal to every driver
+ p.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+// Name returns plugin name
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Available interface implementation
+func (p *Plugin) Available() {}
+
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ p.drivers++
+
+ return st, nil
+}
+
+// JOBS bbolt implementation
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 889dc2fa..a2390df5 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
- "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
@@ -16,9 +15,6 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- redis string = "redis"
- memory string = "memory"
)
type Plugin struct {
@@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error {
}
func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ // TODO(rustatian) channel here?
go func() {
p.Lock()
defer p.Unlock()
@@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
err := p.publishers[j].Publish(m)
if err != nil {
p.log.Error("publishAsync", "error", err)
- // continue publish to other registered publishers
+ // continue publishing to the other registered publishers
continue
}
}
@@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
}()
}
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// choose a driver
@@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, key)
- switch val.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[uuid.NewString()] = ps
+ drName := val.(map[string]interface{})[driver]
- return ps, nil
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ // try local config first
+ if p.cfgPlugin.Has(configKey) {
+ ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
- return ps, nil
- // then try global if local does not exist
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ return ps, nil
+ } else {
+ // try global driver section
+ ps, err := p.constructors[drStr].PSConstruct(drStr)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
+
return ps, nil
}
}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index f0992cd6..8870bb0f 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
@@ -25,7 +25,7 @@ type Config struct {
Prefetch uint64 `mapstructure:"prefetch"`
}
-type JobConsumer struct {
+type consumer struct {
cfg *Config
log logger.Logger
eh events.Handler
@@ -43,10 +43,10 @@ type JobConsumer struct {
stopCh chan struct{}
}
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_ephemeral_pipeline")
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- jb := &JobConsumer{
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- _, ok := j.pipeline.Load().(*pipeline.Pipeline)
+ _, ok := c.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
@@ -105,42 +105,42 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: pipe.Name(),
- Active: atomic.LoadInt64(j.active),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Active: atomic.LoadInt64(c.active),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
}
-func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop the consumer
- j.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -149,24 +149,24 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// listener already active
if l == 1 {
- j.log.Warn("listener already in the active state")
+ c.log.Warn("listener already in the active state")
return
}
// resume the consumer on the same channel
- j.consume()
+ c.consume()
- atomic.StoreUint32(&j.listeners, 1)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -175,8 +175,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- j.eh.Push(events.JobEvent{
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -185,84 +185,79 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(ctx context.Context) error {
- const op = errors.Op("ephemeral_plugin_stop")
+func (c *consumer) Stop(_ context.Context) error {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- select {
- // return from the consumer
- case j.stopCh <- struct{}{}:
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
- return nil
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
- case <-ctx.Done():
- return errors.E(op, ctx.Err())
- }
+ return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
// goroutines here. We should limit goroutines here.
if msg.Options.Delay > 0 {
// if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
return errors.E(op, errors.Str("max concurrency number reached"))
}
go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddUint64(&c.goroutines, 1)
+ atomic.AddInt64(c.delayed, 1)
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localPrefetch <- jj
+ c.localPrefetch <- jj
- atomic.AddUint64(&j.goroutines, ^uint64(0))
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
}(msg)
return nil
}
// increase number of the active jobs
- atomic.AddInt64(j.active, 1)
+ atomic.AddInt64(c.active, 1)
// insert to the local, limited pipeline
select {
- case j.localPrefetch <- msg:
+ case c.localPrefetch <- msg:
return nil
case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
}
}
-func (j *JobConsumer) consume() {
+func (c *consumer) consume() {
go func() {
// redirect
for {
select {
- case item, ok := <-j.localPrefetch:
+ case item, ok := <-c.localPrefetch:
if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue was closed")
return
}
// set requeue channel
- item.Options.requeueFn = j.handleItem
- item.Options.active = j.active
- item.Options.delayed = j.delayed
+ item.Options.requeueFn = c.handleItem
+ item.Options.active = c.active
+ item.Options.delayed = c.delayed
- j.pq.Insert(item)
- case <-j.stopCh:
+ c.pq.Insert(item)
+ case <-c.stopCh:
return
}
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go
index 3298424d..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/ephemeral/item.go
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
index 28495abb..28495abb 100644
--- a/plugins/jobs/drivers/ephemeral/plugin.go
+++ b/plugins/ephemeral/plugin.go
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go
index 390f44b5..06c3254e 100644
--- a/plugins/jobs/job/general.go
+++ b/plugins/jobs/job/job.go
@@ -1,5 +1,9 @@
package job
+import (
+ "time"
+)
+
// constant keys to pack/unpack messages from different drivers
const (
RRID string = "rr_id"
@@ -27,3 +31,32 @@ type Job struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
deleted file mode 100644
index b7e4ed36..00000000
--- a/plugins/jobs/job/job_options.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package job
-
-import "time"
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-}
-
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go
index a47151a3..a47151a3 100644
--- a/plugins/jobs/job/job_options_test.go
+++ b/plugins/jobs/job/job_test.go
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5e62c5c5..3f3fa196 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -177,8 +177,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return true
})
+ // do not continue processing, immediately stop if channel contains an error
+ if len(errCh) > 0 {
+ return errCh
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
errCh <- err
return errCh
@@ -219,6 +224,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventJobError,
+ Error: err,
ID: jb.ID(),
Start: start,
Elapsed: time.Since(start),
@@ -243,6 +249,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -266,6 +273,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -279,6 +287,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
+ continue
}
// handle the response protocol
@@ -288,6 +298,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Event: events.EventJobError,
ID: jb.ID(),
Start: start,
+ Error: err,
Elapsed: time.Since(start),
})
p.putPayload(exec)
@@ -307,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
// return payload
p.putPayload(exec)
}
@@ -343,6 +355,10 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
return nil
}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
deleted file mode 100644
index c839130f..00000000
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package boltdb
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/kv"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "boltdb"
- RootPluginName string = "kv"
-)
-
-// Plugin BoltDB K/V storage.
-type Plugin struct {
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(RootPluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.stop = make(chan struct{})
- s.log = log
- s.cfgPlugin = cfg
- return nil
-}
-
-// Serve is noop here
-func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
-}
-
-func (s *Plugin) Stop() error {
- if s.drivers > 0 {
- for i := uint(0); i < s.drivers; i++ {
- // send close signal to every driver
- s.stop <- struct{}{}
- }
- }
- return nil
-}
-
-func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_provide")
- st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save driver number to release resources after Stop
- s.drivers++
-
- return st, nil
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (s *Plugin) Available() {}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 53fade97..c6ca96c3 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -16,11 +16,6 @@ const PluginName string = "kv"
const (
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- memcached string = "memcached"
- boltdb string = "boltdb"
- redis string = "redis"
- memory string = "memory"
)
// Plugin for the unified storage
@@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error { //nolint:gocognit
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
// value - storage
- /*
- For example we can have here 2 storages (but they are not pre-configured)
- for the boltdb and memcached
- We should provide here the actual configs for the all requested storages
- kv:
- boltdb-south:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- boltdb-north:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- memcached:
- driver: memcached
- addr: [ "127.0.0.1:11211" ]
-
-
- For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
- when user requests for example boltdb-south, we should provide that particular preconfigured storage
- */
+ // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ // when user requests for example boltdb-south, we should provide that particular preconfigured storage
+
for k, v := range p.cfg.Data {
// for example if the key not properly formatted (yaml)
if v == nil {
@@ -109,43 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
- switch v.(map[string]interface{})[driver] {
- case memcached:
- if _, ok := p.constructors[memcached]; !ok {
- p.log.Warn("no memcached constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memcached].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
-
- case boltdb:
- if _, ok := p.constructors[boltdb]; !ok {
- p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[boltdb].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
+ drName := v.(map[string]interface{})[driver]
- // save the storage
- p.storages[k] = storage
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
continue
}
- storage, err := p.constructors[memory].KVConstruct(configKey)
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -153,42 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- p.log.Warn("no redis constructors registered", "registered", p.constructors)
- continue
- }
-
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case p.cfgPlugin.Has(redis):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- continue
- default:
- // otherwise - error, no local or global config
- p.log.Warn("no global or local redis configuration provided", "key", configKey)
- continue
- }
-
- default:
- p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
+
+ continue
}
return errCh
@@ -220,5 +129,4 @@ func (p *Plugin) Name() string {
}
// Available interface implementation
-func (p *Plugin) Available() {
-}
+func (p *Plugin) Available() {}
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go
index 6d413790..6d413790 100644
--- a/plugins/kv/drivers/memcached/config.go
+++ b/plugins/memcached/config.go
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go
index e24747fe..e24747fe 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/memcached/driver.go
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go
index b2fe59af..191185ae 100644
--- a/plugins/resetter/plugin.go
+++ b/plugins/resetter/plugin.go
@@ -21,7 +21,7 @@ func (p *Plugin) Reset(name string) error {
const op = errors.Op("resetter_plugin_reset_by_name")
svc, ok := p.registry[name]
if !ok {
- return errors.E(op, errors.Errorf("no such service: %s", name))
+ return errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Reset()
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 16e3bd8c..5f5f2df9 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -216,7 +216,7 @@ func (server *Plugin) collectPoolEvents(event interface{}) {
case events.EventMaxMemory:
server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
case events.EventNoFreeWorkers:
- server.log.Warn("no free workers in pool", "error", we.Payload.(error).Error())
+ server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error())
case events.EventPoolError:
server.log.Error("pool error", "error", we.Payload.(error).Error())
case events.EventSupervisorError:
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go
index 9b2a1ca8..9b2a1ca8 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/sqs/config.go
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go
index 17af1caa..dfbda154 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -24,7 +24,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-type JobConsumer struct {
+type consumer struct {
sync.Mutex
pq priorityqueue.Queue
log logger.Logger
@@ -56,7 +56,7 @@ type JobConsumer struct {
pauseCh chan struct{}
}
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no such key - error
@@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
globalCfg.InitDefault()
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
return jb, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no global section
@@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
}
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != jb.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
@@ -243,17 +243,17 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
+ attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: c.queueURL,
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeNameApproximateNumberOfMessages,
types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
@@ -265,13 +265,13 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
out := &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Queue: *c.queueURL,
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -292,28 +292,28 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -323,11 +323,13 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.pauseCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -336,27 +338,27 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop consume
- j.pauseCh <- struct{}{}
+ c.pauseCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -364,28 +366,28 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ c.log.Warn("sqs listener already in the active state")
return
}
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
// increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -393,12 +395,12 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(c.queueURL)
if err != nil {
return err
}
- _, err = j.client.SendMessage(ctx, d)
+ _, err = c.client.SendMessage(ctx, d)
if err != nil {
return err
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go
index df72b2e5..969d8b5b 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/sqs/item.go
@@ -22,6 +22,7 @@ const (
)
var itemAttributes = []string{
+ job.RRID,
job.RRJob,
job.RRDelay,
job.RRPriority,
@@ -184,6 +185,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
QueueUrl: queue,
DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRID: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Ident)},
job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
@@ -192,7 +194,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
+func (c *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -228,6 +230,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
item := &Item{
Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Ident: *msg.MessageAttributes[job.RRID].StringValue,
Payload: *msg.Body,
Headers: h,
Options: &Options{
@@ -236,10 +239,10 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
// private
approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
+ client: c.client,
+ queue: c.queueURL,
receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
+ requeueFn: c.handleItem,
},
}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go
index 9efef90d..215dd6a5 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/sqs/listener.go
@@ -18,22 +18,22 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
+func (c *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
+ case <-c.pauseCh:
+ c.log.Warn("sqs listener stopped")
return
default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
+ message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: c.queueURL,
+ MaxNumberOfMessages: c.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
+ VisibilityTimeout: c.visibilityTimeout,
+ WaitTimeSeconds: c.waitTime,
})
if err != nil {
@@ -42,10 +42,10 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
// in case of NonExistentQueue - recreate the queue
if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags})
if err != nil {
- j.log.Error("create queue", "error", err)
+ c.log.Error("create queue", "error", err)
}
// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to the queues
@@ -60,27 +60,27 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
}
}
- j.log.Error("receive message", "error", err)
+ c.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := j.unpack(&m)
+ item, err := c.unpack(&m)
if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
+ _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: c.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ c.log.Error("message unpack, failed to delete the message from the queue", "error", err)
}
- j.log.Error("message unpack", "error", err)
+ c.log.Error("message unpack", "error", err)
continue
}
- j.pq.Insert(item)
+ c.pq.Insert(item)
}
}
}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go
index 54f61ff5..54f61ff5 100644
--- a/plugins/jobs/drivers/sqs/plugin.go
+++ b/plugins/sqs/plugin.go
diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go
index 82a0fa6c..b76ad0a3 100644
--- a/plugins/status/plugin.go
+++ b/plugins/status/plugin.go
@@ -85,7 +85,7 @@ func (c *Plugin) status(name string) (Status, error) {
const op = errors.Op("checker_plugin_status")
svc, ok := c.statusRegistry[name]
if !ok {
- return Status{}, errors.E(op, errors.Errorf("no such service: %s", name))
+ return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Status(), nil
@@ -96,7 +96,7 @@ func (c *Plugin) ready(name string) (Status, error) {
const op = errors.Op("checker_plugin_ready")
svc, ok := c.readyRegistry[name]
if !ok {
- return Status{}, errors.E(op, errors.Errorf("no such service: %s", name))
+ return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Ready(), nil