summaryrefslogtreecommitdiff
path: root/plugins/boltdb
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb')
-rw-r--r--plugins/boltdb/boltjobs/config.go39
-rw-r--r--plugins/boltdb/boltjobs/consumer.go422
-rw-r--r--plugins/boltdb/boltjobs/item.go229
-rw-r--r--plugins/boltdb/boltjobs/listener.go151
-rw-r--r--plugins/boltdb/boltkv/config.go30
-rw-r--r--plugins/boltdb/boltkv/driver.go468
-rw-r--r--plugins/boltdb/doc/boltjobs.drawio1
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md10
-rw-r--r--plugins/boltdb/plugin.go82
9 files changed, 1432 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
new file mode 100644
index 00000000..8cc098c1
--- /dev/null
+++ b/plugins/boltdb/boltjobs/config.go
@@ -0,0 +1,39 @@
+package boltjobs
+
+const (
+ file string = "file"
+ priority string = "priority"
+ prefetch string = "prefetch"
+)
+
+type GlobalCfg struct {
+ // db file permissions
+ Permissions int `mapstructure:"permissions"`
+ // consume timeout
+}
+
+func (c *GlobalCfg) InitDefaults() {
+ if c.Permissions == 0 {
+ c.Permissions = 0777
+ }
+}
+
+type Config struct {
+ File string `mapstructure:"file"`
+ Priority int `mapstructure:"priority"`
+ Prefetch int `mapstructure:"prefetch"`
+}
+
+func (c *Config) InitDefaults() {
+ if c.File == "" {
+ c.File = "rr.db"
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+
+ if c.Prefetch == 0 {
+ c.Prefetch = 1000
+ }
+}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
new file mode 100644
index 00000000..ed0eda61
--- /dev/null
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -0,0 +1,422 @@
+package boltjobs
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+const (
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
+
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
+)
+
+type consumer struct {
+ file string
+ permissions int
+ priority int
+ prefetch int
+
+ db *bolt.DB
+
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
+ listeners uint32
+ active *uint64
+ delayed *uint64
+
+ stopCh chan struct{}
+}
+
+func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg := &Config{}
+ err = cfg.UnmarshalKey(configKey, localCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg.InitDefaults()
+ conf.InitDefaults()
+
+ db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
+ permissions: conf.Permissions,
+ file: localCfg.File,
+ priority: localCfg.Priority,
+ prefetch: localCfg.Prefetch,
+
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 2),
+ }, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
+ file: pipeline.String(file, rrDB),
+ priority: pipeline.Int(priority, 10),
+ prefetch: pipeline.Int(prefetch, 100),
+ permissions: conf.Permissions,
+
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 2),
+ }, nil
+}
+
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
+ const op = errors.Op("boltdb_jobs_push")
+ err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+ // pool with buffers
+ buf := c.get()
+ // encode the job
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ c.put(buf)
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+ c.put(buf)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ err = b.Put(utils.AsBytes(tKey), value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return nil
+ }
+
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ err = b.Put(utils.AsBytes(item.ID()), value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return nil
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
+ return nil
+}
+
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("boltdb_run")
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 1 {
+ c.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
new file mode 100644
index 00000000..837f8c63
--- /dev/null
+++ b/plugins/boltdb/boltjobs/item.go
@@ -0,0 +1,229 @@
+package boltjobs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+ "go.etcd.io/bbolt"
+)
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ db *bbolt.DB
+ active *uint64
+ delayed *uint64
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ const op = errors.Op("boltdb_item_ack")
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ if i.Options.Delay > 0 {
+ atomic.AddUint64(i.Options.delayed, ^uint64(0))
+ } else {
+ atomic.AddUint64(i.Options.active, ^uint64(0))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) Nack() error {
+ const op = errors.Op("boltdb_item_ack")
+ /*
+ steps:
+ 1. begin tx
+ 2. get item by ID from the InQueueBucket (previously put in the listener)
+ 3. put it back to the PushBucket
+ 4. Delete it from the InQueueBucket
+ */
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ v := inQb.Get(utils.AsBytes(i.ID()))
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ err = pushB.Put(utils.AsBytes(i.ID()), v)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ return tx.Commit()
+}
+
+/*
+Requeue algorithm:
+1. Rewrite item headers and delay.
+2. Begin writable transaction on attached to the item db.
+3. Delete item from the InQueueBucket
+4. Handle items with the delay:
+ 4.1. Get DelayBucket
+ 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format
+ 4.3. Put this key with value to the DelayBucket
+5. W/o delay, put the key with value to the PushBucket (requeue)
+*/
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ const op = errors.Op("boltdb_item_requeue")
+ i.Headers = headers
+ i.Options.Delay = delay
+
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ // encode the item
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ val := make([]byte, buf.Len())
+ copy(val, buf.Bytes())
+ buf.Reset()
+
+ if delay > 0 {
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
+
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = delayB.Put(utils.AsBytes(tKey), val)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+ }
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = pushB.Put(utils.AsBytes(i.ID()), val)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
+ i.Options.db = db
+ i.Options.active = active
+ i.Options.delayed = delayed
+}
+
+func (i *Item) rollback(err error, tx *bbolt.Tx) error {
+ errR := tx.Rollback()
+ if errR != nil {
+ return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
+ }
+ return errors.Errorf("transaction commit error: %v", err)
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
new file mode 100644
index 00000000..7c161555
--- /dev/null
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -0,0 +1,151 @@
+package boltjobs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+func (c *consumer) listener() {
+ tt := time.NewTicker(time.Millisecond)
+ defer tt.Stop()
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Info("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := b.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = b.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+ }
+}
+
+func (c *consumer) delayedJobsListener() {
+ tt := time.NewTicker(time.Second)
+ defer tt.Stop()
+
+ // just some 90's
+ loc, err := time.LoadLocation("UTC")
+ if err != nil {
+ c.log.Error("failed to load location, delayed jobs won't work", "error", err)
+ return
+ }
+
+ var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339))
+
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Info("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ cursor := delayB.Cursor()
+ endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339))
+
+ for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() {
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+ }
+ }
+}
+
+func (c *consumer) rollback(err error, tx *bolt.Tx) {
+ errR := tx.Rollback()
+ if errR != nil {
+ c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
+ }
+
+ c.log.Error("transaction commit error, rollback succeed", "error", err)
+}
diff --git a/plugins/boltdb/boltkv/config.go b/plugins/boltdb/boltkv/config.go
new file mode 100644
index 00000000..56d00674
--- /dev/null
+++ b/plugins/boltdb/boltkv/config.go
@@ -0,0 +1,30 @@
+package boltkv
+
+type Config struct {
+ // File is boltDB file. No need to create it by your own,
+ // boltdb driver is able to create the file, or read existing
+ File string
+ // Bucket to store data in boltDB
+ bucket string
+ // db file permissions
+ Permissions int
+ // timeout
+ Interval int `mapstructure:"interval"`
+}
+
+// InitDefaults initializes default values for the boltdb
+func (s *Config) InitDefaults() {
+ s.bucket = "default"
+
+ if s.File == "" {
+ s.File = "rr.db" // default file name
+ }
+
+ if s.Permissions == 0 {
+ s.Permissions = 0777 // free for all
+ }
+
+ if s.Interval == 0 {
+ s.Interval = 60 // default is 60 seconds timeout
+ }
+}
diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go
new file mode 100644
index 00000000..ba1450cd
--- /dev/null
+++ b/plugins/boltdb/boltkv/driver.go
@@ -0,0 +1,468 @@
+package boltkv
+
+import (
+ "bytes"
+ "encoding/gob"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+const (
+ RootPluginName string = "kv"
+)
+
+type Driver struct {
+ clearMu sync.RWMutex
+ // db instance
+ DB *bolt.DB
+ // name should be UTF-8
+ bucket []byte
+ log logger.Logger
+ cfg *Config
+
+ // gc contains keys with timeouts
+ gc sync.Map
+ // default timeout for cache cleanup is 1 minute
+ timeout time.Duration
+
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+}
+
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ if !cfgPlugin.Has(RootPluginName) {
+ return nil, errors.E(op, errors.Str("no kv section in the configuration"))
+ }
+
+ d := &Driver{
+ log: log,
+ stop: stop,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ d.cfg.InitDefaults()
+
+ d.bucket = []byte(d.cfg.bucket)
+ d.timeout = time.Duration(d.cfg.Interval) * time.Second
+ d.gc = sync.Map{}
+
+ db, err := bolt.Open(d.cfg.File, os.FileMode(d.cfg.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.DB = db
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ go d.startGCLoop()
+
+ return d, nil
+}
+
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb_driver_has")
+ d.log.Debug("boltdb HAS method called", "args", keys)
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+
+ // this is readable transaction
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ // Get retrieves the value for a key in the bucket.
+ // Returns a nil value if the key does not exist or if the key is a nested bucket.
+ // The returned value is only valid for the life of the transaction.
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ exist := b.Get([]byte(keys[i]))
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.log.Debug("boltdb HAS method finished")
+ return m, nil
+}
+
+// Get retrieves the value for a key in the bucket.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("boltdb_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ var val []byte
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ val = b.Get([]byte(key))
+
+ // try to decode values
+ if val != nil {
+ buf := bytes.NewReader(val)
+ decoder := gob.NewDecoder(buf)
+
+ var i string
+ err := decoder.Decode(&i)
+ if err != nil {
+ // unsafe (w/o runes) convert
+ return errors.E(op, err)
+ }
+
+ // set the value
+ val = utils.AsBytes(i)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
+
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
+ const op = errors.Op("boltdb_driver_mget")
+ // defense
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string][]byte, len(keys))
+
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ buf := new(bytes.Buffer)
+ var out []byte
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
+ if value != nil {
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = nil
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return m, nil
+}
+
+// Set puts the K/V to the bolt
+func (d *Driver) Set(items ...*kvv1.Item) error {
+ const op = errors.Op("boltdb_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ // use access by index to avoid copying
+ for i := range items {
+ // performance note: pass a prepared bytes slice with initial cap
+ // we can't move buf and gob out of loop, because we need to clear both from data
+ // but gob will contain (w/o re-init) the past data
+ buf := new(bytes.Buffer)
+ encoder := gob.NewEncoder(buf)
+ if errors.Is(errors.EmptyItem, err) {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
+ err = b.Put([]byte(items[i].Key), buf.Bytes())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
+ // we do not need mutex here, since we use sync.Map
+ if items[i].Timeout != "" {
+ // check correctness of provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // Store key TTL in the separate map
+ d.gc.Store(items[i].Key, items[i].Timeout)
+ }
+
+ buf.Reset()
+ }
+
+ return nil
+}
+
+// Delete all keys from DB
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("boltdb_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ err = b.Delete([]byte(key))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
+ const op = errors.Op("boltdb_driver_mexpire")
+ for i := range items {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ d.gc.Store(items[i].Key, items[i].Timeout)
+ }
+ return nil
+}
+
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
+ const op = errors.Op("boltdb_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]string, len(keys))
+
+ for i := range keys {
+ if item, ok := d.gc.Load(keys[i]); ok {
+ // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
+ m[keys[i]] = item.(string)
+ }
+ }
+ return m, nil
+}
+
+func (d *Driver) Clear() error {
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ err := tx.DeleteBucket(d.bucket)
+ if err != nil {
+ d.log.Error("boltdb delete bucket", "error", err)
+ return err
+ }
+
+ _, err = tx.CreateBucket(d.bucket)
+ if err != nil {
+ d.log.Error("boltdb create bucket", "error", err)
+ return err
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ d.log.Error("clear transaction failed", "error", err)
+ return err
+ }
+
+ d.clearMu.Lock()
+ d.gc = sync.Map{}
+ d.clearMu.Unlock()
+
+ return nil
+}
+
+// ========================= PRIVATE =================================
+
+func (d *Driver) startGCLoop() { //nolint:gocognit
+ go func() {
+ t := time.NewTicker(d.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ d.clearMu.RLock()
+
+ // calculate current time before loop started to be fair
+ now := time.Now()
+ d.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("boltdb_plugin_gc")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
+
+ if now.After(v) {
+ // time expired
+ d.gc.Delete(k)
+ d.log.Debug("key deleted", "key", k)
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete(utils.AsBytes(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ d.log.Error("error during the gc phase of update", "error", err)
+ return false
+ }
+ }
+ return true
+ })
+
+ d.clearMu.RUnlock()
+ case <-d.stop:
+ err := d.DB.Close()
+ if err != nil {
+ d.log.Error("error")
+ }
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio
new file mode 100644
index 00000000..7d1f3531
--- /dev/null
+++ b/plugins/boltdb/doc/boltjobs.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-08-31T09:34:11.357Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="KiNZAPNeIcd5kV3EE5lF" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">7V1bc5s4GP01nmkfkgHE9TF2km13up20me1uH2WQbRJsuYATe3/9ijtICiHmIrvBnWmNEAjrfNejT3QCZuv9Hz7crv7CDvImiuTsJ+B6oigKABr5J2o5JC0yUJWkZem7TtpWNNy7/6G0UUpbd66DgkrHEGMvdLfVRhtvNsgOK23Q9/FztdsCe9VRt3CJmIZ7G3ps6z+uE67SVl2SihOfkLtcZUMr2Zk1zHqnDcEKOvi51ARuJmDmYxwm39b7GfKi6csmJrnu9oWz+ZP5aBM2ucDcf0ZP8pX6w3iw/K8/fijw1/VFikYQHrJfjBwyAekh9sMVXuIN9G6K1qmPdxsHRXeVyFHR5wvGW9Iok8YHFIaHFE24CzFpWoVrLz1LHtg//Btdf6llhz/T28UH1/vK0SE9WuBNeAvXrhc1zGBgQ8eF5OlnRO7S0+mYMiDHyU+Lfs+LM5aJFPSXKKyZJjnHi4g6wmtEHotc5yMPhu5T9f4wFbll3i+/9A67ZGRFSvVDV1PROORiVb1F8lzpVWVoX7uRSt0owDvfRsyNyJfS7ymaYsl5gxRl2jqwGO3dsCRF5Ohn6UwhQ9HBoSxQpyR6CTKviV4DEVVbimhFJt4sAMl9n6C3S0e6+/v+E2n5fjdjRKMK/PPKDdH9FsaT8EzcSBXkN078wvW8GfawHw8F5tA2nag9CH38iEpnFKCqmpND9YT8EO3rwWInN71AM6vqp2R6/FxyGZkbWJW8Ba2mneFhiNRHuaSNhW7y9VGcXoGGeqWJ1CvzXHAs7GrFqhZG9uTsalP8hdpVa8RfNP66SPwB41enJPm5nrb0qVUnuSAf0+Q5SaADCzjHwtTKp+pVl6qZGutSZY5LBX25VFlmJv1EdVGcTrW1lfzsxqDCK7VhmtRVdqOywe0uWJGW6c5+RGG3uqhFf3i6qMefFL+yjsYfITqqaBQwoGHcSwPYmZIqQh1mNRGVapX0pUQ050dqHGZNwmM7aG7OubacfG5vX5eTDs2Bfg4hlgxOR2Qachfy5ExiLFk5hyBL1hgDP8PbaPwHPOcKxxc4R14VUOi5yw35bpNJQkTlppFldW3oXaUn1q7jJLKDAvc/OI/vF8GzjfxW/Ju06US75ul4E43lWPKUFE/HmuRMdBnKGp140e4TkbOUdNJa+vYLWbIqPuRCoXwDXiwC1A9rqYvVfKO56tMS0RONKUL1ZaOXoJEkB1QGMXDUmE0AGzZ6bhCiDTESivRBltbBx24jyAXSbZsXATiGNZeOdgitIkUmhOdFiuqQkaLMRvXyZTwHheWXiO6Sv1aI/P15822HdigL+XWPPOB07ldw03/tosW8eAIvgngGr0gHGWz38TRm58m3ZfSvEg14jTwURiMsfLzOh4tE5YX0gqAQ8n2PhxZhO88TmasghZ4RoQ3eIDHSQ5EBwGSlJ5eUQfh1WSgx+5ag8Qw9h9HUc1jDeA59aM9hsJ7j010k7th/RH4QzSvGXkuvQckFE79SbgXJjoYMnluxdANAvamItDIEskQtdGuG6JU2RSgtKF0qw6x9vzkd6cYUWA1NQamgRkACmeVCp+8NfmcReCFnHWihRjCL9IZcsisZYByBFH9OVDrEcowW49G/o2CLNwEa1os7EJkLbnKo2yaaL8R4cV28FxdaMCOVdLf5QnvLdYPBdDQD8rRJ4OwpSyr6FdqPw6rnwrQRn7uZm5qqNbatrdTTtKraaUrCtZOl0m58n0xNc2qkC1q+KTnSBXXfHC4rq4xI4TI41JrGQYvm2bsLhQRnREdFQsq5GNOhypbiS698Hx5KHVKdaE6dWDq1J4G6QDFr+5MvySN0yq8obFXV1dCW3tGQ6ag8S28qc6ALolPEm3ogOJU+jk4xWlmPU0ikFLWhXWmdZh9nWHJ3lUmqVW9YZBnUXtCPZQGCV5TbkwC/t+y2ZQmPlF2qTM6iN+pRF1j1/XsSXbEZ8HslsBrLLhBKYCmcGlgfh9h+n8tQ8smlyOrZVKe32SlyatqrNVReVejyFFveeE3GjcbvoYC9PcM8QKF642Xj3sqPsui0Uh0WxQDzmHQsVx4dVQr0+/JdJDyqYsmxvQpvZ1B/hBdLT+ZFXX/GdWSV0q5KJdll3seN6jocMquXI8xlFc2CYJWFGRiDwnxCe0veT4AMGgfIpkgfq57hAmI1/jJaB2C3t5Jknqh0iKHDKXrVMl8hraj+hqZRgtlH4s+mdt/Rr8hDtYwL37r2CR0D8tc+VV2T1GEyO5pntIS/yiN7YdY5WZauCXERlsVsaFmEvh8ke0o2txtVV5Y04bqritTd41gZSndbx4wCdDfDs/eNx0dFBQZd/SbVRwV0f1Wu7w/q+/cTRagsn5GRTD1vd2tdMtUD36Q14ZsG3e6mM/B83pDjb0mcJ817IAJbVzgMQAQackMX0d8bK7gePCGPHl4hjxi83iVRRJUQ8dZilEGJIpUl3JOtpSsUc7tOahh9uFlGmH6A8T0J2kH82OEqsecJ4nGvj+Udp6XNowlnjPZb1ydqmt+jLXs83EbSYWVFpdbtdB6paA0aIdao/6j4tWAa1EKAxdsVzHOy/Sk+u80jUdDEkEeaHan7V/w8iZ5hmtkCot3SIzpUFTcOn0a4ixfCqVW4uekdV3l7w1tjg152Gf7ssKLeLxblbzHn34WvpsqDAcf+8qKv/hBkX1zM0m0jgiUEqWU5VTiCCoMguxVzRLBA0KJiII2TAg2LIFuOz9JmI4IFgnL+4tMsjOVs+RkWQnYNia3jHyEsQUjvvODt2hoWQjZpHSGsh5AiHgzOUuGwELIk3whhvSEFp6aF7Kt/2C0lI4SleJR6NRiX/RsUQZbQYUuDRgQLBOktoDonrR8WQZbFGXWwVgfp/Uqic8LMJowINq6doBdRLNEQssTMaEbrIKTfsMplwwdFkCVm2JdmjggWCOpKA4K7IwjJYfEfTyY1IMV/4Alu/gc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
new file mode 100644
index 00000000..317aec90
--- /dev/null
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -0,0 +1,10 @@
+### Job lifecycle
+
+There are several boltdb buckets:
+
+1. `PushBucket` - used for pushed jobs via RPC.
+2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and
+get into the `InQueueBucket` waiting to acknowledgement.
+3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
+
+``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
new file mode 100644
index 00000000..683b26f1
--- /dev/null
+++ b/plugins/boltdb/plugin.go
@@ -0,0 +1,82 @@
+package boltdb
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "boltdb"
+)
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+
+ drivers uint
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.stop = make(chan struct{})
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (p *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (p *Plugin) Stop() error {
+ if p.drivers > 0 {
+ for i := uint(0); i < p.drivers; i++ {
+ // send close signal to every driver
+ p.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+// Name returns plugin name
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Available interface implementation
+func (p *Plugin) Available() {}
+
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ p.drivers++
+
+ return st, nil
+}
+
+// JOBS bbolt implementation
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+}