summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/boltdb/boltjobs/config.go16
-rw-r--r--plugins/boltdb/boltjobs/consumer.go128
-rw-r--r--plugins/boltdb/boltjobs/item.go77
-rw-r--r--plugins/boltdb/boltjobs/listener.go22
-rw-r--r--plugins/boltdb/boltkv/config.go (renamed from plugins/kv/drivers/boltdb/config.go)2
-rw-r--r--plugins/boltdb/boltkv/driver.go (renamed from plugins/kv/drivers/boltdb/driver.go)15
-rw-r--r--plugins/boltdb/plugin.go82
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/config.go)2
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/consumer.go)32
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/item.go)6
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/listener.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/rabbit_init.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/redial.go)4
-rw-r--r--plugins/jobs/drivers/amqp/plugin.go5
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go26
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go28
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go26
-rw-r--r--plugins/jobs/drivers/sqs/item.go2
-rw-r--r--plugins/jobs/drivers/sqs/listener.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go71
-rw-r--r--plugins/kv/plugin.go4
-rw-r--r--tests/plugins/kv/storage_plugin_test.go2
24 files changed, 416 insertions, 148 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
new file mode 100644
index 00000000..013e30bf
--- /dev/null
+++ b/plugins/boltdb/boltjobs/config.go
@@ -0,0 +1,16 @@
+package boltjobs
+
+type Config struct {
+ // File is boltDB file. No need to create it by your own,
+ // boltdb driver is able to create the file, or read existing
+ File string
+ // Bucket to store data in boltDB
+ bucket string
+ // db file permissions
+ Permissions int
+ // consume timeout
+}
+
+func (c *Config) InitDefaults() {
+
+}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
new file mode 100644
index 00000000..a8db2f30
--- /dev/null
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -0,0 +1,128 @@
+package boltjobs
+
+import (
+ "context"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
+)
+
+const (
+ PluginName = "boltdb"
+)
+
+type consumer struct {
+ // bbolt configuration
+ file string
+ permissions int
+ bucket string
+ db *bolt.DB
+
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipe atomic.Value
+}
+
+func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
+ }
+
+ conf := &Config{}
+
+ err := cfg.UnmarshalKey(configKey, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+ c := &consumer{
+ file: conf.File,
+ permissions: conf.Permissions,
+ bucket: conf.bucket,
+
+ log: log,
+ eh: e,
+ pq: pq,
+ }
+
+ db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ c.db = db
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+
+ return c, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ return &consumer{}, nil
+}
+
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+ panic("implement me")
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipe.Store(pipeline)
+ return nil
+}
+
+func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (c *consumer) Stop(ctx context.Context) error {
+ panic("implement me")
+}
+
+func (c *consumer) Pause(ctx context.Context, pipeline string) {
+ panic("implement me")
+}
+
+func (c *consumer) Resume(ctx context.Context, pipeline string) {
+ panic("implement me")
+}
+
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
+ panic("implement me")
+}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
new file mode 100644
index 00000000..8a4aefa3
--- /dev/null
+++ b/plugins/boltdb/boltjobs/item.go
@@ -0,0 +1,77 @@
+package boltjobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ panic("implement me")
+}
+
+func (i *Item) Nack() error {
+ panic("implement me")
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ panic("implement me")
+}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
new file mode 100644
index 00000000..1f8e6ff1
--- /dev/null
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -0,0 +1,22 @@
+package boltjobs
+
+import "time"
+
+func (c *consumer) listener() {
+ tt := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-tt.C:
+ tx, err := c.db.Begin(false)
+ if err != nil {
+ panic(err)
+ }
+ //cursor := tx.Cursor()
+
+ err = tx.Commit()
+ if err != nil {
+ panic(err)
+ }
+ }
+ }
+}
diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/boltdb/boltkv/config.go
index 0beb209b..56d00674 100644
--- a/plugins/kv/drivers/boltdb/config.go
+++ b/plugins/boltdb/boltkv/config.go
@@ -1,4 +1,4 @@
-package boltdb
+package boltkv
type Config struct {
// File is boltDB file. No need to create it by your own,
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/boltdb/boltkv/driver.go
index 15a5674f..ba1450cd 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -1,4 +1,4 @@
-package boltdb
+package boltkv
import (
"bytes"
@@ -16,6 +16,10 @@ import (
bolt "go.etcd.io/bbolt"
)
+const (
+ RootPluginName string = "kv"
+)
+
type Driver struct {
clearMu sync.RWMutex
// db instance
@@ -24,7 +28,8 @@ type Driver struct {
bucket []byte
log logger.Logger
cfg *Config
- // gc contains key which are contain timeouts
+
+ // gc contains keys with timeouts
gc sync.Map
// default timeout for cache cleanup is 1 minute
timeout time.Duration
@@ -36,6 +41,10 @@ type Driver struct {
func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
+ if !cfgPlugin.Has(RootPluginName) {
+ return nil, errors.E(op, errors.Str("no kv section in the configuration"))
+ }
+
d := &Driver{
log: log,
stop: stop,
@@ -157,7 +166,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
}
// set the value
- val = []byte(i)
+ val = utils.AsBytes(i)
}
return nil
})
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
new file mode 100644
index 00000000..683b26f1
--- /dev/null
+++ b/plugins/boltdb/plugin.go
@@ -0,0 +1,82 @@
+package boltdb
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "boltdb"
+)
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+
+ drivers uint
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.stop = make(chan struct{})
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (p *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (p *Plugin) Stop() error {
+ if p.drivers > 0 {
+ for i := uint(0); i < p.drivers; i++ {
+ // send close signal to every driver
+ p.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+// Name returns plugin name
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Available interface implementation
+func (p *Plugin) Available() {}
+
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ p.drivers++
+
+ return st, nil
+}
+
+// JOBS bbolt implementation
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
+ return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+}
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go
index 1ec089f1..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/config.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
// pipeline rabbitmq info
const (
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
index 95df02ec..1931ceaa 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -20,7 +20,11 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+const (
+ pluginName string = "amqp"
+)
+
+type consumer struct {
sync.Mutex
log logger.Logger
pq priorityqueue.Queue
@@ -58,7 +62,7 @@ type JobConsumer struct {
}
// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
globalCfg.InitDefault()
// PARSE CONFIGURATION END -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: e,
@@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer_from_pipeline")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// PARSE CONFIGURATION -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
eh: e,
pq: pq,
@@ -214,7 +218,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
+func (j *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -232,12 +236,12 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +291,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
case pch := <-j.publishChan:
@@ -316,7 +320,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -354,7 +358,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -413,7 +417,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -427,7 +431,7 @@ func (j *JobConsumer) Stop(context.Context) error {
}
// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go
index 623dcca7..a8e305ea 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/item.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -139,7 +139,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
item, err := j.unpack(d)
if err != nil {
@@ -194,7 +194,7 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
+func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
index 0b1cd2dc..0156d55c 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
@@ -1,8 +1,8 @@
-package amqp
+package amqpjobs
import amqp "github.com/rabbitmq/amqp091-go"
-func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
+func (j *consumer) listener(deliv <-chan amqp.Delivery) {
go func() {
for { //nolint:gosimple
select {
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
index 56ef10c8..e260fabe 100644
--- a/plugins/jobs/drivers/amqp/rabbit_init.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
@@ -1,10 +1,10 @@
-package amqp
+package amqpjobs
import (
"github.com/spiral/errors"
)
-func (j *JobConsumer) initRabbitMQ() error {
+func (j *consumer) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
index 8dc18b8f..0835e3ea 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"time"
@@ -11,7 +11,7 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
+func (j *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
index 624f4405..8797d20b 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/jobs/drivers/amqp/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -31,10 +32,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+ return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
}
// FromPipeline constructs AMQP driver from pipeline
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
+ return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq)
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 6323148b..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -19,7 +19,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+type consumer struct {
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -43,7 +43,7 @@ type JobConsumer struct {
requeueCh chan *Item
}
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return jc, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered
@@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+func (j *consumer) handleItem(ctx context.Context, item *Item) error {
const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
@@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("beanstalk_state")
stat, err := j.pool.Stats(ctx)
if err != nil {
@@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index f1d7ac76..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
+func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index f1385e70..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -4,7 +4,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen() {
+func (j *consumer) listen() {
for {
select {
case <-j.stopCh:
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index f0992cd6..91b8eda9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -25,7 +25,7 @@ type Config struct {
Prefetch uint64 `mapstructure:"prefetch"`
}
-type JobConsumer struct {
+type consumer struct {
cfg *Config
log logger.Logger
eh events.Handler
@@ -43,10 +43,10 @@ type JobConsumer struct {
stopCh chan struct{}
}
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_ephemeral_pipeline")
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- jb := &JobConsumer{
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
@@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+func (j *consumer) State(_ context.Context) (*jobState.State, error) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
@@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
}, nil
}
-func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(ctx context.Context) error {
+func (j *consumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error {
}
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
@@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
}
-func (j *JobConsumer) consume() {
+func (j *consumer) consume() {
go func() {
// redirect
for {
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 17af1caa..23203190 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -24,7 +24,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-type JobConsumer struct {
+type consumer struct {
sync.Mutex
pq priorityqueue.Queue
log logger.Logger
@@ -56,7 +56,7 @@ type JobConsumer struct {
pauseCh chan struct{}
}
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no such key - error
@@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
globalCfg.InitDefault()
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
return jb, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no global section
@@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
}
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
@@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
QueueUrl: j.queueURL,
@@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
j.Lock()
@@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.pauseCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
d, err := msg.pack(j.queueURL)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index df72b2e5..996adf6c 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
+func (j *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 9efef90d..a4280af2 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -18,7 +18,7 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
+func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
case <-j.pauseCh:
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
deleted file mode 100644
index c839130f..00000000
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package boltdb
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/kv"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "boltdb"
- RootPluginName string = "kv"
-)
-
-// Plugin BoltDB K/V storage.
-type Plugin struct {
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(RootPluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.stop = make(chan struct{})
- s.log = log
- s.cfgPlugin = cfg
- return nil
-}
-
-// Serve is noop here
-func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
-}
-
-func (s *Plugin) Stop() error {
- if s.drivers > 0 {
- for i := uint(0); i < s.drivers; i++ {
- // send close signal to every driver
- s.stop <- struct{}{}
- }
- }
- return nil
-}
-
-func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_provide")
- st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save driver number to release resources after Stop
- s.drivers++
-
- return st, nil
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (s *Plugin) Available() {}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 53fade97..9a19f96c 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -109,6 +109,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
+ // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.constructors[memcached]; !ok {
@@ -220,5 +221,4 @@ func (p *Plugin) Name() string {
}
// Available interface implementation
-func (p *Plugin) Available() {
-}
+func (p *Plugin) Available() {}
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index ced1c5fe..a09a456c 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -12,9 +12,9 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/boltdb"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/memory"