Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r--  plugins/boltdb/boltjobs/consumer.go | 200
1 file changed, 171 insertions(+), 29 deletions(-)
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 67a6d3e7..2492ab60 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -5,10 +5,10 @@ import (
"context"
"encoding/gob"
"os"
+ "sync"
"sync/atomic"
"time"
- "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -22,11 +22,12 @@ import (
)
const (
- PluginName = "boltdb"
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
- PushBucket = "push"
- InQueueBucket = "processing"
- DoneBucket = "done"
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
)
type consumer struct {
@@ -37,11 +38,16 @@ type consumer struct {
db *bolt.DB
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
listeners uint32
- pipeline atomic.Value
+ active *uint64
+ delayed *uint64
stopCh chan struct{}
}
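The struct grows a buffer pool, a condition variable for the listener, and two heap-allocated counters (active, delayed) that are only ever touched through sync/atomic; keeping them as *uint64 lets Push, the listeners and State share the same counter words. A minimal standalone sketch of that counter pattern; the ptr helper below is a stand-in for the utils.Uint64 helper the diff uses during construction:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ptr mimics utils.Uint64: it returns a *uint64 so several goroutines
// (and value copies of the owning struct) can share one counter word.
func ptr(v uint64) *uint64 { return &v }

type counters struct {
	active  *uint64
	delayed *uint64
}

func main() {
	c := counters{active: ptr(0), delayed: ptr(0)}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddUint64(c.active, 1) // what Push does for a regular job
		}()
	}
	wg.Wait()

	// State() reads the same word with an atomic load.
	fmt.Println(atomic.LoadUint64(c.active)) // 100
}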
@@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // requeue: move any items left in the InQueueBucket back into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
return nil
})
+
if err != nil {
return nil, errors.E(op, err)
}
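Construction now also replays crash leftovers: anything still sitting in the processing bucket from a previous run is copied back into the push bucket so it gets delivered again. A self-contained sketch of the same requeue-on-boot idea against go.etcd.io/bbolt; bucket names mirror the constants above and error handling is reduced to log.Fatal:

package main

import (
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	db, err := bolt.Open("rr.db", 0777, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(tx *bolt.Tx) error {
		push, err := tx.CreateBucketIfNotExists([]byte("push"))
		if err != nil {
			return err
		}
		inQ, err := tx.CreateBucketIfNotExists([]byte("processing"))
		if err != nil {
			return err
		}

		// Whatever is still in "processing" belonged to a consumer that
		// never finished it; copy it back so the listener picks it up again.
		cur := inQ.Cursor()
		for k, v := cur.First(); k != nil; k, v = cur.Next() {
			if err := push.Put(k, v); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}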
@@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
priority: localCfg.Priority,
prefetch: localCfg.Prefetch,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
@@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// add default values
conf.InitDefaults()
- db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
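The only change in this hunk is that the file name comes from the new rrDB constant instead of a repeated "rr.db" literal; the surrounding options are untouched. For context, a minimal standalone open with the same settings (bbolt's Timeout bounds how long Open waits for the file lock when another process already holds the database, instead of blocking forever):

package main

import (
	"log"
	"os"
	"time"

	bolt "go.etcd.io/bbolt"
)

const rrDB = "rr.db"

func main() {
	db, err := bolt.Open(rrDB, os.FileMode(0777), &bolt.Options{
		Timeout:        time.Second * 20, // give up on the flock after 20s
		NoGrowSync:     false,
		NoFreelistSync: false,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}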
@@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // requeue: move any items left in the InQueueBucket back into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
return nil
})
@@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
}
return &consumer{
- file: pipeline.String(file, "rr.db"),
+ file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
prefetch: pipeline.Int(prefetch, 100),
permissions: conf.Permissions,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
-func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return b.Put(utils.AsBytes(tKey), value)
+ }
+
b := tx.Bucket(utils.AsBytes(PushBucket))
- buf := new(bytes.Buffer)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
enc := gob.NewEncoder(buf)
- err := enc.Encode(job)
+ err := enc.Encode(item)
if err != nil {
- return err
+ return errors.E(op, err)
}
- return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return b.Put(utils.AsBytes(item.ID()), value)
})
if err != nil {
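Push now splits into two paths: delayed jobs go into the delayed bucket keyed by an RFC3339 due time, regular jobs go into the push bucket keyed by the item ID, and both paths gob-encode into a pooled bytes.Buffer and then copy the bytes before Put. The copy matters because bbolt requires the value slice to stay valid for the life of the transaction, while the defer resets the buffer and returns it to the pool as soon as the update closure returns. A standalone sketch of that encode-and-copy step; the item type and encode helper here are illustrative, not the plugin's:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"sync"
	"time"
)

var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

type item struct {
	ID      string
	Payload string
}

// encode gob-encodes an item via a pooled buffer and hands back a private
// copy of the bytes, so the buffer can be reused right away while the copy
// stays valid for as long as bbolt holds on to it.
func encode(it *item) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()
		bufPool.Put(buf)
	}()

	if err := gob.NewEncoder(buf).Encode(it); err != nil {
		return nil, err
	}
	value := make([]byte, buf.Len())
	copy(value, buf.Bytes())
	return value, nil
}

func main() {
	value, err := encode(&item{ID: "1", Payload: "hello"})
	if err != nil {
		panic(err)
	}

	// Key for a job delayed by 10s: RFC3339 strings with the same offset
	// sort lexicographically in time order, so a cursor walks delayed
	// jobs in due-date order.
	key := time.Now().UTC().Add(10 * time.Second).Format(time.RFC3339)
	fmt.Println(key, len(value))
}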
@@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
-func (c *consumer) Stop(ctx context.Context) error {
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
-func (c *consumer) Pause(ctx context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
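Run now starts two goroutines, the regular listener and the new delayedJobsListener, and bumps the listener counter before emitting EventPipeActive. Neither listener body is part of this diff (they live in listener.go), so the sketch below is only an illustration of what the RFC3339 keys written by Push make possible: a periodic scan that promotes due jobs from the delayed bucket into the push bucket.

package boltsketch

import (
	"time"

	bolt "go.etcd.io/bbolt"
)

// moveDueJobs is illustrative only; the real delayedJobsListener is not
// shown in this diff. It moves every delayed entry whose key (an RFC3339
// timestamp) is not in the future over to the push bucket.
func moveDueJobs(db *bolt.DB, now time.Time) error {
	return db.Update(func(tx *bolt.Tx) error {
		delayed := tx.Bucket([]byte("delayed"))
		push := tx.Bucket([]byte("push"))

		// Collect due entries first so the bucket is not mutated while
		// its cursor is being iterated.
		type kv struct{ k, v []byte }
		var due []kv

		cur := delayed.Cursor()
		for k, v := cur.First(); k != nil; k, v = cur.Next() {
			ts, err := time.Parse(time.RFC3339, string(k))
			if err != nil {
				return err
			}
			if ts.After(now) {
				break // keys sort chronologically; nothing later is due yet
			}
			due = append(due, kv{
				k: append([]byte(nil), k...),
				v: append([]byte(nil), v...),
			})
		}

		for _, e := range due {
			if err := push.Put(e.k, e.v); err != nil {
				return err
			}
			if err := delayed.Delete(e.k); err != nil {
				return err
			}
		}
		return nil
	})
}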
@@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
}
c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
atomic.AddUint32(&c.listeners, ^uint32(0))
@@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
})
}
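Stop and Pause now push two empty structs into stopCh, and the channel is created with capacity 2, because there are two background loops to shut down and each consumes exactly one signal; the buffering also keeps the sender from blocking. A toy standalone illustration of that one-signal-per-goroutine pattern:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Two background loops (push listener and delayed-jobs listener), so
	// the stop channel holds two signals and the caller sends one per loop.
	stopCh := make(chan struct{}, 2)

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stopCh // each listener consumes exactly one stop signal
			fmt.Println("listener", id, "stopped")
		}(i)
	}

	stopCh <- struct{}{}
	stopCh <- struct{}{}
	wg.Wait()
}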
-func (c *consumer) Resume(ctx context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) {
// run listener
go c.listener()
+ go c.delayedJobsListener()
// increase number of listeners
atomic.AddUint32(&c.listeners, 1)
@@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) {
})
}
-func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
}
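State is no longer a stub: it reports the pipeline, the push bucket as the queue name, atomic loads of the two counters, and toBool over the listener count as readiness. A tiny standalone illustration of that mapping; the state struct below only mimics the jobState.State fields used above:

package main

import (
	"fmt"
	"sync/atomic"
)

type state struct {
	Active  int64
	Delayed int64
	Ready   bool
}

func toBool(r uint32) bool { return r > 0 }

// snapshot reads the shared counters the same way State does: atomic
// loads, converted to the int64 fields of the report.
func snapshot(active, delayed *uint64, listeners *uint32) state {
	return state{
		Active:  int64(atomic.LoadUint64(active)),
		Delayed: int64(atomic.LoadUint64(delayed)),
		Ready:   toBool(atomic.LoadUint32(listeners)),
	}
}

func main() {
	var active, delayed uint64 = 3, 1
	var listeners uint32 = 1
	fmt.Printf("%+v\n", snapshot(&active, &delayed, &listeners))
}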