diff options
author | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
commit | c7d9385f135853539100430521042f7e7e2ae005 (patch) | |
tree | 588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins | |
parent | f6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff) |
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 4 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 200 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 157 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 144 | ||||
-rw-r--r-- | plugins/boltdb/doc/boltjobs.drawio | 1 | ||||
-rw-r--r-- | plugins/boltdb/doc/job_lifecycle.md | 10 | ||||
-rw-r--r-- | plugins/ephemeral/consumer.go | 119 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 12 | ||||
-rw-r--r-- | plugins/sqs/consumer.go | 84 | ||||
-rw-r--r-- | plugins/sqs/item.go | 8 | ||||
-rw-r--r-- | plugins/sqs/listener.go | 36 |
11 files changed, 607 insertions, 168 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index f1b4d54f..578f36ce 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -420,7 +420,9 @@ func (c *consumer) Resume(_ context.Context, p string) { } func (c *consumer) Stop(context.Context) error { - c.stopCh <- struct{}{} + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } pipe := c.pipeline.Load().(*pipeline.Pipeline) c.eh.Push(events.JobEvent{ diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 67a6d3e7..2492ab60 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -5,10 +5,10 @@ import ( "context" "encoding/gob" "os" + "sync" "sync/atomic" "time" - "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -22,11 +22,12 @@ import ( ) const ( - PluginName = "boltdb" + PluginName string = "boltdb" + rrDB string = "rr.db" - PushBucket = "push" - InQueueBucket = "processing" - DoneBucket = "done" + PushBucket string = "push" + InQueueBucket string = "processing" + DelayBucket string = "delayed" ) type consumer struct { @@ -37,11 +38,16 @@ type consumer struct { db *bolt.DB - log logger.Logger - eh events.Handler - pq priorityqueue.Queue + bPool sync.Pool + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipeline atomic.Value + cond *sync.Cond + listeners uint32 - pipeline atomic.Value + active *uint64 + delayed *uint64 stopCh chan struct{} } @@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) if err != nil { return 
errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) if err != nil { return errors.E(op, upOp) } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } return nil }) + if err != nil { return nil, errors.E(op, err) } @@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e priority: localCfg.Priority, prefetch: localCfg.Prefetch, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + db: db, log: log, eh: e, pq: pq, - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), }, nil } @@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // add default values conf.InitDefaults() - db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{ + db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ Timeout: time.Second * 20, NoGrowSync: false, NoFreelistSync: false, @@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + _, err = 
tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) if err != nil { return errors.E(op, upOp) } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + return nil }) @@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con } return &consumer{ - file: pipeline.String(file, "rr.db"), + file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), prefetch: pipeline.Int(prefetch, 100), permissions: conf.Permissions, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + db: db, log: log, eh: e, pq: pq, - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), }, nil } -func (c *consumer) Push(ctx context.Context, job *job.Job) error { +func (c *consumer) Push(_ context.Context, job *job.Job) error { const op = errors.Op("boltdb_jobs_push") err := c.db.Update(func(tx *bolt.Tx) error { + item := fromJob(job) + + // handle delay + if item.Options.Delay > 0 { + b := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) + + // pool with buffers + buf := c.get() + defer c.put(buf) + + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + return 
errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + + atomic.AddUint64(c.delayed, 1) + + return b.Put(utils.AsBytes(tKey), value) + } + b := tx.Bucket(utils.AsBytes(PushBucket)) - buf := new(bytes.Buffer) + + // pool with buffers + buf := c.get() + defer c.put(buf) + enc := gob.NewEncoder(buf) - err := enc.Encode(job) + err := enc.Encode(item) if err != nil { - return err + return errors.E(op, err) } - return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes()) + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + + // increment active counter + atomic.AddUint64(c.active, 1) + + return b.Put(utils.AsBytes(item.ID()), value) }) if err != nil { @@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } -func (c *consumer) Stop(ctx context.Context) error { +func (c *consumer) Stop(_ context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + } + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) return nil } -func (c *consumer) Pause(ctx context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) { } c.stopCh <- struct{}{} + c.stopCh <- struct{}{} 
atomic.AddUint32(&c.listeners, ^uint32(0)) @@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) { }) } -func (c *consumer) Resume(ctx context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) { // run listener go c.listener() + go c.delayedJobsListener() // increase number of listeners atomic.AddUint32(&c.listeners, 1) @@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) { }) } -func (c *consumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: PushBucket, + Active: int64(atomic.LoadUint64(c.active)), + Delayed: int64(atomic.LoadUint64(c.delayed)), + Ready: toBool(atomic.LoadUint32(&c.listeners)), + }, nil +} + +// Private + +func (c *consumer) get() *bytes.Buffer { + return c.bPool.Get().(*bytes.Buffer) +} + +func (c *consumer) put(b *bytes.Buffer) { + b.Reset() + c.bPool.Put(b) +} + +func toBool(r uint32) bool { + return r > 0 } diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go index 8a4aefa3..4f02bb43 100644 --- a/plugins/boltdb/boltjobs/item.go +++ b/plugins/boltdb/boltjobs/item.go @@ -1,8 +1,16 @@ package boltjobs import ( + "bytes" + "encoding/gob" + "sync/atomic" + "time" + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" + "go.etcd.io/bbolt" ) type Item struct { @@ -33,6 +41,12 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. 
Delay int64 `json:"delay,omitempty"` + + // private + db *bbolt.DB + + active *uint64 + delayed *uint64 } func (i *Item) ID() string { @@ -65,13 +79,150 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - panic("implement me") + const op = errors.Op("boltdb_item_ack") + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + if i.Options.Delay > 0 { + atomic.AddUint64(i.Options.delayed, ^uint64(0)) + } else { + atomic.AddUint64(i.Options.active, ^uint64(0)) + } + + return tx.Commit() } func (i *Item) Nack() error { - panic("implement me") + const op = errors.Op("boltdb_item_ack") + /* + steps: + 1. begin tx + 2. get item by ID from the InQueueBucket (previously put in the listener) + 3. put it back to the PushBucket + 4. Delete it from the InQueueBucket + */ + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + v := inQb.Get(utils.AsBytes(i.ID())) + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + err = pushB.Put(utils.AsBytes(i.ID()), v) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + return tx.Commit() } func (i *Item) Requeue(headers map[string][]string, delay int64) error { - panic("implement me") + const op = errors.Op("boltdb_item_requeue") + i.Headers = headers + i.Options.Delay = delay + + tx, err := i.Options.db.Begin(true) + if err != nil { + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + if delay > 0 { + delayB := 
tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) + + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = delayB.Put(utils.AsBytes(tKey), buf.Bytes()) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() + } + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes()) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() +} + +func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { + i.Options.db = db + i.Options.active = active + i.Options.delayed = delayed +} + +func (i *Item) rollback(err error, tx *bbolt.Tx) error { + errR := tx.Rollback() + if errR != nil { + return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) + } + return errors.Errorf("transaction commit error: %v", err) +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 2ee06088..d184303a 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -1,34 +1,160 @@ package boltjobs import ( - "fmt" + "bytes" + "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" + bolt 
"go.etcd.io/bbolt" ) func (c *consumer) listener() { - tt := time.NewTicker(time.Second) + tt := time.NewTicker(time.Millisecond * 10) + defer tt.Stop() for { select { case <-c.stopCh: c.log.Warn("boltdb listener stopped") return case <-tt.C: - tx, err := c.db.Begin(false) + if atomic.LoadUint64(c.active) >= uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } + + tx, err := c.db.Begin(true) if err != nil { - panic(err) + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue } b := tx.Bucket(utils.AsBytes(PushBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + // get first item + k, v := b.Cursor().First() + if k == nil && v == nil { + _ = tx.Commit() + continue + } + + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } - cursor := b.Cursor() + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } - k, v := cursor.First() - _ = k - _ = v + // delete key from the PushBucket + err = b.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } - fmt.Println("foo") + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) } } } + +func (c *consumer) delayedJobsListener() { + tt := time.NewTicker(time.Millisecond * 100) + defer tt.Stop() + for { + select { + case <-c.stopCh: + c.log.Warn("boltdb listener stopped") + return + case <-tt.C: + tx, err := c.db.Begin(true) + if err != nil { + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue + } + + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + // get first item + k, v := delayB.Cursor().First() + if k == 
nil && v == nil { + _ = tx.Commit() + continue + } + + t, err := time.Parse(time.RFC3339, utils.AsString(k)) + if err != nil { + c.rollback(err, tx) + continue + } + + if t.After(time.Now()) { + _ = tx.Commit() + continue + } + + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = delayB.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) + } + } +} + +func (c *consumer) rollback(err error, tx *bolt.Tx) { + errR := tx.Rollback() + if errR != nil { + c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR) + } + + c.log.Error("transaction commit error, rollback succeed", "error", err) +} diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio new file mode 100644 index 00000000..feeccae0 --- /dev/null +++ b/plugins/boltdb/doc/boltjobs.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-08-30T08:11:04.405Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="mdoOmMZM5sMC4nWvZBy0" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">ddHBDsIgDADQr+GOEPcDc+rF0w6eyahAwtaFsWz69W4ZiGR6orwWSgrhZTtfnOj1DSVYwqicCT8Rxg7HoliWVZ5BKAuinJHBEtTmBbEw6GgkDFmhR7Te9Dk22HXQ+MyEczjlZQ+0eddeKNhB3Qi717uRXgctKE2JKxilY2sWM62I1QEGLSROX8QrwkuH6LeonUuw6/jiYLZz5z/Zz8scdP7HgSVIdy+b7I949QY=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md new file mode 100644 index 00000000..317aec90 --- /dev/null +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -0,0 +1,10 @@ +### Job lifecycle + +There are several boltdb buckets: + +1. `PushBucket` - used for jobs pushed via RPC. +2. `InQueueBucket` - when a job is consumed from the `PushBucket`, it is, in the same transaction, copied into the priority queue and +placed into the `InQueueBucket`, where it waits for acknowledgement. +3. `DelayBucket` - used for delayed jobs. RFC3339 is used as the timestamp format to track delay expiration. + +`` diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go index 91b8eda9..8870bb0f 100644 --- a/plugins/ephemeral/consumer.go +++ b/plugins/ephemeral/consumer.go @@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - _, ok := j.pipeline.Load().(*pipeline.Pipeline) + _, ok := c.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } @@ -105,42 +105,42 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: pipe.Name(), - Active: atomic.LoadInt64(j.active), - Delayed: atomic.LoadInt64(j.delayed), - Ready: 
ready(atomic.LoadUint32(&j.listeners)), + Active: atomic.LoadInt64(c.active), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil } -func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) return nil } -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop the consumer - j.stopCh <- struct{}{} + c.stopCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -149,24 +149,24 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // listener already active if l == 1 { - j.log.Warn("listener already in the active state") + c.log.Warn("listener already in the active state") return } // resume the 
consumer on the same channel - j.consume() + c.consume() - atomic.StoreUint32(&j.listeners, 1) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Start: time.Now(), @@ -175,8 +175,8 @@ func (j *consumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - j.eh.Push(events.JobEvent{ +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -185,84 +185,79 @@ func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *consumer) Stop(ctx context.Context) error { - const op = errors.Op("ephemeral_plugin_stop") +func (c *consumer) Stop(_ context.Context) error { + pipe := c.pipeline.Load().(*pipeline.Pipeline) - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - select { - // return from the consumer - case j.stopCh <- struct{}{}: - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } - return nil + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) - case <-ctx.Done(): - return errors.E(op, ctx.Err()) - } + return nil } -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) // goroutines here. We should limit goroutines here. 
if msg.Options.Delay > 0 { // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { + if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { return errors.E(op, errors.Str("max concurrency number reached")) } go func(jj *Item) { - atomic.AddUint64(&j.goroutines, 1) - atomic.AddInt64(j.delayed, 1) + atomic.AddUint64(&c.goroutines, 1) + atomic.AddInt64(c.delayed, 1) time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired - j.localPrefetch <- jj + c.localPrefetch <- jj - atomic.AddUint64(&j.goroutines, ^uint64(0)) + atomic.AddUint64(&c.goroutines, ^uint64(0)) }(msg) return nil } // increase number of the active jobs - atomic.AddInt64(j.active, 1) + atomic.AddInt64(c.active, 1) // insert to the local, limited pipeline select { - case j.localPrefetch <- msg: + case c.localPrefetch <- msg: return nil case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) } } -func (j *consumer) consume() { +func (c *consumer) consume() { go func() { // redirect for { select { - case item, ok := <-j.localPrefetch: + case item, ok := <-c.localPrefetch: if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue was closed") return } // set requeue channel - item.Options.requeueFn = j.handleItem - item.Options.active = j.active - item.Options.delayed = j.delayed + item.Options.requeueFn = c.handleItem + item.Options.active = c.active + item.Options.delayed = c.delayed - j.pq.Insert(item) - case <-j.stopCh: + c.pq.Insert(item) + case <-c.stopCh: return } } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index a0b477f9..236aded3 100644 --- 
a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -177,6 +177,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return true }) + // do not continue processing, immediately stop if channel contains an error + if len(errCh) > 0 { + return errCh + } + var err error p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) if err != nil { @@ -279,6 +284,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Start: start, Elapsed: time.Since(start), }) + + continue } // handle the response protocol @@ -330,6 +337,10 @@ func (p *Plugin) Stop() error { cancel() } + p.Lock() + p.workersPool.Destroy(context.Background()) + p.Unlock() + // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, // but if not, this is not a problem at all. // The main target is to stop the drivers @@ -342,7 +353,6 @@ func (p *Plugin) Stop() error { // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) - return nil } diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index 23203190..dfbda154 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != jb.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } @@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), 
provided: %d", jb.Options.Delay)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } return nil } -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, + attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: c.queueURL, AttributeNames: []types.QueueAttributeName{ types.QueueAttributeNameApproximateNumberOfMessages, types.QueueAttributeNameApproximateNumberOfMessagesDelayed, @@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) out := &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), + Queue: *c.queueURL, + Ready: ready(atomic.LoadUint32(&c.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline 
registered: %s", pipe.Name())) } - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *consumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.pauseCh <- struct{}{} + } - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error { return nil } -func (j *consumer) Pause(_ context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop consume - j.pauseCh <- struct{}{} + c.pauseCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ 
context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + c.log.Warn("sqs listener already in the active state") return } // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) // increase num of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) { }) } -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(c.queueURL) if err != nil { return err } - _, err = j.client.SendMessage(ctx, d) + _, err = c.client.SendMessage(ctx, d) if err != nil { return err } diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go index 996adf6c..4e33e99e 100644 --- a/plugins/sqs/item.go +++ b/plugins/sqs/item.go @@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *consumer) unpack(msg *types.Message) (*Item, error) { +func (c *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -236,10 +236,10 @@ func (j *consumer) unpack(msg *types.Message) (*Item, error) { // private approxReceiveCount: int64(recCount), - client: j.client, - 
queue: j.queueURL, + client: c.client, + queue: c.queueURL, receiptHandler: msg.ReceiptHandle, - requeueFn: j.handleItem, + requeueFn: c.handleItem, }, } diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go index a4280af2..215dd6a5 100644 --- a/plugins/sqs/listener.go +++ b/plugins/sqs/listener.go @@ -18,22 +18,22 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *consumer) listen(ctx context.Context) { //nolint:gocognit +func (c *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { - case <-j.pauseCh: - j.log.Warn("sqs listener stopped") + case <-c.pauseCh: + c.log.Warn("sqs listener stopped") return default: - message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ - QueueUrl: j.queueURL, - MaxNumberOfMessages: j.prefetch, + message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: c.queueURL, + MaxNumberOfMessages: c.prefetch, AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, MessageAttributeNames: []string{All}, // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. 
- VisibilityTimeout: j.visibilityTimeout, - WaitTimeSeconds: j.waitTime, + VisibilityTimeout: c.visibilityTimeout, + WaitTimeSeconds: c.waitTime, }) if err != nil { @@ -42,10 +42,10 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { // in case of NonExistentQueue - recreate the queue if apiErr.Code == NonExistentQueue { - j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) - _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags}) if err != nil { - j.log.Error("create queue", "error", err) + c.log.Error("create queue", "error", err) } // To successfully create a new queue, you must provide a // queue name that adheres to the limits related to the queues @@ -60,27 +60,27 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit } } - j.log.Error("receive message", "error", err) + c.log.Error("receive message", "error", err) continue } for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := j.unpack(&m) + item, err := c.unpack(&m) if err != nil { - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, + _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: c.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) + c.log.Error("message unpack, failed to delete the message from the queue", "error", err) } - j.log.Error("message unpack", 
"error", err) + c.log.Error("message unpack", "error", err) continue } - j.pq.Insert(item) + c.pq.Insert(item) } } } |