diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/boltdb | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/boltdb')
-rw-r--r-- | plugins/boltdb/boltjobs/config.go | 39 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 430 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 229 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 156 | ||||
-rw-r--r-- | plugins/boltdb/boltkv/config.go | 30 | ||||
-rw-r--r-- | plugins/boltdb/boltkv/driver.go | 472 | ||||
-rw-r--r-- | plugins/boltdb/doc/boltjobs.drawio | 1 | ||||
-rw-r--r-- | plugins/boltdb/doc/job_lifecycle.md | 9 | ||||
-rw-r--r-- | plugins/boltdb/plugin.go | 68 |
9 files changed, 0 insertions, 1434 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go deleted file mode 100644 index 8cc098c1..00000000 --- a/plugins/boltdb/boltjobs/config.go +++ /dev/null @@ -1,39 +0,0 @@ -package boltjobs - -const ( - file string = "file" - priority string = "priority" - prefetch string = "prefetch" -) - -type GlobalCfg struct { - // db file permissions - Permissions int `mapstructure:"permissions"` - // consume timeout -} - -func (c *GlobalCfg) InitDefaults() { - if c.Permissions == 0 { - c.Permissions = 0777 - } -} - -type Config struct { - File string `mapstructure:"file"` - Priority int `mapstructure:"priority"` - Prefetch int `mapstructure:"prefetch"` -} - -func (c *Config) InitDefaults() { - if c.File == "" { - c.File = "rr.db" - } - - if c.Priority == 0 { - c.Priority = 10 - } - - if c.Prefetch == 0 { - c.Prefetch = 1000 - } -} diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go deleted file mode 100644 index 62045d3b..00000000 --- a/plugins/boltdb/boltjobs/consumer.go +++ /dev/null @@ -1,430 +0,0 @@ -package boltjobs - -import ( - "bytes" - "context" - "encoding/gob" - "os" - "sync" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" - bolt "go.etcd.io/bbolt" -) - -const ( - PluginName string = "boltdb" - rrDB string = "rr.db" - - PushBucket string = "push" - InQueueBucket string = "processing" - DelayBucket string = "delayed" -) - -type consumer struct { - file string - permissions int - priority int - prefetch int - - db *bolt.DB - - bPool sync.Pool - log logger.Logger - eh events.Handler - pq priorityqueue.Queue - pipeline atomic.Value - cond *sync.Cond - - listeners uint32 - active *uint64 - delayed *uint64 - - stopCh chan struct{} -} - -func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("init_boltdb_jobs") - - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(PluginName) { - return nil, errors.E(op, errors.Str("no global boltdb configuration")) - } - - conf := &GlobalCfg{} - err := cfg.UnmarshalKey(PluginName, conf) - if err != nil { - return nil, errors.E(op, err) - } - - localCfg := &Config{} - err = cfg.UnmarshalKey(configKey, localCfg) - if err != nil { - return nil, errors.E(op, err) - } - - localCfg.InitDefaults() - conf.InitDefaults() - - db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ - Timeout: time.Second * 20, - NoGrowSync: false, - NoFreelistSync: false, - ReadOnly: false, - NoSync: false, - }) - - if err != nil { - return nil, errors.E(op, err) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) - if err != nil { - return errors.E(op, upOp) - } - - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) - if err != nil { - return errors.E(op, upOp) - } - - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) - if err != nil { - return errors.E(op, upOp) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - cursor := inQb.Cursor() - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - // get all items, which are in the InQueueBucket and put them into the PushBucket - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - err = pushB.Put(k, v) - if err != nil { - return errors.E(op, err) - } - } - return nil - }) - - if err != nil { - return nil, errors.E(op, err) - } - - return &consumer{ - permissions: conf.Permissions, - file: localCfg.File, - priority: localCfg.Priority, - prefetch: localCfg.Prefetch, - - bPool: sync.Pool{New: func() interface{} { - return new(bytes.Buffer) - }}, - cond: sync.NewCond(&sync.Mutex{}), - - delayed: utils.Uint64(0), - active: utils.Uint64(0), - - db: db, - log: log, - eh: e, - pq: pq, - stopCh: make(chan struct{}, 2), - }, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("init_boltdb_jobs") - - // if no global section - if !cfg.Has(PluginName) { - return nil, errors.E(op, errors.Str("no global boltdb configuration")) - } - - conf := &GlobalCfg{} - err := cfg.UnmarshalKey(PluginName, conf) - if err != nil { - return nil, errors.E(op, err) - } - - // add default values - conf.InitDefaults() - - db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ - Timeout: time.Second * 20, - NoGrowSync: false, - NoFreelistSync: false, - ReadOnly: false, - NoSync: false, - }) - - if err != nil { - return nil, errors.E(op, err) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) - if err != nil { - return errors.E(op, upOp) - } - - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) - if err != nil { - return errors.E(op, upOp) - } - - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) - if err != nil { - return errors.E(op, upOp) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - cursor := inQb.Cursor() - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - // get all items, which are in the InQueueBucket and put them into the PushBucket - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - err = pushB.Put(k, v) - if err != nil { - return errors.E(op, err) - } - } - - return nil - }) - - if err != nil { - return nil, errors.E(op, err) - } - - return &consumer{ - file: pipeline.String(file, rrDB), - priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 1000), - permissions: conf.Permissions, - - bPool: sync.Pool{New: func() interface{} { - return new(bytes.Buffer) - }}, - cond: sync.NewCond(&sync.Mutex{}), - - delayed: utils.Uint64(0), - active: utils.Uint64(0), - - db: db, - log: log, - eh: e, - pq: pq, - stopCh: make(chan struct{}, 2), - }, nil -} - -func (c *consumer) Push(_ context.Context, job *job.Job) error { - const op = errors.Op("boltdb_jobs_push") - err := c.db.Update(func(tx *bolt.Tx) error { - item := fromJob(job) - // pool with buffers - buf := c.get() - // encode the job - enc := gob.NewEncoder(buf) - err := enc.Encode(item) - if err != nil { - c.put(buf) - return errors.E(op, err) - } - - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - c.put(buf) - - // handle delay - if item.Options.Delay > 0 { - b := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - - err = b.Put(utils.AsBytes(tKey), value) - if err != nil { - return errors.E(op, err) - } - - atomic.AddUint64(c.delayed, 1) - - return nil - } - - b := tx.Bucket(utils.AsBytes(PushBucket)) - err = b.Put(utils.AsBytes(item.ID()), value) - if err != nil { - return errors.E(op, err) - } - - // increment active counter - atomic.AddUint64(c.active, 1) - - return nil - }) - - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipeline.Store(pipeline) - return nil -} - -func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("boltdb_run") - start := time.Now() - - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) - } - - // run listener - go c.listener() - go c.delayedJobsListener() - - // increase number of listeners - atomic.AddUint32(&c.listeners, 1) - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (c *consumer) Stop(_ context.Context) error { - start := time.Now() - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - c.stopCh <- struct{}{} - } - - pipe := c.pipeline.Load().(*pipeline.Pipeline) - c.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) - return nil -} - -func (c *consumer) Pause(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 0 { - c.log.Warn("no active listeners, nothing to pause") - return - } - - c.stopCh <- struct{}{} - c.stopCh <- struct{}{} - - atomic.AddUint32(&c.listeners, ^uint32(0)) - - c.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) Resume(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 1 { - c.log.Warn("amqp listener already in the active state") - return - } - - // run listener - go c.listener() - go c.delayedJobsListener() - - // increase number of listeners - atomic.AddUint32(&c.listeners, 1) - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: PushBucket, - Active: int64(atomic.LoadUint64(c.active)), - Delayed: int64(atomic.LoadUint64(c.delayed)), - Ready: toBool(atomic.LoadUint32(&c.listeners)), - }, nil -} - -// Private - -func (c *consumer) get() *bytes.Buffer { - return c.bPool.Get().(*bytes.Buffer) -} - -func (c *consumer) put(b *bytes.Buffer) { - b.Reset() - c.bPool.Put(b) -} - -func toBool(r uint32) bool { - return r > 0 -} diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go deleted file mode 100644 index 837f8c63..00000000 --- a/plugins/boltdb/boltjobs/item.go +++ /dev/null @@ -1,229 +0,0 @@ -package boltjobs - -import ( - "bytes" - "encoding/gob" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" - "go.etcd.io/bbolt" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - db *bbolt.DB - active *uint64 - delayed *uint64 -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - const op = errors.Op("boltdb_item_ack") - tx, err := i.Options.db.Begin(true) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - if i.Options.Delay > 0 { - atomic.AddUint64(i.Options.delayed, ^uint64(0)) - } else { - atomic.AddUint64(i.Options.active, ^uint64(0)) - } - - return tx.Commit() -} - -func (i *Item) Nack() error { - const op = errors.Op("boltdb_item_ack") - /* - steps: - 1. begin tx - 2. get item by ID from the InQueueBucket (previously put in the listener) - 3. put it back to the PushBucket - 4. Delete it from the InQueueBucket - */ - tx, err := i.Options.db.Begin(true) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - v := inQb.Get(utils.AsBytes(i.ID())) - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - err = pushB.Put(utils.AsBytes(i.ID()), v) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - return tx.Commit() -} - -/* -Requeue algorithm: -1. Rewrite item headers and delay. -2. Begin writable transaction on attached to the item db. -3. Delete item from the InQueueBucket -4. Handle items with the delay: - 4.1. Get DelayBucket - 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format - 4.3. Put this key with value to the DelayBucket -5. W/o delay, put the key with value to the PushBucket (requeue) -*/ -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - const op = errors.Op("boltdb_item_requeue") - i.Headers = headers - i.Options.Delay = delay - - tx, err := i.Options.db.Begin(true) - if err != nil { - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - // encode the item - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) - val := make([]byte, buf.Len()) - copy(val, buf.Bytes()) - buf.Reset() - - if delay > 0 { - delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = delayB.Put(utils.AsBytes(tKey), val) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - return tx.Commit() - } - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = pushB.Put(utils.AsBytes(i.ID()), val) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - return tx.Commit() -} - -func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { - i.Options.db = db - i.Options.active = active - i.Options.delayed = delayed -} - -func (i *Item) rollback(err error, tx *bbolt.Tx) error { - errR := tx.Rollback() - if errR != nil { - return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) - } - return errors.Errorf("transaction commit error: %v", err) -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go deleted file mode 100644 index 081d3f57..00000000 --- a/plugins/boltdb/boltjobs/listener.go +++ /dev/null @@ -1,156 +0,0 @@ -package boltjobs - -import ( - "bytes" - "encoding/gob" - "sync/atomic" - "time" - - "github.com/spiral/roadrunner/v2/utils" - bolt "go.etcd.io/bbolt" -) - -func (c *consumer) listener() { - tt := time.NewTicker(time.Millisecond) - defer tt.Stop() - for { - select { - case <-c.stopCh: - c.log.Info("boltdb listener stopped") - return - case <-tt.C: - if atomic.LoadUint64(c.active) > uint64(c.prefetch) { - time.Sleep(time.Second) - continue - } - tx, err := c.db.Begin(true) - if err != nil { - c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) - continue - } - - b := tx.Bucket(utils.AsBytes(PushBucket)) - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - - // get first item - k, v := b.Cursor().First() - if k == nil && v == nil { - _ = tx.Commit() - continue - } - - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = b.Delete(k) - if err != nil { - c.rollback(err, tx) - continue - } - - err = tx.Commit() - if err != nil { - c.rollback(err, tx) - continue - } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) - } - } -} - -func (c *consumer) delayedJobsListener() { - tt := time.NewTicker(time.Second) - defer tt.Stop() - - // just some 90's - loc, err := time.LoadLocation("UTC") - if err != nil { - c.log.Error("failed to load location, delayed jobs won't work", "error", err) - return - } - - var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) - - for { - select { - case <-c.stopCh: - c.log.Info("boltdb listener stopped") - return - case <-tt.C: - tx, err := c.db.Begin(true) - if err != nil { - c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) - continue - } - - delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - - cursor := delayB.Cursor() - endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) - - for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = delayB.Delete(k) - if err != nil { - c.rollback(err, tx) - continue - } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) - } - - err = tx.Commit() - if err != nil { - c.rollback(err, tx) - continue - } - } - } -} - -func (c *consumer) rollback(err error, tx *bolt.Tx) { - errR := tx.Rollback() - if errR != nil { - c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR) - } - - c.log.Error("transaction commit error, rollback succeed", "error", err) -} diff --git a/plugins/boltdb/boltkv/config.go b/plugins/boltdb/boltkv/config.go deleted file mode 100644 index 56d00674..00000000 --- a/plugins/boltdb/boltkv/config.go +++ /dev/null @@ -1,30 +0,0 @@ -package boltkv - -type Config struct { - // File is boltDB file. No need to create it by your own, - // boltdb driver is able to create the file, or read existing - File string - // Bucket to store data in boltDB - bucket string - // db file permissions - Permissions int - // timeout - Interval int `mapstructure:"interval"` -} - -// InitDefaults initializes default values for the boltdb -func (s *Config) InitDefaults() { - s.bucket = "default" - - if s.File == "" { - s.File = "rr.db" // default file name - } - - if s.Permissions == 0 { - s.Permissions = 0777 // free for all - } - - if s.Interval == 0 { - s.Interval = 60 // default is 60 seconds timeout - } -} diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go deleted file mode 100644 index 656d572e..00000000 --- a/plugins/boltdb/boltkv/driver.go +++ /dev/null @@ -1,472 +0,0 @@ -package boltkv - -import ( - "bytes" - "encoding/gob" - "os" - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" - "github.com/spiral/roadrunner/v2/utils" - bolt "go.etcd.io/bbolt" -) - -const ( - RootPluginName string = "kv" -) - -type Driver struct { - clearMu sync.RWMutex - // db instance - DB *bolt.DB - // name should be UTF-8 - bucket []byte - log logger.Logger - cfg *Config - - // gc contains keys with timeouts - gc sync.Map - // default timeout for cache cleanup is 1 minute - timeout time.Duration - - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} -} - -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_boltdb_driver") - - if !cfgPlugin.Has(RootPluginName) { - return nil, errors.E(op, errors.Str("no kv section in the configuration")) - } - - d := &Driver{ - log: log, - stop: make(chan struct{}), - } - - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - // add default values - d.cfg.InitDefaults() - - d.bucket = []byte(d.cfg.bucket) - d.timeout = time.Duration(d.cfg.Interval) * time.Second - d.gc = sync.Map{} - - db, err := bolt.Open(d.cfg.File, os.FileMode(d.cfg.Permissions), &bolt.Options{ - Timeout: time.Second * 20, - NoGrowSync: false, - NoFreelistSync: false, - ReadOnly: false, - NoSync: false, - }) - - if err != nil { - return nil, errors.E(op, err) - } - - d.DB = db - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.bucket)) - if err != nil { - return errors.E(op, upOp) - } - return nil - }) - - if err != nil { - return nil, errors.E(op, err) - } - - go d.startGCLoop() - - return d, nil -} - -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("boltdb_driver_has") - d.log.Debug("boltdb HAS method called", "args", keys) - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - m := make(map[string]bool, len(keys)) - - // this is readable transaction - err := d.DB.View(func(tx *bolt.Tx) error { - // Get retrieves the value for a key in the bucket. - // Returns a nil value if the key does not exist or if the key is a nested bucket. - // The returned value is only valid for the life of the transaction. - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - exist := b.Get([]byte(keys[i])) - if exist != nil { - m[keys[i]] = true - } - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - d.log.Debug("boltdb HAS method finished") - return m, nil -} - -// Get retrieves the value for a key in the bucket. -// Returns a nil value if the key does not exist or if the key is a nested bucket. -// The returned value is only valid for the life of the transaction. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("boltdb_driver_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - var val []byte - err := d.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - val = b.Get([]byte(key)) - - // try to decode values - if val != nil { - buf := bytes.NewReader(val) - decoder := gob.NewDecoder(buf) - - var i string - err := decoder.Decode(&i) - if err != nil { - // unsafe (w/o runes) convert - return errors.E(op, err) - } - - // set the value - val = utils.AsBytes(i) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - return val, nil -} - -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("boltdb_driver_mget") - // defense - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - err := d.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - - buf := new(bytes.Buffer) - var out []byte - buf.Grow(100) - for i := range keys { - value := b.Get([]byte(keys[i])) - buf.Write(value) - // allocate enough space - dec := gob.NewDecoder(buf) - if value != nil { - err := dec.Decode(&out) - if err != nil { - return errors.E(op, err) - } - m[keys[i]] = out - buf.Reset() - out = nil - } - } - - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - return m, nil -} - -// Set puts the K/V to the bolt -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("boltdb_driver_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - // start writable transaction - tx, err := d.DB.Begin(true) - if err != nil { - return errors.E(op, err) - } - defer func() { - err = tx.Commit() - if err != nil { - errRb := tx.Rollback() - if errRb != nil { - d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) - } - } - }() - - b := tx.Bucket(d.bucket) - // use access by index to avoid copying - for i := range items { - // performance note: pass a prepared bytes slice with initial cap - // we can't move buf and gob out of loop, because we need to clear both from data - // but gob will contain (w/o re-init) the past data - buf := new(bytes.Buffer) - encoder := gob.NewEncoder(buf) - if errors.Is(errors.EmptyItem, err) { - return errors.E(op, errors.EmptyItem) - } - - // Encode value - err = encoder.Encode(&items[i].Value) - if err != nil { - return errors.E(op, err) - } - // buf.Bytes will copy the underlying slice. Take a look in case of performance problems - err = b.Put([]byte(items[i].Key), buf.Bytes()) - if err != nil { - return errors.E(op, err) - } - - // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check - // we do not need mutex here, since we use sync.Map - if items[i].Timeout != "" { - // check correctness of provided TTL - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - // Store key TTL in the separate map - d.gc.Store(items[i].Key, items[i].Timeout) - } - - buf.Reset() - } - - return nil -} - -// Delete all keys from DB -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("boltdb_driver_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - // start writable transaction - tx, err := d.DB.Begin(true) - if err != nil { - return errors.E(op, err) - } - - defer func() { - err = tx.Commit() - if err != nil { - errRb := tx.Rollback() - if errRb != nil { - d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) - } - } - }() - - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - - for _, key := range keys { - err = b.Delete([]byte(key)) - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("boltdb_driver_mexpire") - for i := range items { - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - - d.gc.Store(items[i].Key, items[i].Timeout) - } - return nil -} - -func (d *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("boltdb_driver_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for i := range keys { - if item, ok := d.gc.Load(keys[i]); ok { - // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 - m[keys[i]] = item.(string) - } - } - return m, nil -} - -func (d *Driver) Clear() error { - err := d.DB.Update(func(tx *bolt.Tx) error { - err := tx.DeleteBucket(d.bucket) - if err != nil { - d.log.Error("boltdb delete bucket", "error", err) - return err - } - - _, err = tx.CreateBucket(d.bucket) - if err != nil { - d.log.Error("boltdb create bucket", "error", err) - return err - } - - return nil - }) - - if err != nil { - d.log.Error("clear transaction failed", "error", err) - return err - } - - d.clearMu.Lock() - d.gc = sync.Map{} - d.clearMu.Unlock() - - return nil -} - -func (d *Driver) Stop() { - d.stop <- struct{}{} -} - -// ========================= PRIVATE ================================= - -func (d *Driver) startGCLoop() { //nolint:gocognit - go func() { - t := time.NewTicker(d.timeout) - defer t.Stop() - for { - select { - case <-t.C: - d.clearMu.RLock() - - // calculate current time before loop started to be fair - now := time.Now() - d.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } - - if now.After(v) { - // time expired - d.gc.Delete(k) - d.log.Debug("key deleted", "key", k) - err := d.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete(utils.AsBytes(k)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - d.log.Error("error during the gc phase of update", "error", err) - return false - } - } - return true - }) - - d.clearMu.RUnlock() - case <-d.stop: - err := d.DB.Close() - if err != nil { - d.log.Error("error") - } - return - } - } - }() -} diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio deleted file mode 100644 index 7d1f3531..00000000 --- a/plugins/boltdb/doc/boltjobs.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-08-31T09:34:11.357Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="KiNZAPNeIcd5kV3EE5lF" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">7V1bc5s4GP01nmkfkgHE9TF2km13up20me1uH2WQbRJsuYATe3/9ijtICiHmIrvBnWmNEAjrfNejT3QCZuv9Hz7crv7CDvImiuTsJ+B6oigKABr5J2o5JC0yUJWkZem7TtpWNNy7/6G0UUpbd66DgkrHEGMvdLfVRhtvNsgOK23Q9/FztdsCe9VRt3CJmIZ7G3ps6z+uE67SVl2SihOfkLtcZUMr2Zk1zHqnDcEKOvi51ARuJmDmYxwm39b7GfKi6csmJrnu9oWz+ZP5aBM2ucDcf0ZP8pX6w3iw/K8/fijw1/VFikYQHrJfjBwyAekh9sMVXuIN9G6K1qmPdxsHRXeVyFHR5wvGW9Iok8YHFIaHFE24CzFpWoVrLz1LHtg//Btdf6llhz/T28UH1/vK0SE9WuBNeAvXrhc1zGBgQ8eF5OlnRO7S0+mYMiDHyU+Lfs+LM5aJFPSXKKyZJjnHi4g6wmtEHotc5yMPhu5T9f4wFbll3i+/9A67ZGRFSvVDV1PROORiVb1F8lzpVWVoX7uRSt0owDvfRsyNyJfS7ymaYsl5gxRl2jqwGO3dsCRF5Ohn6UwhQ9HBoSxQpyR6CTKviV4DEVVbimhFJt4sAMl9n6C3S0e6+/v+E2n5fjdjRKMK/PPKDdH9FsaT8EzcSBXkN078wvW8GfawHw8F5tA2nag9CH38iEpnFKCqmpND9YT8EO3rwWInN71AM6vqp2R6/FxyGZkbWJW8Ba2mneFhiNRHuaSNhW7y9VGcXoGGeqWJ1CvzXHAs7GrFqhZG9uTsalP8hdpVa8RfNP66SPwB41enJPm5nrb0qVUnuSAf0+Q5SaADCzjHwtTKp+pVl6qZGutSZY5LBX25VFlmJv1EdVGcTrW1lfzsxqDCK7VhmtRVdqOywe0uWJGW6c5+RGG3uqhFf3i6qMefFL+yjsYfITqqaBQwoGHcSwPYmZIqQh1mNRGVapX0pUQ050dqHGZNwmM7aG7OubacfG5vX5eTDs2Bfg4hlgxOR2Qachfy5ExiLFk5hyBL1hgDP8PbaPwHPOcKxxc4R14VUOi5yw35bpNJQkTlppFldW3oXaUn1q7jJLKDAvc/OI/vF8GzjfxW/Ju06US75ul4E43lWPKUFE/HmuRMdBnKGp140e4TkbOUdNJa+vYLWbIqPuRCoXwDXiwC1A9rqYvVfKO56tMS0RONKUL1ZaOXoJEkB1QGMXDUmE0AGzZ6bhCiDTESivRBltbBx24jyAXSbZsXATiGNZeOdgitIkUmhOdFiuqQkaLMRvXyZTwHheWXiO6Sv1aI/P15822HdigL+XWPPOB07ldw03/tosW8eAIvgngGr0gHGWz38TRm58m3ZfSvEg14jTwURiMsfLzOh4tE5YX0gqAQ8n2PhxZhO88TmasghZ4RoQ3eIDHSQ5EBwGSlJ5eUQfh1WSgx+5ag8Qw9h9HUc1jDeA59aM9hsJ7j010k7th/RH4QzSvGXkuvQckFE79SbgXJjoYMnluxdANAvamItDIEskQtdGuG6JU2RSgtKF0qw6x9vzkd6cYUWA1NQamgRkACmeVCp+8NfmcReCFnHWihRjCL9IZcsisZYByBFH9OVDrEcowW49G/o2CLNwEa1os7EJkLbnKo2yaaL8R4cV28FxdaMCOVdLf5QnvLdYPBdDQD8rRJ4OwpSyr6FdqPw6rnwrQRn7uZm5qqNbatrdTTtKraaUrCtZOl0m58n0xNc2qkC1q+KTnSBXXfHC4rq4xI4TI41JrGQYvm2bsLhQRnREdFQsq5GNOhypbiS698Hx5KHVKdaE6dWDq1J4G6QDFr+5MvySN0yq8obFXV1dCW3tGQ6ag8S28qc6ALolPEm3ogOJU+jk4xWlmPU0ikFLWhXWmdZh9nWHJ3lUmqVW9YZBnUXtCPZQGCV5TbkwC/t+y2ZQmPlF2qTM6iN+pRF1j1/XsSXbEZ8HslsBrLLhBKYCmcGlgfh9h+n8tQ8smlyOrZVKe32SlyatqrNVReVejyFFveeE3GjcbvoYC9PcM8QKF642Xj3sqPsui0Uh0WxQDzmHQsVx4dVQr0+/JdJDyqYsmxvQpvZ1B/hBdLT+ZFXX/GdWSV0q5KJdll3seN6jocMquXI8xlFc2CYJWFGRiDwnxCe0veT4AMGgfIpkgfq57hAmI1/jJaB2C3t5Jknqh0iKHDKXrVMl8hraj+hqZRgtlH4s+mdt/Rr8hDtYwL37r2CR0D8tc+VV2T1GEyO5pntIS/yiN7YdY5WZauCXERlsVsaFmEvh8ke0o2txtVV5Y04bqritTd41gZSndbx4wCdDfDs/eNx0dFBQZd/SbVRwV0f1Wu7w/q+/cTRagsn5GRTD1vd2tdMtUD36Q14ZsG3e6mM/B83pDjb0mcJ817IAJbVzgMQAQackMX0d8bK7gePCGPHl4hjxi83iVRRJUQ8dZilEGJIpUl3JOtpSsUc7tOahh9uFlGmH6A8T0J2kH82OEqsecJ4nGvj+Udp6XNowlnjPZb1ydqmt+jLXs83EbSYWVFpdbtdB6paA0aIdao/6j4tWAa1EKAxdsVzHOy/Sk+u80jUdDEkEeaHan7V/w8iZ5hmtkCot3SIzpUFTcOn0a4ixfCqVW4uekdV3l7w1tjg152Gf7ssKLeLxblbzHn34WvpsqDAcf+8qKv/hBkX1zM0m0jgiUEqWU5VTiCCoMguxVzRLBA0KJiII2TAg2LIFuOz9JmI4IFgnL+4tMsjOVs+RkWQnYNia3jHyEsQUjvvODt2hoWQjZpHSGsh5AiHgzOUuGwELIk3whhvSEFp6aF7Kt/2C0lI4SleJR6NRiX/RsUQZbQYUuDRgQLBOktoDonrR8WQZbFGXWwVgfp/Uqic8LMJowINq6doBdRLNEQssTMaEbrIKTfsMplwwdFkCVm2JdmjggWCOpKA4K7IwjJYfEfTyY1IMV/4Alu/gc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md deleted file mode 100644 index 1424e586..00000000 --- a/plugins/boltdb/doc/job_lifecycle.md +++ /dev/null @@ -1,9 +0,0 @@ -### Job lifecycle - -There are several boltdb buckets: - -1. `PushBucket` - used for pushed jobs via RPC. -2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and -get into the `InQueueBucket` waiting to acknowledgement. -3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. - diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go deleted file mode 100644 index ad98cf3c..00000000 --- a/plugins/boltdb/plugin.go +++ /dev/null @@ -1,68 +0,0 @@ -package boltdb - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs" - "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "boltdb" -) - -// Plugin BoltDB K/V storage. -type Plugin struct { - cfg config.Configurer - // logger - log logger.Logger -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -// Serve is noop here -func (p *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (p *Plugin) Stop() error { - return nil -} - -// Name returns plugin name -func (p *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (p *Plugin) Available() {} - -func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - return st, nil -} - -// JOBS bbolt implementation - -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) -} - -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) -} |