summaryrefslogtreecommitdiff
path: root/plugins/boltdb
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
committerValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
commitc7d9385f135853539100430521042f7e7e2ae005 (patch)
tree588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins/boltdb
parentf6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff)
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/boltdb')
-rw-r--r--plugins/boltdb/boltjobs/consumer.go200
-rw-r--r--plugins/boltdb/boltjobs/item.go157
-rw-r--r--plugins/boltdb/boltjobs/listener.go144
-rw-r--r--plugins/boltdb/doc/boltjobs.drawio1
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md10
5 files changed, 471 insertions, 41 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 67a6d3e7..2492ab60 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -5,10 +5,10 @@ import (
"context"
"encoding/gob"
"os"
+ "sync"
"sync/atomic"
"time"
- "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -22,11 +22,12 @@ import (
)
const (
- PluginName = "boltdb"
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
- PushBucket = "push"
- InQueueBucket = "processing"
- DoneBucket = "done"
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
)
type consumer struct {
@@ -37,11 +38,16 @@ type consumer struct {
db *bolt.DB
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
listeners uint32
- pipeline atomic.Value
+ active *uint64
+ delayed *uint64
stopCh chan struct{}
}
@@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
return nil
})
+
if err != nil {
return nil, errors.E(op, err)
}
@@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
priority: localCfg.Priority,
prefetch: localCfg.Prefetch,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
@@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// add default values
conf.InitDefaults()
- db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
@@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
return nil
})
@@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
}
return &consumer{
- file: pipeline.String(file, "rr.db"),
+ file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
prefetch: pipeline.Int(prefetch, 100),
permissions: conf.Permissions,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
-func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return b.Put(utils.AsBytes(tKey), value)
+ }
+
b := tx.Bucket(utils.AsBytes(PushBucket))
- buf := new(bytes.Buffer)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
enc := gob.NewEncoder(buf)
- err := enc.Encode(job)
+ err := enc.Encode(item)
if err != nil {
- return err
+ return errors.E(op, err)
}
- return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return b.Put(utils.AsBytes(item.ID()), value)
})
if err != nil {
@@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
-func (c *consumer) Stop(ctx context.Context) error {
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
-func (c *consumer) Pause(ctx context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
}
c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
atomic.AddUint32(&c.listeners, ^uint32(0))
@@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
})
}
-func (c *consumer) Resume(ctx context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) {
// run listener
go c.listener()
+ go c.delayedJobsListener()
// increase number of listeners
atomic.AddUint32(&c.listeners, 1)
@@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) {
})
}
-func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
index 8a4aefa3..4f02bb43 100644
--- a/plugins/boltdb/boltjobs/item.go
+++ b/plugins/boltdb/boltjobs/item.go
@@ -1,8 +1,16 @@
package boltjobs
import (
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
+ "time"
+
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
+ "go.etcd.io/bbolt"
)
type Item struct {
@@ -33,6 +41,12 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
+
+ // private
+ db *bbolt.DB
+
+ active *uint64
+ delayed *uint64
}
func (i *Item) ID() string {
@@ -65,13 +79,150 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_ack")
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ if i.Options.Delay > 0 {
+ atomic.AddUint64(i.Options.delayed, ^uint64(0))
+ } else {
+ atomic.AddUint64(i.Options.active, ^uint64(0))
+ }
+
+ return tx.Commit()
}
func (i *Item) Nack() error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_ack")
+ /*
+ steps:
+ 1. begin tx
+ 2. get item by ID from the InQueueBucket (previously put in the listener)
+ 3. put it back to the PushBucket
+ 4. Delete it from the InQueueBucket
+ */
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ v := inQb.Get(utils.AsBytes(i.ID()))
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ err = pushB.Put(utils.AsBytes(i.ID()), v)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ return tx.Commit()
}
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_requeue")
+ i.Headers = headers
+ i.Options.Delay = delay
+
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ if delay > 0 {
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = delayB.Put(utils.AsBytes(tKey), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+ }
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
+ i.Options.db = db
+ i.Options.active = active
+ i.Options.delayed = delayed
+}
+
+func (i *Item) rollback(err error, tx *bbolt.Tx) error {
+ errR := tx.Rollback()
+ if errR != nil {
+ return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
+ }
+ return errors.Errorf("transaction commit error: %v", err)
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 2ee06088..d184303a 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -1,34 +1,160 @@
package boltjobs
import (
- "fmt"
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
)
func (c *consumer) listener() {
- tt := time.NewTicker(time.Second)
+ tt := time.NewTicker(time.Millisecond * 10)
+ defer tt.Stop()
for {
select {
case <-c.stopCh:
c.log.Warn("boltdb listener stopped")
return
case <-tt.C:
- tx, err := c.db.Begin(false)
+ if atomic.LoadUint64(c.active) >= uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
+
+ tx, err := c.db.Begin(true)
if err != nil {
- panic(err)
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
}
b := tx.Bucket(utils.AsBytes(PushBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := b.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- cursor := b.Cursor()
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- k, v := cursor.First()
- _ = k
- _ = v
+ // delete key from the PushBucket
+ err = b.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- fmt.Println("foo")
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
}
}
}
+
+func (c *consumer) delayedJobsListener() {
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Warn("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := delayB.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ t, err := time.Parse(time.RFC3339, utils.AsString(k))
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ if t.After(time.Now()) {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+ }
+}
+
+func (c *consumer) rollback(err error, tx *bolt.Tx) {
+ errR := tx.Rollback()
+ if errR != nil {
+ c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
+ }
+
+ c.log.Error("transaction commit error, rollback succeed", "error", err)
+}
diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio
new file mode 100644
index 00000000..feeccae0
--- /dev/null
+++ b/plugins/boltdb/doc/boltjobs.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-08-30T08:11:04.405Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="mdoOmMZM5sMC4nWvZBy0" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">ddHBDsIgDADQr+GOEPcDc+rF0w6eyahAwtaFsWz69W4ZiGR6orwWSgrhZTtfnOj1DSVYwqicCT8Rxg7HoliWVZ5BKAuinJHBEtTmBbEw6GgkDFmhR7Te9Dk22HXQ+MyEczjlZQ+0eddeKNhB3Qi717uRXgctKE2JKxilY2sWM62I1QEGLSROX8QrwkuH6LeonUuw6/jiYLZz5z/Zz8scdP7HgSVIdy+b7I949QY=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
new file mode 100644
index 00000000..317aec90
--- /dev/null
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -0,0 +1,10 @@
+### Job lifecycle
+
+There are several boltdb buckets:
+
+1. `PushBucket` - used for pushed jobs via RPC.
+2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and
+get into the `InQueueBucket` waiting to acknowledgement.
+3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
+
+``