summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/boltdb/boltjobs/consumer.go
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r--plugins/boltdb/boltjobs/consumer.go128
1 files changed, 128 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
new file mode 100644
index 00000000..a8db2f30
--- /dev/null
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -0,0 +1,128 @@
+package boltjobs
+
+import (
+ "context"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+const (
+ PluginName = "boltdb"
+)
+
+type consumer struct {
+ // bbolt configuration
+ file string
+ permissions int
+ bucket string
+ db *bolt.DB
+
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipe atomic.Value
+}
+
+func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &Config{}
+
+ err := cfg.UnmarshalKey(configKey, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+ c := &consumer{
+ file: conf.File,
+ permissions: conf.Permissions,
+ bucket: conf.bucket,
+
+ log: log,
+ eh: e,
+ pq: pq,
+ }
+
+ db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ c.db = db
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+
+ return c, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ return &consumer{}, nil
+}
+
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+ panic("implement me")
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipe.Store(pipeline)
+ return nil
+}
+
+func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (c *consumer) Stop(ctx context.Context) error {
+ panic("implement me")
+}
+
+func (c *consumer) Pause(ctx context.Context, pipeline string) {
+ panic("implement me")
+}
+
+func (c *consumer) Resume(ctx context.Context, pipeline string) {
+ panic("implement me")
+}
+
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
+ panic("implement me")
+}