Diffstat (limited to 'plugins/boltdb/boltjobs')
-rw-r--r--  plugins/boltdb/boltjobs/config.go   |  39
-rw-r--r--  plugins/boltdb/boltjobs/consumer.go | 430
-rw-r--r--  plugins/boltdb/boltjobs/item.go     | 229
-rw-r--r--  plugins/boltdb/boltjobs/listener.go | 156
4 files changed, 0 insertions, 854 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
deleted file mode 100644
index 8cc098c1..00000000
--- a/plugins/boltdb/boltjobs/config.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package boltjobs
-
-const (
-	file     string = "file"
-	priority string = "priority"
-	prefetch string = "prefetch"
-)
-
-type GlobalCfg struct {
-	// db file permissions
-	Permissions int `mapstructure:"permissions"`
-	// consume timeout
-}
-
-func (c *GlobalCfg) InitDefaults() {
-	if c.Permissions == 0 {
-		c.Permissions = 0777
-	}
-}
-
-type Config struct {
-	File     string `mapstructure:"file"`
-	Priority int    `mapstructure:"priority"`
-	Prefetch int    `mapstructure:"prefetch"`
-}
-
-func (c *Config) InitDefaults() {
-	if c.File == "" {
-		c.File = "rr.db"
-	}
-
-	if c.Priority == 0 {
-		c.Priority = 10
-	}
-
-	if c.Prefetch == 0 {
-		c.Prefetch = 1000
-	}
-}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
deleted file mode 100644
index 62045d3b..00000000
--- a/plugins/boltdb/boltjobs/consumer.go
+++ /dev/null
@@ -1,430 +0,0 @@
-package boltjobs
-
-import (
-	"bytes"
-	"context"
-	"encoding/gob"
-	"os"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/pkg/events"
-	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
-	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
-	"github.com/spiral/roadrunner/v2/plugins/config"
-	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
-	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
-	"github.com/spiral/roadrunner/v2/plugins/logger"
-	"github.com/spiral/roadrunner/v2/utils"
-	bolt "go.etcd.io/bbolt"
-)
-
-const (
-	PluginName string = "boltdb"
-	rrDB       string = "rr.db"
-
-	PushBucket    string = "push"
-	InQueueBucket string = "processing"
-	DelayBucket   string = "delayed"
-)
-
-type consumer struct {
-	file        string
-	permissions int
-	priority    int
-	prefetch    int
-
-	db *bolt.DB
-
-	bPool    sync.Pool
-	log      logger.Logger
-	eh       events.Handler
-	pq       priorityqueue.Queue
-	pipeline atomic.Value
-	cond     *sync.Cond
-
-	listeners uint32
-	active    *uint64
-	delayed   *uint64
-
-	stopCh chan struct{}
-}
-
-func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
-	const op = errors.Op("init_boltdb_jobs")
-
-	if !cfg.Has(configKey) {
-		return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
-	}
-
-	// if no global section
-	if !cfg.Has(PluginName) {
-		return nil, errors.E(op, errors.Str("no global boltdb configuration"))
-	}
-
-	conf := &GlobalCfg{}
-	err := cfg.UnmarshalKey(PluginName, conf)
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	localCfg := &Config{}
-	err = cfg.UnmarshalKey(configKey, localCfg)
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	localCfg.InitDefaults()
-	conf.InitDefaults()
-
-	db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{
-		Timeout:        time.Second * 20,
-		NoGrowSync:     false,
-		NoFreelistSync: false,
-		ReadOnly:       false,
-		NoSync:         false,
-	})
-
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	// create bucket if it does not exist
-	// tx.Commit invokes via the db.Update
-	err = db.Update(func(tx *bolt.Tx) error {
-		const upOp = errors.Op("boltdb_plugin_update")
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-		cursor := inQb.Cursor()
-
-		pushB := tx.Bucket(utils.AsBytes(PushBucket))
-
-		// get all items, which are in the InQueueBucket and put them into the PushBucket
-		for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
-			err = pushB.Put(k, v)
-			if err != nil {
-				return errors.E(op, err)
-			}
-		}
-		return nil
-	})
-
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	return &consumer{
-		permissions: conf.Permissions,
-		file:        localCfg.File,
-		priority:    localCfg.Priority,
-		prefetch:    localCfg.Prefetch,
-
-		bPool: sync.Pool{New: func() interface{} {
-			return new(bytes.Buffer)
-		}},
-		cond: sync.NewCond(&sync.Mutex{}),
-
-		delayed: utils.Uint64(0),
-		active:  utils.Uint64(0),
-
-		db:     db,
-		log:    log,
-		eh:     e,
-		pq:     pq,
-		stopCh: make(chan struct{}, 2),
-	}, nil
-}
-
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
-	const op = errors.Op("init_boltdb_jobs")
-
-	// if no global section
-	if !cfg.Has(PluginName) {
-		return nil, errors.E(op, errors.Str("no global boltdb configuration"))
-	}
-
-	conf := &GlobalCfg{}
-	err := cfg.UnmarshalKey(PluginName, conf)
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	// add default values
-	conf.InitDefaults()
-
-	db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
-		Timeout:        time.Second * 20,
-		NoGrowSync:     false,
-		NoFreelistSync: false,
-		ReadOnly:       false,
-		NoSync:         false,
-	})
-
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	// create bucket if it does not exist
-	// tx.Commit invokes via the db.Update
-	err = db.Update(func(tx *bolt.Tx) error {
-		const upOp = errors.Op("boltdb_plugin_update")
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		_, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
-		if err != nil {
-			return errors.E(op, upOp)
-		}
-
-		inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-		cursor := inQb.Cursor()
-
-		pushB := tx.Bucket(utils.AsBytes(PushBucket))
-
-		// get all items, which are in the InQueueBucket and put them into the PushBucket
-		for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
-			err = pushB.Put(k, v)
-			if err != nil {
-				return errors.E(op, err)
-			}
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	return &consumer{
-		file:        pipeline.String(file, rrDB),
-		priority:    pipeline.Int(priority, 10),
-		prefetch:    pipeline.Int(prefetch, 1000),
-		permissions: conf.Permissions,
-
-		bPool: sync.Pool{New: func() interface{} {
-			return new(bytes.Buffer)
-		}},
-		cond: sync.NewCond(&sync.Mutex{}),
-
-		delayed: utils.Uint64(0),
-		active:  utils.Uint64(0),
-
-		db:     db,
-		log:    log,
-		eh:     e,
-		pq:     pq,
-		stopCh: make(chan struct{}, 2),
-	}, nil
-}
-
-func (c *consumer) Push(_ context.Context, job *job.Job) error {
-	const op = errors.Op("boltdb_jobs_push")
-	err := c.db.Update(func(tx *bolt.Tx) error {
-		item := fromJob(job)
-		// pool with buffers
-		buf := c.get()
-		// encode the job
-		enc := gob.NewEncoder(buf)
-		err := enc.Encode(item)
-		if err != nil {
-			c.put(buf)
-			return errors.E(op, err)
-		}
-
-		value := make([]byte, buf.Len())
-		copy(value, buf.Bytes())
-		c.put(buf)
-
-		// handle delay
-		if item.Options.Delay > 0 {
-			b := tx.Bucket(utils.AsBytes(DelayBucket))
-			tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
-
-			err = b.Put(utils.AsBytes(tKey), value)
-			if err != nil {
-				return errors.E(op, err)
-			}
-
-			atomic.AddUint64(c.delayed, 1)
-
-			return nil
-		}
-
-		b := tx.Bucket(utils.AsBytes(PushBucket))
-		err = b.Put(utils.AsBytes(item.ID()), value)
-		if err != nil {
-			return errors.E(op, err)
-		}
-
-		// increment active counter
-		atomic.AddUint64(c.active, 1)
-
-		return nil
-	})
-
-	if err != nil {
-		return errors.E(op, err)
-	}
-
-	return nil
-}
-
-func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
-	c.pipeline.Store(pipeline)
-	return nil
-}
-
-func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
-	const op = errors.Op("boltdb_run")
-	start := time.Now()
-
-	pipe := c.pipeline.Load().(*pipeline.Pipeline)
-	if pipe.Name() != p.Name() {
-		return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
-	}
-
-	// run listener
-	go c.listener()
-	go c.delayedJobsListener()
-
-	// increase number of listeners
-	atomic.AddUint32(&c.listeners, 1)
-
-	c.eh.Push(events.JobEvent{
-		Event:    events.EventPipeActive,
-		Driver:   pipe.Driver(),
-		Pipeline: pipe.Name(),
-		Start:    start,
-		Elapsed:  time.Since(start),
-	})
-
-	return nil
-}
-
-func (c *consumer) Stop(_ context.Context) error {
-	start := time.Now()
-	if atomic.LoadUint32(&c.listeners) > 0 {
-		c.stopCh <- struct{}{}
-		c.stopCh <- struct{}{}
-	}
-
-	pipe := c.pipeline.Load().(*pipeline.Pipeline)
-	c.eh.Push(events.JobEvent{
-		Event:    events.EventPipeStopped,
-		Driver:   pipe.Driver(),
-		Pipeline: pipe.Name(),
-		Start:    start,
-		Elapsed:  time.Since(start),
-	})
-	return nil
-}
-
-func (c *consumer) Pause(_ context.Context, p string) {
-	start := time.Now()
-	pipe := c.pipeline.Load().(*pipeline.Pipeline)
-	if pipe.Name() != p {
-		c.log.Error("no such pipeline", "requested pause on: ", p)
-	}
-
-	l := atomic.LoadUint32(&c.listeners)
-	// no active listeners
-	if l == 0 {
-		c.log.Warn("no active listeners, nothing to pause")
-		return
-	}
-
-	c.stopCh <- struct{}{}
-	c.stopCh <- struct{}{}
-
-	atomic.AddUint32(&c.listeners, ^uint32(0))
-
-	c.eh.Push(events.JobEvent{
-		Event:    events.EventPipePaused,
-		Driver:   pipe.Driver(),
-		Pipeline: pipe.Name(),
-		Start:    start,
-		Elapsed:  time.Since(start),
-	})
-}
-
-func (c *consumer) Resume(_ context.Context, p string) {
-	start := time.Now()
-	pipe := c.pipeline.Load().(*pipeline.Pipeline)
-	if pipe.Name() != p {
-		c.log.Error("no such pipeline", "requested resume on: ", p)
-	}
-
-	l := atomic.LoadUint32(&c.listeners)
-	// no active listeners
-	if l == 1 {
-		c.log.Warn("amqp listener already in the active state")
-		return
-	}
-
-	// run listener
-	go c.listener()
-	go c.delayedJobsListener()
-
-	// increase number of listeners
-	atomic.AddUint32(&c.listeners, 1)
-
-	c.eh.Push(events.JobEvent{
-		Event:    events.EventPipeActive,
-		Driver:   pipe.Driver(),
-		Pipeline: pipe.Name(),
-		Start:    start,
-		Elapsed:  time.Since(start),
-	})
-}
-
-func (c *consumer) State(_ context.Context) (*jobState.State, error) {
-	pipe := c.pipeline.Load().(*pipeline.Pipeline)
-
-	return &jobState.State{
-		Pipeline: pipe.Name(),
-		Driver:   pipe.Driver(),
-		Queue:    PushBucket,
-		Active:   int64(atomic.LoadUint64(c.active)),
-		Delayed:  int64(atomic.LoadUint64(c.delayed)),
-		Ready:    toBool(atomic.LoadUint32(&c.listeners)),
-	}, nil
-}
-
-// Private
-
-func (c *consumer) get() *bytes.Buffer {
-	return c.bPool.Get().(*bytes.Buffer)
-}
-
-func (c *consumer) put(b *bytes.Buffer) {
-	b.Reset()
-	c.bPool.Put(b)
-}
-
-func toBool(r uint32) bool {
-	return r > 0
-}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
deleted file mode 100644
index 837f8c63..00000000
--- a/plugins/boltdb/boltjobs/item.go
+++ /dev/null
@@ -1,229 +0,0 @@
-package boltjobs
-
-import (
-	"bytes"
-	"encoding/gob"
-	"sync/atomic"
-	"time"
-
-	json "github.com/json-iterator/go"
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
-	"github.com/spiral/roadrunner/v2/utils"
-	"go.etcd.io/bbolt"
-)
-
-type Item struct {
-	// Job contains pluginName of job broker (usually PHP class).
-	Job string `json:"job"`
-
-	// Ident is unique identifier of the job, should be provided from outside
-	Ident string `json:"id"`
-
-	// Payload is string data (usually JSON) passed to Job broker.
-	Payload string `json:"payload"`
-
-	// Headers with key-values pairs
-	Headers map[string][]string `json:"headers"`
-
-	// Options contains set of PipelineOptions specific to job execution. Can be empty.
-	Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
-	// Priority is job priority, default - 10
-	// pointer to distinguish 0 as a priority and nil as priority not set
-	Priority int64 `json:"priority"`
-
-	// Pipeline manually specified pipeline.
-	Pipeline string `json:"pipeline,omitempty"`
-
-	// Delay defines time duration to delay execution for. Defaults to none.
-	Delay int64 `json:"delay,omitempty"`
-
-	// private
-	db      *bbolt.DB
-	active  *uint64
-	delayed *uint64
-}
-
-func (i *Item) ID() string {
-	return i.Ident
-}
-
-func (i *Item) Priority() int64 {
-	return i.Options.Priority
-}
-
-func (i *Item) Body() []byte {
-	return utils.AsBytes(i.Payload)
-}
-
-func (i *Item) Context() ([]byte, error) {
-	ctx, err := json.Marshal(
-		struct {
-			ID       string              `json:"id"`
-			Job      string              `json:"job"`
-			Headers  map[string][]string `json:"headers"`
-			Pipeline string              `json:"pipeline"`
-		}{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
-	)
-
-	if err != nil {
-		return nil, err
-	}
-
-	return ctx, nil
-}
-
-func (i *Item) Ack() error {
-	const op = errors.Op("boltdb_item_ack")
-	tx, err := i.Options.db.Begin(true)
-	if err != nil {
-		_ = tx.Rollback()
-		return errors.E(op, err)
-	}
-
-	inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-	err = inQb.Delete(utils.AsBytes(i.ID()))
-	if err != nil {
-		_ = tx.Rollback()
-		return errors.E(op, err)
-	}
-
-	if i.Options.Delay > 0 {
-		atomic.AddUint64(i.Options.delayed, ^uint64(0))
-	} else {
-		atomic.AddUint64(i.Options.active, ^uint64(0))
-	}
-
-	return tx.Commit()
-}
-
-func (i *Item) Nack() error {
-	const op = errors.Op("boltdb_item_ack")
-	/*
-		steps:
-		1. begin tx
-		2. get item by ID from the InQueueBucket (previously put in the listener)
-		3. put it back to the PushBucket
-		4. Delete it from the InQueueBucket
-	*/
-	tx, err := i.Options.db.Begin(true)
-	if err != nil {
-		_ = tx.Rollback()
-		return errors.E(op, err)
-	}
-
-	inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-	v := inQb.Get(utils.AsBytes(i.ID()))
-
-	pushB := tx.Bucket(utils.AsBytes(PushBucket))
-
-	err = pushB.Put(utils.AsBytes(i.ID()), v)
-	if err != nil {
-		_ = tx.Rollback()
-		return errors.E(op, err)
-	}
-
-	err = inQb.Delete(utils.AsBytes(i.ID()))
-	if err != nil {
-		_ = tx.Rollback()
-		return errors.E(op, err)
-	}
-
-	return tx.Commit()
-}
-
-/*
-Requeue algorithm:
-1. Rewrite item headers and delay.
-2. Begin writable transaction on attached to the item db.
-3. Delete item from the InQueueBucket
-4. Handle items with the delay:
-	4.1. Get DelayBucket
-	4.2. Make a key by adding the delay to the time.Now() in RFC3339 format
-	4.3. Put this key with value to the DelayBucket
-5. W/o delay, put the key with value to the PushBucket (requeue)
-*/
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
-	const op = errors.Op("boltdb_item_requeue")
-	i.Headers = headers
-	i.Options.Delay = delay
-
-	tx, err := i.Options.db.Begin(true)
-	if err != nil {
-		return errors.E(op, err)
-	}
-
-	inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-	err = inQb.Delete(utils.AsBytes(i.ID()))
-	if err != nil {
-		return errors.E(op, i.rollback(err, tx))
-	}
-
-	// encode the item
-	buf := new(bytes.Buffer)
-	enc := gob.NewEncoder(buf)
-	err = enc.Encode(i)
-	val := make([]byte, buf.Len())
-	copy(val, buf.Bytes())
-	buf.Reset()
-
-	if delay > 0 {
-		delayB := tx.Bucket(utils.AsBytes(DelayBucket))
-		tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
-
-		if err != nil {
-			return errors.E(op, i.rollback(err, tx))
-		}
-
-		err = delayB.Put(utils.AsBytes(tKey), val)
-		if err != nil {
-			return errors.E(op, i.rollback(err, tx))
-		}
-
-		return tx.Commit()
-	}
-
-	pushB := tx.Bucket(utils.AsBytes(PushBucket))
-	if err != nil {
-		return errors.E(op, i.rollback(err, tx))
-	}
-
-	err = pushB.Put(utils.AsBytes(i.ID()), val)
-	if err != nil {
-		return errors.E(op, i.rollback(err, tx))
-	}
-
-	return tx.Commit()
-}
-
-func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
-	i.Options.db = db
-	i.Options.active = active
-	i.Options.delayed = delayed
-}
-
-func (i *Item) rollback(err error, tx *bbolt.Tx) error {
-	errR := tx.Rollback()
-	if errR != nil {
-		return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
-	}
-	return errors.Errorf("transaction commit error: %v", err)
-}
-
-func fromJob(job *job.Job) *Item {
-	return &Item{
-		Job:     job.Job,
-		Ident:   job.Ident,
-		Payload: job.Payload,
-		Headers: job.Headers,
-		Options: &Options{
-			Priority: job.Options.Priority,
-			Pipeline: job.Options.Pipeline,
-			Delay:    job.Options.Delay,
-		},
-	}
-}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
deleted file mode 100644
index 081d3f57..00000000
--- a/plugins/boltdb/boltjobs/listener.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package boltjobs
-
-import (
-	"bytes"
-	"encoding/gob"
-	"sync/atomic"
-	"time"
-
-	"github.com/spiral/roadrunner/v2/utils"
-	bolt "go.etcd.io/bbolt"
-)
-
-func (c *consumer) listener() {
-	tt := time.NewTicker(time.Millisecond)
-	defer tt.Stop()
-	for {
-		select {
-		case <-c.stopCh:
-			c.log.Info("boltdb listener stopped")
-			return
-		case <-tt.C:
-			if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
-				time.Sleep(time.Second)
-				continue
-			}
-			tx, err := c.db.Begin(true)
-			if err != nil {
-				c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
-				continue
-			}
-
-			b := tx.Bucket(utils.AsBytes(PushBucket))
-			inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-
-			// get first item
-			k, v := b.Cursor().First()
-			if k == nil && v == nil {
-				_ = tx.Commit()
-				continue
-			}
-
-			buf := bytes.NewReader(v)
-			dec := gob.NewDecoder(buf)
-
-			item := &Item{}
-			err = dec.Decode(item)
-			if err != nil {
-				c.rollback(err, tx)
-				continue
-			}
-
-			err = inQb.Put(utils.AsBytes(item.ID()), v)
-			if err != nil {
-				c.rollback(err, tx)
-				continue
-			}
-
-			// delete key from the PushBucket
-			err = b.Delete(k)
-			if err != nil {
-				c.rollback(err, tx)
-				continue
-			}
-
-			err = tx.Commit()
-			if err != nil {
-				c.rollback(err, tx)
-				continue
-			}
-
-			// attach pointer to the DB
-			item.attachDB(c.db, c.active, c.delayed)
-			// as the last step, after commit, put the item into the PQ
-			c.pq.Insert(item)
-		}
-	}
-}
-
-func (c *consumer) delayedJobsListener() {
-	tt := time.NewTicker(time.Second)
-	defer tt.Stop()
-
-	// just some 90's
-	loc, err := time.LoadLocation("UTC")
-	if err != nil {
-		c.log.Error("failed to load location, delayed jobs won't work", "error", err)
-		return
-	}
-
-	var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339))
-
-	for {
-		select {
-		case <-c.stopCh:
-			c.log.Info("boltdb listener stopped")
-			return
-		case <-tt.C:
-			tx, err := c.db.Begin(true)
-			if err != nil {
-				c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
-				continue
-			}
-
-			delayB := tx.Bucket(utils.AsBytes(DelayBucket))
-			inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
-
-			cursor := delayB.Cursor()
-			endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339))
-
-			for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() {
-				buf := bytes.NewReader(v)
-				dec := gob.NewDecoder(buf)
-
-				item := &Item{}
-				err = dec.Decode(item)
-				if err != nil {
-					c.rollback(err, tx)
-					continue
-				}
-
-				err = inQb.Put(utils.AsBytes(item.ID()), v)
-				if err != nil {
-					c.rollback(err, tx)
-					continue
-				}
-
-				// delete key from the PushBucket
-				err = delayB.Delete(k)
-				if err != nil {
-					c.rollback(err, tx)
-					continue
-				}
-
-				// attach pointer to the DB
-				item.attachDB(c.db, c.active, c.delayed)
-				// as the last step, after commit, put the item into the PQ
-				c.pq.Insert(item)
-			}
-
-			err = tx.Commit()
-			if err != nil {
-				c.rollback(err, tx)
-				continue
-			}
-		}
-	}
-}
-
-func (c *consumer) rollback(err error, tx *bolt.Tx) {
-	errR := tx.Rollback()
-	if errR != nil {
-		c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
-	}
-
-	c.log.Error("transaction commit error, rollback succeed", "error", err)
-}