diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 422 |
1 files changed, 422 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go new file mode 100644 index 00000000..ed0eda61 --- /dev/null +++ b/plugins/boltdb/boltjobs/consumer.go @@ -0,0 +1,422 @@ +package boltjobs + +import ( + "bytes" + "context" + "encoding/gob" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + PluginName string = "boltdb" + rrDB string = "rr.db" + + PushBucket string = "push" + InQueueBucket string = "processing" + DelayBucket string = "delayed" +) + +type consumer struct { + file string + permissions int + priority int + prefetch int + + db *bolt.DB + + bPool sync.Pool + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipeline atomic.Value + cond *sync.Cond + + listeners uint32 + active *uint64 + delayed *uint64 + + stopCh chan struct{} +} + +func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg := &Config{} + err = cfg.UnmarshalKey(configKey, localCfg) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg.InitDefaults() + conf.InitDefaults() + + db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + permissions: conf.Permissions, + file: localCfg.File, + priority: localCfg.Priority, + prefetch: localCfg.Prefetch, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + + db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + file: pipeline.String(file, rrDB), + priority: pipeline.Int(priority, 10), + prefetch: pipeline.Int(prefetch, 100), + permissions: conf.Permissions, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func (c *consumer) Push(_ context.Context, job *job.Job) error { + const op = errors.Op("boltdb_jobs_push") + err := c.db.Update(func(tx *bolt.Tx) error { + item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + c.put(buf) + + // handle delay + if item.Options.Delay > 0 { + b := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) + + err = b.Put(utils.AsBytes(tKey), value) + if err != nil { + return errors.E(op, err) + } + + atomic.AddUint64(c.delayed, 1) + + return nil + } + + b := tx.Bucket(utils.AsBytes(PushBucket)) + err = b.Put(utils.AsBytes(item.ID()), value) + if err != nil { + return errors.E(op, err) + } + + // increment active counter + atomic.AddUint64(c.active, 1) + + return nil + }) + + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) + return nil +} + +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("boltdb_run") + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (c *consumer) Stop(_ context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + } + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 1 { + c.log.Warn("amqp listener already in the active state") + return + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: PushBucket, + Active: int64(atomic.LoadUint64(c.active)), + Delayed: int64(atomic.LoadUint64(c.delayed)), + Ready: toBool(atomic.LoadUint32(&c.listeners)), + }, nil +} + +// Private + +func (c *consumer) get() *bytes.Buffer { + return c.bPool.Get().(*bytes.Buffer) +} + +func (c *consumer) put(b *bytes.Buffer) { + b.Reset() + c.bPool.Put(b) +} + +func toBool(r uint32) bool { + return r > 0 +} |