summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb/boltjobs')
-rw-r--r--plugins/boltdb/boltjobs/config.go37
-rw-r--r--plugins/boltdb/boltjobs/consumer.go225
-rw-r--r--plugins/boltdb/boltjobs/listener.go24
3 files changed, 239 insertions, 47 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
index 013e30bf..8cc098c1 100644
--- a/plugins/boltdb/boltjobs/config.go
+++ b/plugins/boltdb/boltjobs/config.go
@@ -1,16 +1,39 @@
package boltjobs
-type Config struct {
- // File is boltDB file. No need to create it by your own,
- // boltdb driver is able to create the file, or read existing
- File string
- // Bucket to store data in boltDB
- bucket string
+const (
+ file string = "file"
+ priority string = "priority"
+ prefetch string = "prefetch"
+)
+
+type GlobalCfg struct {
// db file permissions
- Permissions int
+ Permissions int `mapstructure:"permissions"`
// consume timeout
}
+func (c *GlobalCfg) InitDefaults() {
+ if c.Permissions == 0 {
+ c.Permissions = 0777
+ }
+}
+
+type Config struct {
+ File string `mapstructure:"file"`
+ Priority int `mapstructure:"priority"`
+ Prefetch int `mapstructure:"prefetch"`
+}
+
func (c *Config) InitDefaults() {
+ if c.File == "" {
+ c.File = "rr.db"
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+ if c.Prefetch == 0 {
+ c.Prefetch = 1000
+ }
}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index a8db2f30..67a6d3e7 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -1,11 +1,14 @@
package boltjobs
import (
+ "bytes"
"context"
+ "encoding/gob"
"os"
"sync/atomic"
"time"
+ "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -20,19 +23,27 @@ import (
const (
PluginName = "boltdb"
+
+ PushBucket = "push"
+ InQueueBucket = "processing"
+ DoneBucket = "done"
)
type consumer struct {
- // bbolt configuration
file string
permissions int
- bucket string
- db *bolt.DB
+ priority int
+ prefetch int
+
+ db *bolt.DB
+
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ listeners uint32
+ pipeline atomic.Value
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
- pipe atomic.Value
+ stopCh chan struct{}
}
func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
@@ -47,26 +58,88 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
return nil, errors.E(op, errors.Str("no global boltdb configuration"))
}
- conf := &Config{}
+ conf := &GlobalCfg{}
- err := cfg.UnmarshalKey(configKey, conf)
+ err := cfg.UnmarshalKey(PluginName, conf)
if err != nil {
return nil, errors.E(op, err)
}
- // add default values
+ localCfg := &Config{}
+ err = cfg.UnmarshalKey(configKey, localCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg.InitDefaults()
conf.InitDefaults()
- c := &consumer{
- file: conf.File,
+
+ db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
permissions: conf.Permissions,
- bucket: conf.bucket,
+ file: localCfg.File,
+ priority: localCfg.Priority,
+ prefetch: localCfg.Prefetch,
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 1),
+ }, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
- log: log,
- eh: e,
- pq: pq,
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
}
- db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+
+ db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
@@ -78,51 +151,135 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
return nil, errors.E(op, err)
}
- c.db = db
-
// create bucket if it does not exist
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
if err != nil {
return errors.E(op, upOp)
}
return nil
})
- return c, nil
-}
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- return &consumer{}, nil
+ return &consumer{
+ file: pipeline.String(file, "rr.db"),
+ priority: pipeline.Int(priority, 10),
+ prefetch: pipeline.Int(prefetch, 100),
+ permissions: conf.Permissions,
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 1),
+ }, nil
}
func (c *consumer) Push(ctx context.Context, job *job.Job) error {
- panic("implement me")
+ const op = errors.Op("boltdb_jobs_push")
+ err := c.db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(job)
+ if err != nil {
+ return err
+ }
+
+ return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- c.pipe.Store(pipeline)
+ c.pipeline.Store(pipeline)
return nil
}
-func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error {
- panic("implement me")
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("boltdb_run")
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+ return nil
}
func (c *consumer) Stop(ctx context.Context) error {
- panic("implement me")
+ return nil
}
-func (c *consumer) Pause(ctx context.Context, pipeline string) {
- panic("implement me")
+func (c *consumer) Pause(ctx context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ c.stopCh <- struct{}{}
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
-func (c *consumer) Resume(ctx context.Context, pipeline string) {
- panic("implement me")
+func (c *consumer) Resume(ctx context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 1 {
+ c.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ // run listener
+ go c.listener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- panic("implement me")
+ return nil, nil
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 4a8d6cd9..2ee06088 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -1,22 +1,34 @@
package boltjobs
-import "time"
+import (
+ "fmt"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+)
func (c *consumer) listener() {
tt := time.NewTicker(time.Second)
for {
select {
+ case <-c.stopCh:
+ c.log.Warn("boltdb listener stopped")
+ return
case <-tt.C:
tx, err := c.db.Begin(false)
if err != nil {
panic(err)
}
- // cursor := tx.Cursor()
- err = tx.Commit()
- if err != nil {
- panic(err)
- }
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+
+ cursor := b.Cursor()
+
+ k, v := cursor.First()
+ _ = k
+ _ = v
+
+ fmt.Println("foo")
}
}
}