diff options
Diffstat (limited to 'plugins/boltdb/boltjobs')
-rw-r--r-- | plugins/boltdb/boltjobs/config.go | 37 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 225 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 24 |
3 files changed, 239 insertions, 47 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go index 013e30bf..8cc098c1 100644 --- a/plugins/boltdb/boltjobs/config.go +++ b/plugins/boltdb/boltjobs/config.go @@ -1,16 +1,39 @@ package boltjobs -type Config struct { - // File is boltDB file. No need to create it by your own, - // boltdb driver is able to create the file, or read existing - File string - // Bucket to store data in boltDB - bucket string +const ( + file string = "file" + priority string = "priority" + prefetch string = "prefetch" +) + +type GlobalCfg struct { // db file permissions - Permissions int + Permissions int `mapstructure:"permissions"` // consume timeout } +func (c *GlobalCfg) InitDefaults() { + if c.Permissions == 0 { + c.Permissions = 0777 + } +} + +type Config struct { + File string `mapstructure:"file"` + Priority int `mapstructure:"priority"` + Prefetch int `mapstructure:"prefetch"` +} + func (c *Config) InitDefaults() { + if c.File == "" { + c.File = "rr.db" + } + + if c.Priority == 0 { + c.Priority = 10 + } + if c.Prefetch == 0 { + c.Prefetch = 1000 + } } diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index a8db2f30..67a6d3e7 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -1,11 +1,14 @@ package boltjobs import ( + "bytes" "context" + "encoding/gob" "os" "sync/atomic" "time" + "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -20,19 +23,27 @@ import ( const ( PluginName = "boltdb" + + PushBucket = "push" + InQueueBucket = "processing" + DoneBucket = "done" ) type consumer struct { - // bbolt configuration file string permissions int - bucket string - db *bolt.DB + priority int + prefetch int + + db *bolt.DB + + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + listeners uint32 + pipeline atomic.Value - log logger.Logger - eh events.Handler - pq priorityqueue.Queue - pipe atomic.Value + stopCh chan struct{} } func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { @@ -47,26 +58,88 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e return nil, errors.E(op, errors.Str("no global boltdb configuration")) } - conf := &Config{} + conf := &GlobalCfg{} - err := cfg.UnmarshalKey(configKey, conf) + err := cfg.UnmarshalKey(PluginName, conf) if err != nil { return nil, errors.E(op, err) } - // add default values + localCfg := &Config{} + err = cfg.UnmarshalKey(configKey, localCfg) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg.InitDefaults() conf.InitDefaults() - c := &consumer{ - file: conf.File, + + db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ permissions: conf.Permissions, - bucket: conf.bucket, + file: localCfg.File, + priority: localCfg.Priority, + prefetch: localCfg.Prefetch, + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 1), + }, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") - log: log, - eh: e, - pq: pq, + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) } - db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{ + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + + db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{ Timeout: time.Second * 20, NoGrowSync: false, NoFreelistSync: false, @@ -78,51 +151,135 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e return nil, errors.E(op, err) } - c.db = db - // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket)) + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) if err != nil { return errors.E(op, upOp) } return nil }) - return c, nil -} + if err != nil { + return nil, errors.E(op, err) + } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - return &consumer{}, nil + return &consumer{ + file: pipeline.String(file, "rr.db"), + priority: pipeline.Int(priority, 10), + prefetch: pipeline.Int(prefetch, 100), + permissions: conf.Permissions, + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 1), + }, nil } func (c *consumer) Push(ctx context.Context, job *job.Job) error { - panic("implement me") + const op = errors.Op("boltdb_jobs_push") + err := c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(utils.AsBytes(PushBucket)) + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err := enc.Encode(job) + if err != nil { + return err + } + + return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes()) + }) + + if err != nil { + return errors.E(op, err) + } + + return nil } func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipe.Store(pipeline) + c.pipeline.Store(pipeline) return nil } -func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error { - panic("implement me") +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("boltdb_run") + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + return nil } func (c *consumer) Stop(ctx context.Context) error { - panic("implement me") + return nil } -func (c *consumer) Pause(ctx context.Context, pipeline string) { - panic("implement me") +func (c *consumer) Pause(ctx context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + c.stopCh <- struct{}{} + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } -func (c *consumer) Resume(ctx context.Context, pipeline string) { - panic("implement me") +func (c *consumer) Resume(ctx context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 1 { + c.log.Warn("amqp listener already in the active state") + return + } + + // run listener + go c.listener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (c *consumer) State(ctx context.Context) (*jobState.State, error) { - panic("implement me") + return nil, nil } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 4a8d6cd9..2ee06088 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -1,22 +1,34 @@ package boltjobs -import "time" +import ( + "fmt" + "time" + + "github.com/spiral/roadrunner/v2/utils" +) func (c *consumer) listener() { tt := time.NewTicker(time.Second) for { select { + case <-c.stopCh: + c.log.Warn("boltdb listener stopped") + return case <-tt.C: tx, err := c.db.Begin(false) if err != nil { panic(err) } - // cursor := tx.Cursor() - err = tx.Commit() - if err != nil { - panic(err) - } + b := tx.Bucket(utils.AsBytes(PushBucket)) + + cursor := b.Cursor() + + k, v := cursor.First() + _ = k + _ = v + + fmt.Println("foo") } } } |