diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 200 |
1 files changed, 171 insertions, 29 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 67a6d3e7..2492ab60 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -5,10 +5,10 @@ import ( "context" "encoding/gob" "os" + "sync" "sync/atomic" "time" - "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -22,11 +22,12 @@ import ( ) const ( - PluginName = "boltdb" + PluginName string = "boltdb" + rrDB string = "rr.db" - PushBucket = "push" - InQueueBucket = "processing" - DoneBucket = "done" + PushBucket string = "push" + InQueueBucket string = "processing" + DelayBucket string = "delayed" ) type consumer struct { @@ -37,11 +38,16 @@ type consumer struct { db *bolt.DB - log logger.Logger - eh events.Handler - pq priorityqueue.Queue + bPool sync.Pool + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipeline atomic.Value + cond *sync.Cond + listeners uint32 - pipeline atomic.Value + active *uint64 + delayed *uint64 stopCh chan struct{} } @@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) if err != nil { return errors.E(op, upOp) } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } return nil }) + if err != nil { return nil, errors.E(op, err) } @@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e priority: localCfg.Priority, prefetch: localCfg.Prefetch, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + db: db, log: log, eh: e, pq: pq, - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), }, nil } @@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // add default values conf.InitDefaults() - db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{ + db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ Timeout: time.Second * 20, NoGrowSync: false, NoFreelistSync: false, @@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) if err != nil { return errors.E(op, upOp) } - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) if err != nil { return errors.E(op, upOp) } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + return nil }) @@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con } return &consumer{ - file: pipeline.String(file, "rr.db"), + file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), prefetch: pipeline.Int(prefetch, 100), permissions: conf.Permissions, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + db: db, log: log, eh: e, pq: pq, - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 2), }, nil } -func (c *consumer) Push(ctx context.Context, job *job.Job) error { +func (c *consumer) Push(_ context.Context, job *job.Job) error { const op = errors.Op("boltdb_jobs_push") err := c.db.Update(func(tx *bolt.Tx) error { + item := fromJob(job) + + // handle delay + if item.Options.Delay > 0 { + b := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) + + // pool with buffers + buf := c.get() + defer c.put(buf) + + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + + atomic.AddUint64(c.delayed, 1) + + return b.Put(utils.AsBytes(tKey), value) + } + b := tx.Bucket(utils.AsBytes(PushBucket)) - buf := new(bytes.Buffer) + + // pool with buffers + buf := c.get() + defer c.put(buf) + enc := gob.NewEncoder(buf) - err := enc.Encode(job) + err := enc.Encode(item) if err != nil { - return err + return errors.E(op, err) } - return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes()) + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + + // increment active counter + atomic.AddUint64(c.active, 1) + + return b.Put(utils.AsBytes(item.ID()), value) }) if err != nil { @@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } -func (c *consumer) Stop(ctx context.Context) error { +func (c *consumer) Stop(_ context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + } + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) return nil } -func (c *consumer) Pause(ctx context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) { } c.stopCh <- struct{}{} + c.stopCh <- struct{}{} atomic.AddUint32(&c.listeners, ^uint32(0)) @@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) { }) } -func (c *consumer) Resume(ctx context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) { // run listener go c.listener() + go c.delayedJobsListener() // increase number of listeners atomic.AddUint32(&c.listeners, 1) @@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) { }) } -func (c *consumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: PushBucket, + Active: int64(atomic.LoadUint64(c.active)), + Delayed: int64(atomic.LoadUint64(c.delayed)), + Ready: toBool(atomic.LoadUint32(&c.listeners)), + }, nil +} + +// Private + +func (c *consumer) get() *bytes.Buffer { + return c.bPool.Get().(*bytes.Buffer) +} + +func (c *consumer) put(b *bytes.Buffer) { + b.Reset() + c.bPool.Put(b) +} + +func toBool(r uint32) bool { + return r > 0 } |