summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
committerValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
commitc7d9385f135853539100430521042f7e7e2ae005 (patch)
tree588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins
parentf6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff)
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go4
-rw-r--r--plugins/boltdb/boltjobs/consumer.go200
-rw-r--r--plugins/boltdb/boltjobs/item.go157
-rw-r--r--plugins/boltdb/boltjobs/listener.go144
-rw-r--r--plugins/boltdb/doc/boltjobs.drawio1
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md10
-rw-r--r--plugins/ephemeral/consumer.go119
-rw-r--r--plugins/jobs/plugin.go12
-rw-r--r--plugins/sqs/consumer.go84
-rw-r--r--plugins/sqs/item.go8
-rw-r--r--plugins/sqs/listener.go36
11 files changed, 607 insertions, 168 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index f1b4d54f..578f36ce 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -420,7 +420,9 @@ func (c *consumer) Resume(_ context.Context, p string) {
}
func (c *consumer) Stop(context.Context) error {
- c.stopCh <- struct{}{}
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
pipe := c.pipeline.Load().(*pipeline.Pipeline)
c.eh.Push(events.JobEvent{
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 67a6d3e7..2492ab60 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -5,10 +5,10 @@ import (
"context"
"encoding/gob"
"os"
+ "sync"
"sync/atomic"
"time"
- "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -22,11 +22,12 @@ import (
)
const (
- PluginName = "boltdb"
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
- PushBucket = "push"
- InQueueBucket = "processing"
- DoneBucket = "done"
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
)
type consumer struct {
@@ -37,11 +38,16 @@ type consumer struct {
db *bolt.DB
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
listeners uint32
- pipeline atomic.Value
+ active *uint64
+ delayed *uint64
stopCh chan struct{}
}
@@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
return nil
})
+
if err != nil {
return nil, errors.E(op, err)
}
@@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
priority: localCfg.Priority,
prefetch: localCfg.Prefetch,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
@@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// add default values
conf.InitDefaults()
- db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
@@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
return nil
})
@@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
}
return &consumer{
- file: pipeline.String(file, "rr.db"),
+ file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
prefetch: pipeline.Int(prefetch, 100),
permissions: conf.Permissions,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
-func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return b.Put(utils.AsBytes(tKey), value)
+ }
+
b := tx.Bucket(utils.AsBytes(PushBucket))
- buf := new(bytes.Buffer)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
enc := gob.NewEncoder(buf)
- err := enc.Encode(job)
+ err := enc.Encode(item)
if err != nil {
- return err
+ return errors.E(op, err)
}
- return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return b.Put(utils.AsBytes(item.ID()), value)
})
if err != nil {
@@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
-func (c *consumer) Stop(ctx context.Context) error {
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
-func (c *consumer) Pause(ctx context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
}
c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
atomic.AddUint32(&c.listeners, ^uint32(0))
@@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
})
}
-func (c *consumer) Resume(ctx context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) {
// run listener
go c.listener()
+ go c.delayedJobsListener()
// increase number of listeners
atomic.AddUint32(&c.listeners, 1)
@@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) {
})
}
-func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
index 8a4aefa3..4f02bb43 100644
--- a/plugins/boltdb/boltjobs/item.go
+++ b/plugins/boltdb/boltjobs/item.go
@@ -1,8 +1,16 @@
package boltjobs
import (
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
+ "time"
+
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
+ "go.etcd.io/bbolt"
)
type Item struct {
@@ -33,6 +41,12 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
+
+ // private
+ db *bbolt.DB
+
+ active *uint64
+ delayed *uint64
}
func (i *Item) ID() string {
@@ -65,13 +79,150 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_ack")
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ if i.Options.Delay > 0 {
+ atomic.AddUint64(i.Options.delayed, ^uint64(0))
+ } else {
+ atomic.AddUint64(i.Options.active, ^uint64(0))
+ }
+
+ return tx.Commit()
}
func (i *Item) Nack() error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_ack")
+ /*
+ steps:
+ 1. begin tx
+ 2. get item by ID from the InQueueBucket (previously put in the listener)
+ 3. put it back to the PushBucket
+ 4. Delete it from the InQueueBucket
+ */
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ v := inQb.Get(utils.AsBytes(i.ID()))
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ err = pushB.Put(utils.AsBytes(i.ID()), v)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ return tx.Commit()
}
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_requeue")
+ i.Headers = headers
+ i.Options.Delay = delay
+
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ if delay > 0 {
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = delayB.Put(utils.AsBytes(tKey), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+ }
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
+ i.Options.db = db
+ i.Options.active = active
+ i.Options.delayed = delayed
+}
+
+func (i *Item) rollback(err error, tx *bbolt.Tx) error {
+ errR := tx.Rollback()
+ if errR != nil {
+ return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
+ }
+ return errors.Errorf("transaction commit error: %v", err)
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 2ee06088..d184303a 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -1,34 +1,160 @@
package boltjobs
import (
- "fmt"
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
)
func (c *consumer) listener() {
- tt := time.NewTicker(time.Second)
+ tt := time.NewTicker(time.Millisecond * 10)
+ defer tt.Stop()
for {
select {
case <-c.stopCh:
c.log.Warn("boltdb listener stopped")
return
case <-tt.C:
- tx, err := c.db.Begin(false)
+ if atomic.LoadUint64(c.active) >= uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
+
+ tx, err := c.db.Begin(true)
if err != nil {
- panic(err)
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
}
b := tx.Bucket(utils.AsBytes(PushBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := b.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- cursor := b.Cursor()
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- k, v := cursor.First()
- _ = k
- _ = v
+ // delete key from the PushBucket
+ err = b.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- fmt.Println("foo")
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
}
}
}
+
+func (c *consumer) delayedJobsListener() {
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Warn("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := delayB.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ t, err := time.Parse(time.RFC3339, utils.AsString(k))
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ if t.After(time.Now()) {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+ }
+}
+
+func (c *consumer) rollback(err error, tx *bolt.Tx) {
+ errR := tx.Rollback()
+ if errR != nil {
+ c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
+ }
+
+ c.log.Error("transaction commit error, rollback succeed", "error", err)
+}
diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio
new file mode 100644
index 00000000..feeccae0
--- /dev/null
+++ b/plugins/boltdb/doc/boltjobs.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-08-30T08:11:04.405Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="mdoOmMZM5sMC4nWvZBy0" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">ddHBDsIgDADQr+GOEPcDc+rF0w6eyahAwtaFsWz69W4ZiGR6orwWSgrhZTtfnOj1DSVYwqicCT8Rxg7HoliWVZ5BKAuinJHBEtTmBbEw6GgkDFmhR7Te9Dk22HXQ+MyEczjlZQ+0eddeKNhB3Qi717uRXgctKE2JKxilY2sWM62I1QEGLSROX8QrwkuH6LeonUuw6/jiYLZz5z/Zz8scdP7HgSVIdy+b7I949QY=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
new file mode 100644
index 00000000..317aec90
--- /dev/null
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -0,0 +1,10 @@
+### Job lifecycle
+
+There are several boltdb buckets:
+
+1. `PushBucket` - used for pushed jobs via RPC.
+2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and
+get into the `InQueueBucket` waiting to acknowledgement.
+3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
+
+``
diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index 91b8eda9..8870bb0f 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
@@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- _, ok := j.pipeline.Load().(*pipeline.Pipeline)
+ _, ok := c.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
@@ -105,42 +105,42 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *consumer) State(_ context.Context) (*jobState.State, error) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: pipe.Name(),
- Active: atomic.LoadInt64(j.active),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Active: atomic.LoadInt64(c.active),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
}
-func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop the consumer
- j.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -149,24 +149,24 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// listener already active
if l == 1 {
- j.log.Warn("listener already in the active state")
+ c.log.Warn("listener already in the active state")
return
}
// resume the consumer on the same channel
- j.consume()
+ c.consume()
- atomic.StoreUint32(&j.listeners, 1)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -175,8 +175,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- j.eh.Push(events.JobEvent{
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -185,84 +185,79 @@ func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(ctx context.Context) error {
- const op = errors.Op("ephemeral_plugin_stop")
+func (c *consumer) Stop(_ context.Context) error {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- select {
- // return from the consumer
- case j.stopCh <- struct{}{}:
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
- return nil
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
- case <-ctx.Done():
- return errors.E(op, ctx.Err())
- }
+ return nil
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
// goroutines here. We should limit goroutines here.
if msg.Options.Delay > 0 {
// if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
return errors.E(op, errors.Str("max concurrency number reached"))
}
go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddUint64(&c.goroutines, 1)
+ atomic.AddInt64(c.delayed, 1)
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localPrefetch <- jj
+ c.localPrefetch <- jj
- atomic.AddUint64(&j.goroutines, ^uint64(0))
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
}(msg)
return nil
}
// increase number of the active jobs
- atomic.AddInt64(j.active, 1)
+ atomic.AddInt64(c.active, 1)
// insert to the local, limited pipeline
select {
- case j.localPrefetch <- msg:
+ case c.localPrefetch <- msg:
return nil
case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
}
}
-func (j *consumer) consume() {
+func (c *consumer) consume() {
go func() {
// redirect
for {
select {
- case item, ok := <-j.localPrefetch:
+ case item, ok := <-c.localPrefetch:
if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue was closed")
return
}
// set requeue channel
- item.Options.requeueFn = j.handleItem
- item.Options.active = j.active
- item.Options.delayed = j.delayed
+ item.Options.requeueFn = c.handleItem
+ item.Options.active = c.active
+ item.Options.delayed = c.delayed
- j.pq.Insert(item)
- case <-j.stopCh:
+ c.pq.Insert(item)
+ case <-c.stopCh:
return
}
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index a0b477f9..236aded3 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -177,6 +177,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return true
})
+ // do not continue processing, immediately stop if channel contains an error
+ if len(errCh) > 0 {
+ return errCh
+ }
+
var err error
p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
@@ -279,6 +284,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
+ continue
}
// handle the response protocol
@@ -330,6 +337,10 @@ func (p *Plugin) Stop() error {
cancel()
}
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
// The main target is to stop the drivers
@@ -342,7 +353,6 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
-
return nil
}
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..dfbda154 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != jb.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
@@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
+ attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: c.queueURL,
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeNameApproximateNumberOfMessages,
types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
@@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
out := &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Queue: *c.queueURL,
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.pauseCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error {
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop consume
- j.pauseCh <- struct{}{}
+ c.pauseCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ c.log.Warn("sqs listener already in the active state")
return
}
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
// increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(c.queueURL)
if err != nil {
return err
}
- _, err = j.client.SendMessage(ctx, d)
+ _, err = c.client.SendMessage(ctx, d)
if err != nil {
return err
}
diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go
index 996adf6c..4e33e99e 100644
--- a/plugins/sqs/item.go
+++ b/plugins/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *consumer) unpack(msg *types.Message) (*Item, error) {
+func (c *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -236,10 +236,10 @@ func (j *consumer) unpack(msg *types.Message) (*Item, error) {
// private
approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
+ client: c.client,
+ queue: c.queueURL,
receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
+ requeueFn: c.handleItem,
},
}
diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go
index a4280af2..215dd6a5 100644
--- a/plugins/sqs/listener.go
+++ b/plugins/sqs/listener.go
@@ -18,22 +18,22 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
+func (c *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
+ case <-c.pauseCh:
+ c.log.Warn("sqs listener stopped")
return
default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
+ message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: c.queueURL,
+ MaxNumberOfMessages: c.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
+ VisibilityTimeout: c.visibilityTimeout,
+ WaitTimeSeconds: c.waitTime,
})
if err != nil {
@@ -42,10 +42,10 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
// in case of NonExistentQueue - recreate the queue
if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags})
if err != nil {
- j.log.Error("create queue", "error", err)
+ c.log.Error("create queue", "error", err)
}
// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to the queues
@@ -60,27 +60,27 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
}
}
- j.log.Error("receive message", "error", err)
+ c.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := j.unpack(&m)
+ item, err := c.unpack(&m)
if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
+ _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: c.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ c.log.Error("message unpack, failed to delete the message from the queue", "error", err)
}
- j.log.Error("message unpack", "error", err)
+ c.log.Error("message unpack", "error", err)
continue
}
- j.pq.Insert(item)
+ c.pq.Insert(item)
}
}
}