Diffstat (limited to 'plugins/boltdb')
-rw-r--r-- | plugins/boltdb/boltjobs/config.go | 39
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 422
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 229
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 151
-rw-r--r-- | plugins/boltdb/boltkv/config.go | 30
-rw-r--r-- | plugins/boltdb/boltkv/driver.go | 468
-rw-r--r-- | plugins/boltdb/doc/boltjobs.drawio | 1
-rw-r--r-- | plugins/boltdb/doc/job_lifecycle.md | 10
-rw-r--r-- | plugins/boltdb/plugin.go | 82
9 files changed, 1432 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go new file mode 100644 index 00000000..8cc098c1 --- /dev/null +++ b/plugins/boltdb/boltjobs/config.go @@ -0,0 +1,39 @@ +package boltjobs + +const ( + file string = "file" + priority string = "priority" + prefetch string = "prefetch" +) + +type GlobalCfg struct { + // db file permissions + Permissions int `mapstructure:"permissions"` + // consume timeout +} + +func (c *GlobalCfg) InitDefaults() { + if c.Permissions == 0 { + c.Permissions = 0777 + } +} + +type Config struct { + File string `mapstructure:"file"` + Priority int `mapstructure:"priority"` + Prefetch int `mapstructure:"prefetch"` +} + +func (c *Config) InitDefaults() { + if c.File == "" { + c.File = "rr.db" + } + + if c.Priority == 0 { + c.Priority = 10 + } + + if c.Prefetch == 0 { + c.Prefetch = 1000 + } +} diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go new file mode 100644 index 00000000..ed0eda61 --- /dev/null +++ b/plugins/boltdb/boltjobs/consumer.go @@ -0,0 +1,422 @@ +package boltjobs + +import ( + "bytes" + "context" + "encoding/gob" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + PluginName string = "boltdb" + rrDB string = "rr.db" + + PushBucket string = "push" + InQueueBucket string = "processing" + DelayBucket string = "delayed" +) + +type consumer struct { + file string + permissions int + priority int + prefetch int + + db *bolt.DB + + bPool sync.Pool + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipeline atomic.Value + cond *sync.Cond + + listeners uint32 + active *uint64 + delayed *uint64 + + stopCh chan struct{} +} + +func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg := &Config{} + err = cfg.UnmarshalKey(configKey, localCfg) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg.InitDefaults() + conf.InitDefaults() + + db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return 
errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + permissions: conf.Permissions, + file: localCfg.File, + priority: localCfg.Priority, + prefetch: localCfg.Prefetch, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + + db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + file: pipeline.String(file, rrDB), + priority: pipeline.Int(priority, 10), + prefetch: pipeline.Int(prefetch, 100), + permissions: conf.Permissions, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func (c *consumer) Push(_ context.Context, job *job.Job) error { + const op = errors.Op("boltdb_jobs_push") + err := c.db.Update(func(tx *bolt.Tx) error { + item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, 
buf.Bytes()) + c.put(buf) + + // handle delay + if item.Options.Delay > 0 { + b := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) + + err = b.Put(utils.AsBytes(tKey), value) + if err != nil { + return errors.E(op, err) + } + + atomic.AddUint64(c.delayed, 1) + + return nil + } + + b := tx.Bucket(utils.AsBytes(PushBucket)) + err = b.Put(utils.AsBytes(item.ID()), value) + if err != nil { + return errors.E(op, err) + } + + // increment active counter + atomic.AddUint64(c.active, 1) + + return nil + }) + + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) + return nil +} + +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("boltdb_run") + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (c *consumer) Stop(_ context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + } + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 1 { + c.log.Warn("amqp listener already in the active state") + return + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: PushBucket, + Active: int64(atomic.LoadUint64(c.active)), + Delayed: int64(atomic.LoadUint64(c.delayed)), + Ready: toBool(atomic.LoadUint32(&c.listeners)), + }, nil +} + +// Private + +func (c *consumer) get() *bytes.Buffer { + return c.bPool.Get().(*bytes.Buffer) +} + +func (c *consumer) put(b *bytes.Buffer) { + b.Reset() + c.bPool.Put(b) +} + +func toBool(r uint32) bool { + return r > 0 
+} diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go new file mode 100644 index 00000000..837f8c63 --- /dev/null +++ b/plugins/boltdb/boltjobs/item.go @@ -0,0 +1,229 @@ +package boltjobs + +import ( + "bytes" + "encoding/gob" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" + "go.etcd.io/bbolt" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + db *bbolt.DB + active *uint64 + delayed *uint64 +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + const op = errors.Op("boltdb_item_ack") + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + if i.Options.Delay > 0 { + atomic.AddUint64(i.Options.delayed, ^uint64(0)) + } else { + atomic.AddUint64(i.Options.active, ^uint64(0)) + } + + return tx.Commit() +} + +func (i *Item) Nack() error { + const op = errors.Op("boltdb_item_ack") + /* + steps: + 1. begin tx + 2. get item by ID from the InQueueBucket (previously put in the listener) + 3. put it back to the PushBucket + 4. Delete it from the InQueueBucket + */ + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + v := inQb.Get(utils.AsBytes(i.ID())) + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + err = pushB.Put(utils.AsBytes(i.ID()), v) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + return tx.Commit() +} + +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. 
Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + const op = errors.Op("boltdb_item_requeue") + i.Headers = headers + i.Options.Delay = delay + + tx, err := i.Options.db.Begin(true) + if err != nil { + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + + if delay > 0 { + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) + + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = delayB.Put(utils.AsBytes(tKey), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() + } + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = pushB.Put(utils.AsBytes(i.ID()), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() +} + +func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { + i.Options.db = db + i.Options.active = active + i.Options.delayed = delayed +} + +func (i *Item) rollback(err error, tx *bbolt.Tx) error { + errR := tx.Rollback() + if errR != nil { + return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) + } + return errors.Errorf("transaction commit error: %v", err) +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go new file mode 100644 index 00000000..7c161555 --- /dev/null +++ b/plugins/boltdb/boltjobs/listener.go @@ -0,0 +1,151 @@ +package boltjobs + +import ( + "bytes" + "encoding/gob" + "time" + + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +func (c *consumer) listener() { + tt := time.NewTicker(time.Millisecond) + defer tt.Stop() + for { + select { + case <-c.stopCh: + c.log.Info("boltdb listener stopped") + return + case <-tt.C: + tx, err := c.db.Begin(true) + if err != nil { + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue + } + + b := tx.Bucket(utils.AsBytes(PushBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + // get first item + k, v := b.Cursor().First() + if k == nil && v == nil { + _ = tx.Commit() + continue + } + + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = b.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last 
step, after commit, put the item into the PQ + c.pq.Insert(item) + } + } +} + +func (c *consumer) delayedJobsListener() { + tt := time.NewTicker(time.Second) + defer tt.Stop() + + // just some 90's + loc, err := time.LoadLocation("UTC") + if err != nil { + c.log.Error("failed to load location, delayed jobs won't work", "error", err) + return + } + + var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) + + for { + select { + case <-c.stopCh: + c.log.Info("boltdb listener stopped") + return + case <-tt.C: + tx, err := c.db.Begin(true) + if err != nil { + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue + } + + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + cursor := delayB.Cursor() + endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) + + for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = delayB.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) + } + + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + } + } +} + +func (c *consumer) rollback(err error, tx *bolt.Tx) { + errR := tx.Rollback() + if errR != nil { + c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR) + } + + c.log.Error("transaction commit error, rollback succeed", "error", err) +} diff --git a/plugins/boltdb/boltkv/config.go b/plugins/boltdb/boltkv/config.go new file mode 100644 index 00000000..56d00674 --- /dev/null +++ b/plugins/boltdb/boltkv/config.go @@ -0,0 +1,30 @@ +package boltkv + +type Config struct { + // File is boltDB file. 
No need to create it by your own, + // boltdb driver is able to create the file, or read existing + File string + // Bucket to store data in boltDB + bucket string + // db file permissions + Permissions int + // timeout + Interval int `mapstructure:"interval"` +} + +// InitDefaults initializes default values for the boltdb +func (s *Config) InitDefaults() { + s.bucket = "default" + + if s.File == "" { + s.File = "rr.db" // default file name + } + + if s.Permissions == 0 { + s.Permissions = 0777 // free for all + } + + if s.Interval == 0 { + s.Interval = 60 // default is 60 seconds timeout + } +} diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go new file mode 100644 index 00000000..ba1450cd --- /dev/null +++ b/plugins/boltdb/boltkv/driver.go @@ -0,0 +1,468 @@ +package boltkv + +import ( + "bytes" + "encoding/gob" + "os" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + RootPluginName string = "kv" +) + +type Driver struct { + clearMu sync.RWMutex + // db instance + DB *bolt.DB + // name should be UTF-8 + bucket []byte + log logger.Logger + cfg *Config + + // gc contains keys with timeouts + gc sync.Map + // default timeout for cache cleanup is 1 minute + timeout time.Duration + + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} +} + +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { + const op = errors.Op("new_boltdb_driver") + + if !cfgPlugin.Has(RootPluginName) { + return nil, errors.E(op, errors.Str("no kv section in the configuration")) + } + + d := &Driver{ + log: log, + stop: stop, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + d.cfg.InitDefaults() + + d.bucket = []byte(d.cfg.bucket) + d.timeout = time.Duration(d.cfg.Interval) * time.Second + d.gc = sync.Map{} + + db, err := bolt.Open(d.cfg.File, os.FileMode(d.cfg.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + d.DB = db + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.bucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + go d.startGCLoop() + + return d, nil +} + +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("boltdb_driver_has") + d.log.Debug("boltdb HAS method called", "args", keys) + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + m := make(map[string]bool, len(keys)) + + // this is readable transaction + err := d.DB.View(func(tx *bolt.Tx) error { + // Get retrieves the value for a key in the bucket. + // Returns a nil value if the key does not exist or if the key is a nested bucket. + // The returned value is only valid for the life of the transaction. 
+ for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + exist := b.Get([]byte(keys[i])) + if exist != nil { + m[keys[i]] = true + } + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + d.log.Debug("boltdb HAS method finished") + return m, nil +} + +// Get retrieves the value for a key in the bucket. +// Returns a nil value if the key does not exist or if the key is a nested bucket. +// The returned value is only valid for the life of the transaction. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("boltdb_driver_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + var val []byte + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + val = b.Get([]byte(key)) + + // try to decode values + if val != nil { + buf := bytes.NewReader(val) + decoder := gob.NewDecoder(buf) + + var i string + err := decoder.Decode(&i) + if err != nil { + // unsafe (w/o runes) convert + return errors.E(op, err) + } + + // set the value + val = utils.AsBytes(i) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return val, nil +} + +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { + const op = errors.Op("boltdb_driver_mget") + // defense + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string][]byte, len(keys)) + + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + buf := new(bytes.Buffer) + var out []byte + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) + if value != nil { + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = nil + } + } + + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return m, nil +} + +// Set puts the K/V to the bolt +func (d *Driver) Set(items ...*kvv1.Item) error { + const op = errors.Op("boltdb_driver_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + // start writable transaction + tx, err := d.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(d.bucket) + // use access by index to avoid copying + for i := range items { + // performance note: pass a prepared bytes slice with initial cap + // we can't move buf and gob out of loop, because we need to clear both from data + // but gob will contain (w/o re-init) the past data + buf := new(bytes.Buffer) + encoder := gob.NewEncoder(buf) + if errors.Is(errors.EmptyItem, err) { + return errors.E(op, errors.EmptyItem) + } + + // Encode value + err = encoder.Encode(&items[i].Value) + if err != nil { + return errors.E(op, err) + } + // buf.Bytes will copy the underlying 
slice. Take a look in case of performance problems + err = b.Put([]byte(items[i].Key), buf.Bytes()) + if err != nil { + return errors.E(op, err) + } + + // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check + // we do not need mutex here, since we use sync.Map + if items[i].Timeout != "" { + // check correctness of provided TTL + _, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + // Store key TTL in the separate map + d.gc.Store(items[i].Key, items[i].Timeout) + } + + buf.Reset() + } + + return nil +} + +// Delete all keys from DB +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("boltdb_driver_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + // start writable transaction + tx, err := d.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + for _, key := range keys { + err = b.Delete([]byte(key)) + if err != nil { + return errors.E(op, err) + } + } + + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (d *Driver) MExpire(items ...*kvv1.Item) error { + const op = errors.Op("boltdb_driver_mexpire") + for i := range items { + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + _, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + + d.gc.Store(items[i].Key, items[i].Timeout) + } + return nil +} + +func (d *Driver) TTL(keys ...string) (map[string]string, error) { + const op = errors.Op("boltdb_driver_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]string, len(keys)) + + for i := range keys { + if item, ok := d.gc.Load(keys[i]); ok { + // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 + m[keys[i]] = item.(string) + } + } + return m, nil +} + +func (d *Driver) Clear() error { + err := d.DB.Update(func(tx *bolt.Tx) error { + err := tx.DeleteBucket(d.bucket) + if err != nil { + d.log.Error("boltdb delete bucket", "error", err) + return err + } + + _, err = tx.CreateBucket(d.bucket) + if err != nil { + d.log.Error("boltdb create bucket", "error", err) + return err + } + + return nil + }) + + if err != nil { + d.log.Error("clear transaction failed", "error", err) + return err + } + + d.clearMu.Lock() + d.gc = sync.Map{} + d.clearMu.Unlock() + + return nil +} + +// ========================= PRIVATE ================================= + +func (d *Driver) startGCLoop() { //nolint:gocognit + go func() { + t := time.NewTicker(d.timeout) + defer t.Stop() + for { + select { + case <-t.C: + d.clearMu.RLock() + + // calculate current time before loop 
started to be fair + now := time.Now() + d.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } + + if now.After(v) { + // time expired + d.gc.Delete(k) + d.log.Debug("key deleted", "key", k) + err := d.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete(utils.AsBytes(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + d.log.Error("error during the gc phase of update", "error", err) + return false + } + } + return true + }) + + d.clearMu.RUnlock() + case <-d.stop: + err := d.DB.Close() + if err != nil { + d.log.Error("error") + } + return + } + } + }() +} diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio new file mode 100644 index 00000000..7d1f3531 --- /dev/null +++ b/plugins/boltdb/doc/boltjobs.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-08-31T09:34:11.357Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="KiNZAPNeIcd5kV3EE5lF" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">7V1bc5s4GP01nmkfkgHE9TF2km13up20me1uH2WQbRJsuYATe3/9ijtICiHmIrvBnWmNEAjrfNejT3QCZuv9Hz7crv7CDvImiuTsJ+B6oigKABr5J2o5JC0yUJWkZem7TtpWNNy7/6G0UUpbd66DgkrHEGMvdLfVRhtvNsgOK23Q9/FztdsCe9VRt3CJmIZ7G3ps6z+uE67SVl2SihOfkLtcZUMr2Zk1zHqnDcEKOvi51ARuJmDmYxwm39b7GfKi6csmJrnu9oWz+ZP5aBM2ucDcf0ZP8pX6w3iw/K8/fijw1/VFikYQHrJfjBwyAekh9sMVXuIN9G6K1qmPdxsHRXeVyFHR5wvGW9Iok8YHFIaHFE24CzFpWoVrLz1LHtg//Btdf6llhz/T28UH1/vK0SE9WuBNeAvXrhc1zGBgQ8eF5OlnRO7S0+mYMiDHyU+Lfs+LM5aJFPSXKKyZJjnHi4g6wmtEHotc5yMPhu5T9f4wFbll3i+/9A67ZGRFSvVDV1PROORiVb1F8lzpVWVoX7uRSt0owDvfRsyNyJfS7ymaYsl5gxRl2jqwGO3dsCRF5Ohn6UwhQ9HBoSxQpyR6CTKviV4DEVVbimhFJt4sAMl9n6C3S0e6+/v+E2n5fjdjRKMK/PPKDdH9FsaT8EzcSBXkN078wvW8GfawHw8F5tA2nag9CH38iEpnFKCqmpND9YT8EO3rwWInN71AM6vqp2R6/FxyGZkbWJW8Ba2mneFhiNRHuaSNhW7y9VGcXoGGeqWJ1CvzXHAs7GrFqhZG9uTsalP8hdpVa8RfNP66SPwB41enJPm5nrb0qVUnuSAf0+Q5SaADCzjHwtTKp+pVl6qZGutSZY5LBX25VFlmJv1EdVGcTrW1lfzsxqDCK7VhmtRVdqOywe0uWJGW6c5+RGG3uqhFf3i6qMefFL+yjsYfITqqaBQwoGHcSwPYmZIqQh1mNRGVapX0pUQ050dqHGZNwmM7aG7OubacfG5vX5eTDs2Bfg4hlgxOR2Qachfy5ExiLFk5hyBL1hgDP8PbaPwHPOcKxxc4R14VUOi5yw35bpNJQkTlppFldW3oXaUn1q7jJLKDAvc/OI/vF8GzjfxW/Ju06US75ul4E43lWPKUFE/HmuRMdBnKGp140e4TkbOUdNJa+vYLWbIqPuRCoXwDXiwC1A9rqYvVfKO56tMS0RONKUL1ZaOXoJEkB1QGMXDUmE0AGzZ6bhCiDTESivRBltbBx24jyAXSbZsXATiGNZeOdgitIkUmhOdFiuqQkaLMRvXyZTwHheWXiO6Sv1aI/P15822HdigL+XWPPOB07ldw03/tosW8eAIvgngGr0gHGWz38TRm58m3ZfSvEg14jTwURiMsfLzOh4tE5YX0gqAQ8n2PhxZhO88TmasghZ4RoQ3eIDHSQ5EBwGSlJ5eUQfh1WSgx+5ag8Qw9h9HUc1jDeA59aM9hsJ7j010k7th/RH4QzSvGXkuvQckFE79SbgXJjoYMnluxdANAvamItDIEskQtdGuG6JU2RSgtKF0qw6x9vzkd6cYUWA1NQamgRkACmeVCp+8NfmcReCFnHWihRjCL9IZcsisZYByBFH9OVDrEcowW49G/o2CLNwEa1os7EJkLbnKo2yaaL8R4cV28FxdaMCOVdLf5QnvLdYPBdDQD8rRJ4OwpSyr6FdqPw6rnwrQRn7uZm5qqNbatrdTTtKraaUrCtZOl0m58n0xNc2qkC1q+KTnSBXXfHC4rq4xI4TI41JrGQYvm2bsLhQRnREdFQsq5GNOhypbiS698Hx5KHVKdaE6dWDq1J4G6QDFr+5MvySN0yq8obFXV1dCW3tGQ6ag8S28qc6ALolPEm3ogOJU+jk4xWlmPU0ikFLWhXWmdZh9nWHJ3lUmqVW9YZBnUXtCPZQGCV5TbkwC/t+y2ZQmPlF2qTM6iN+pRF1j1/XsSXbEZ8HslsBrLLhBKYCmcGlgfh9h+n8tQ8smlyOrZVKe32SlyatqrNVReVejyFFveeE3GjcbvoYC9PcM8QKF642Xj3sqPsui0Uh0WxQDzmHQsVx4dVQr0+/JdJDyqYsmxvQpvZ1B/hBdLT+ZFXX/GdWSV0q5KJdll3seN6jocMquXI8xlFc2CYJWFGRiDwnxCe0veT4AMGgfIpkgfq57hAmI1/jJaB2C3t5Jknqh0iKHDKXrVMl8hraj+hqZRgtlH4
s+mdt/Rr8hDtYwL37r2CR0D8tc+VV2T1GEyO5pntIS/yiN7YdY5WZauCXERlsVsaFmEvh8ke0o2txtVV5Y04bqritTd41gZSndbx4wCdDfDs/eNx0dFBQZd/SbVRwV0f1Wu7w/q+/cTRagsn5GRTD1vd2tdMtUD36Q14ZsG3e6mM/B83pDjb0mcJ817IAJbVzgMQAQackMX0d8bK7gePCGPHl4hjxi83iVRRJUQ8dZilEGJIpUl3JOtpSsUc7tOahh9uFlGmH6A8T0J2kH82OEqsecJ4nGvj+Udp6XNowlnjPZb1ydqmt+jLXs83EbSYWVFpdbtdB6paA0aIdao/6j4tWAa1EKAxdsVzHOy/Sk+u80jUdDEkEeaHan7V/w8iZ5hmtkCot3SIzpUFTcOn0a4ixfCqVW4uekdV3l7w1tjg152Gf7ssKLeLxblbzHn34WvpsqDAcf+8qKv/hBkX1zM0m0jgiUEqWU5VTiCCoMguxVzRLBA0KJiII2TAg2LIFuOz9JmI4IFgnL+4tMsjOVs+RkWQnYNia3jHyEsQUjvvODt2hoWQjZpHSGsh5AiHgzOUuGwELIk3whhvSEFp6aF7Kt/2C0lI4SleJR6NRiX/RsUQZbQYUuDRgQLBOktoDonrR8WQZbFGXWwVgfp/Uqic8LMJowINq6doBdRLNEQssTMaEbrIKTfsMplwwdFkCVm2JdmjggWCOpKA4K7IwjJYfEfTyY1IMV/4Alu/gc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md new file mode 100644 index 00000000..317aec90 --- /dev/null +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -0,0 +1,10 @@ +### Job lifecycle + +There are several boltdb buckets: + +1. `PushBucket` - used for pushed jobs via RPC. +2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and +get into the `InQueueBucket` waiting to acknowledgement. +3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. + +`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go new file mode 100644 index 00000000..683b26f1 --- /dev/null +++ b/plugins/boltdb/plugin.go @@ -0,0 +1,82 @@ +package boltdb + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "boltdb" +) + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + + drivers uint +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.stop = make(chan struct{}) + p.log = log + p.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (p *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (p *Plugin) Stop() error { + if p.drivers > 0 { + for i := uint(0); i < p.drivers; i++ { + // send close signal to every driver + p.stop <- struct{}{} + } + } + return nil +} + +// Name returns plugin name +func (p *Plugin) Name() string { + return PluginName +} + +// Available interface implementation +func (p *Plugin) Available() {} + +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + if err != nil { + return nil, errors.E(op, err) + } + + // save driver number to release resources after Stop + p.drivers++ + + return st, nil +} + +// JOBS bbolt implementation + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) +} |
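
The lifecycle described in `doc/job_lifecycle.md` maps directly onto the three bucket constants added in `boltjobs`. The sketch below is a minimal, standalone illustration — not part of the plugin; the file name `example.db`, the job ID `job-1` and the payload are made up — of the push → processing → ack flow that `consumer.go`, `listener.go` and `item.go` implement on top of bbolt:

```go
package main

import (
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

const (
	pushBucket    = "push"       // jobs pushed over RPC land here
	inQueueBucket = "processing" // jobs handed to the priority queue wait here for an ack
	delayBucket   = "delayed"    // delayed jobs, keyed by RFC3339 timestamps
)

func main() {
	db, err := bolt.Open("example.db", 0o777, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 1. Push: store the serialized job under its ID in the push bucket.
	err = db.Update(func(tx *bolt.Tx) error {
		for _, name := range []string{pushBucket, inQueueBucket, delayBucket} {
			if _, err := tx.CreateBucketIfNotExists([]byte(name)); err != nil {
				return err
			}
		}
		return tx.Bucket([]byte(pushBucket)).Put([]byte("job-1"), []byte(`{"payload":"hello"}`))
	})
	if err != nil {
		log.Fatal(err)
	}

	// 2. Consume: in a single writable transaction, move the job from the push
	//    bucket to the processing bucket (the real listener also inserts it into
	//    the in-memory priority queue at this point).
	err = db.Update(func(tx *bolt.Tx) error {
		push := tx.Bucket([]byte(pushBucket))
		k, v := push.Cursor().First()
		if k == nil {
			return nil // nothing to consume
		}
		if err := tx.Bucket([]byte(inQueueBucket)).Put(k, v); err != nil {
			return err
		}
		return push.Delete(k)
	})
	if err != nil {
		log.Fatal(err)
	}

	// 3. Ack: the worker finished, so the job is removed from the processing bucket.
	err = db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket([]byte(inQueueBucket)).Delete([]byte("job-1"))
	})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("job-1 pushed, consumed and acknowledged")
}
```

Keeping the move from `push` to `processing` inside one transaction is what makes restart recovery cheap: on startup both constructors copy everything left in the processing bucket back into the push bucket, so an unacknowledged job is re-delivered rather than lost.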