diff options
author | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
commit | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch) | |
tree | 8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/boltdb/boltjobs/consumer.go | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) |
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go new file mode 100644 index 00000000..a8db2f30 --- /dev/null +++ b/plugins/boltdb/boltjobs/consumer.go @@ -0,0 +1,128 @@ +package boltjobs + +import ( + "context" + "os" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + PluginName = "boltdb" +) + +type consumer struct { + // bbolt configuration + file string + permissions int + bucket string + db *bolt.DB + + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipe atomic.Value +} + +func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &Config{} + + err := cfg.UnmarshalKey(configKey, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + c := &consumer{ + file: conf.File, + permissions: conf.Permissions, + bucket: conf.bucket, + + log: log, + eh: e, + pq: pq, + } + + db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + c.db = db + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + + return c, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + return &consumer{}, nil +} + +func (c *consumer) Push(ctx context.Context, job *job.Job) error { + panic("implement me") +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipe.Store(pipeline) + return nil +} + +func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error { + panic("implement me") +} + +func (c *consumer) Stop(ctx context.Context) error { + panic("implement me") +} + +func (c *consumer) Pause(ctx context.Context, pipeline string) { + panic("implement me") +} + +func (c *consumer) Resume(ctx context.Context, pipeline string) { + panic("implement me") +} + +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { + panic("implement me") +} |