diff options
author | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
commit | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch) | |
tree | 8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) |
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
23 files changed, 415 insertions, 147 deletions
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go new file mode 100644 index 00000000..013e30bf --- /dev/null +++ b/plugins/boltdb/boltjobs/config.go @@ -0,0 +1,16 @@ +package boltjobs + +type Config struct { + // File is boltDB file. No need to create it by your own, + // boltdb driver is able to create the file, or read existing + File string + // Bucket to store data in boltDB + bucket string + // db file permissions + Permissions int + // consume timeout +} + +func (c *Config) InitDefaults() { + +} diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go new file mode 100644 index 00000000..a8db2f30 --- /dev/null +++ b/plugins/boltdb/boltjobs/consumer.go @@ -0,0 +1,128 @@ +package boltjobs + +import ( + "context" + "os" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + PluginName = "boltdb" +) + +type consumer struct { + // bbolt configuration + file string + permissions int + bucket string + db *bolt.DB + + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipe atomic.Value +} + +func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &Config{} + + err := cfg.UnmarshalKey(configKey, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + c := &consumer{ + file: conf.File, + permissions: conf.Permissions, + bucket: conf.bucket, + + log: log, + eh: e, + pq: pq, + } + + db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + c.db = db + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + + return c, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + return &consumer{}, nil +} + +func (c *consumer) Push(ctx context.Context, job *job.Job) error { + panic("implement me") +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipe.Store(pipeline) + return nil +} + +func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error { + panic("implement me") +} + +func (c *consumer) Stop(ctx context.Context) error { + panic("implement me") +} + +func (c *consumer) Pause(ctx context.Context, pipeline string) { + panic("implement me") +} + +func (c *consumer) Resume(ctx context.Context, pipeline string) { + panic("implement me") +} + +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { + panic("implement me") +} diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go new file mode 100644 index 00000000..8a4aefa3 --- /dev/null +++ b/plugins/boltdb/boltjobs/item.go @@ -0,0 +1,77 @@ +package boltjobs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + panic("implement me") +} + +func (i *Item) Nack() error { + panic("implement me") +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + panic("implement me") +} diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go new file mode 100644 index 00000000..1f8e6ff1 --- /dev/null +++ b/plugins/boltdb/boltjobs/listener.go @@ -0,0 +1,22 @@ +package boltjobs + +import "time" + +func (c *consumer) listener() { + tt := time.NewTicker(time.Second) + for { + select { + case <-tt.C: + tx, err := c.db.Begin(false) + if err != nil { + panic(err) + } + //cursor := tx.Cursor() + + err = tx.Commit() + if err != nil { + panic(err) + } + } + } +} diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/boltdb/boltkv/config.go index 0beb209b..56d00674 100644 --- a/plugins/kv/drivers/boltdb/config.go +++ b/plugins/boltdb/boltkv/config.go @@ -1,4 +1,4 @@ -package boltdb +package boltkv type Config struct { // File is boltDB file. No need to create it by your own, diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/boltdb/boltkv/driver.go index 15a5674f..ba1450cd 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -1,4 +1,4 @@ -package boltdb +package boltkv import ( "bytes" @@ -16,6 +16,10 @@ import ( bolt "go.etcd.io/bbolt" ) +const ( + RootPluginName string = "kv" +) + type Driver struct { clearMu sync.RWMutex // db instance @@ -24,7 +28,8 @@ type Driver struct { bucket []byte log logger.Logger cfg *Config - // gc contains key which are contain timeouts + + // gc contains keys with timeouts gc sync.Map // default timeout for cache cleanup is 1 minute timeout time.Duration @@ -36,6 +41,10 @@ type Driver struct { func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { const op = errors.Op("new_boltdb_driver") + if !cfgPlugin.Has(RootPluginName) { + return nil, errors.E(op, errors.Str("no kv section in the configuration")) + } + d := &Driver{ log: log, stop: stop, @@ -157,7 +166,7 @@ func (d *Driver) Get(key string) ([]byte, error) { } // set the value - val = []byte(i) + val = utils.AsBytes(i) } return nil }) diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go new file mode 100644 index 00000000..683b26f1 --- /dev/null +++ b/plugins/boltdb/plugin.go @@ -0,0 +1,82 @@ +package boltdb + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "boltdb" +) + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + + drivers uint +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.stop = make(chan struct{}) + p.log = log + p.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (p *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (p *Plugin) Stop() error { + if p.drivers > 0 { + for i := uint(0); i < p.drivers; i++ { + // send close signal to every driver + p.stop <- struct{}{} + } + } + return nil +} + +// Name returns plugin name +func (p *Plugin) Name() string { + return PluginName +} + +// Available interface implementation +func (p *Plugin) Available() {} + +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + if err != nil { + return nil, errors.E(op, err) + } + + // save driver number to release resources after Stop + p.drivers++ + + return st, nil +} + +// JOBS bbolt implementation + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) +} diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go index 1ec089f1..ac2f6e53 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/config.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs // pipeline rabbitmq info const ( diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go index 95df02ec..1931ceaa 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -20,7 +20,11 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +const ( + pluginName string = "amqp" +) + +type consumer struct { sync.Mutex log logger.Logger pq priorityqueue.Queue @@ -58,7 +62,7 @@ type JobConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, globalCfg.InitDefault() // PARSE CONFIGURATION END ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: e, @@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // PARSE CONFIGURATION ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, eh: e, pq: pq, @@ -214,7 +218,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { +func (j *consumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -232,12 +236,12 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +291,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("amqp_driver_state") select { case pch := <-j.publishChan: @@ -316,7 +320,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { } } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -354,7 +358,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -413,7 +417,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { j.stopCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -427,7 +431,7 @@ func (j *JobConsumer) Stop(context.Context) error { } // handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") select { case pch := <-j.publishChan: diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go index 623dcca7..a8e305ea 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/item.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -139,7 +139,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") item, err := j.unpack(d) if err != nil { @@ -194,7 +194,7 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { +func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go index 0b1cd2dc..0156d55c 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/listener.go @@ -1,8 +1,8 @@ -package amqp +package amqpjobs import amqp "github.com/rabbitmq/amqp091-go" -func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { +func (j *consumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go index 56ef10c8..e260fabe 100644 --- a/plugins/jobs/drivers/amqp/rabbit_init.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go @@ -1,10 +1,10 @@ -package amqp +package amqpjobs import ( "github.com/spiral/errors" ) -func (j *JobConsumer) initRabbitMQ() error { +func (j *consumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go index 8dc18b8f..0835e3ea 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/redial.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "time" @@ -11,7 +11,7 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobConsumer) redialer() { //nolint:gocognit +func (j *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go index 624f4405..8797d20b 100644 --- a/plugins/jobs/drivers/amqp/plugin.go +++ b/plugins/jobs/drivers/amqp/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -31,10 +32,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) + return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) } // FromPipeline constructs AMQP driver from pipeline func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, e, pq) + return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq) } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 6323148b..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -19,7 +19,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +type consumer struct { log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -43,7 +43,7 @@ type JobConsumer struct { requeueCh chan *Item } -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return jc, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { +func (j *consumer) handleItem(ctx context.Context, item *Item) error { const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) @@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { // register the pipeline j.pipeline.Store(p) return nil } // State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("beanstalk_state") stat, err := j.pool.Stats(ctx) if err != nil { @@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index f1d7ac76..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index f1385e70..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -4,7 +4,7 @@ import ( "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen() { +func (j *consumer) listen() { for { select { case <-j.stopCh: diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index f0992cd6..91b8eda9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -25,7 +25,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobConsumer struct { +type consumer struct { cfg *Config log logger.Logger eh events.Handler @@ -43,10 +43,10 @@ type JobConsumer struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - jb := &JobConsumer{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { +func (j *consumer) State(_ context.Context) (*jobState.State, error) { pipe := j.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), @@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { }, nil } -func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(ctx context.Context) error { +func (j *consumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error { } } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) @@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } } -func (j *JobConsumer) consume() { +func (j *consumer) consume() { go func() { // redirect for { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 17af1caa..23203190 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -24,7 +24,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -type JobConsumer struct { +type consumer struct { sync.Mutex pq priorityqueue.Queue log logger.Logger @@ -56,7 +56,7 @@ type JobConsumer struct { pauseCh chan struct{} } -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no such key - error @@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure globalCfg.InitDefault() // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure return jb, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no global section @@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf } // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered @@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ QueueUrl: j.queueURL, @@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") j.Lock() @@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { j.pauseCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { d, err := msg.pack(j.queueURL) if err != nil { return err diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index df72b2e5..996adf6c 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { +func (j *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 9efef90d..a4280af2 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -18,7 +18,7 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit +func (j *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { case <-j.pauseCh: diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go deleted file mode 100644 index c839130f..00000000 --- a/plugins/kv/drivers/boltdb/plugin.go +++ /dev/null @@ -1,71 +0,0 @@ -package boltdb - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "boltdb" - RootPluginName string = "kv" -) - -// Plugin BoltDB K/V storage. -type Plugin struct { - cfgPlugin config.Configurer - // logger - log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint -} - -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(RootPluginName) { - return errors.E(errors.Disabled) - } - - s.stop = make(chan struct{}) - s.log = log - s.cfgPlugin = cfg - return nil -} - -// Serve is noop here -func (s *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (s *Plugin) Stop() error { - if s.drivers > 0 { - for i := uint(0); i < s.drivers; i++ { - // send close signal to every driver - s.stop <- struct{}{} - } - } - return nil -} - -func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_provide") - st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) - if err != nil { - return nil, errors.E(op, err) - } - - // save driver number to release resources after Stop - s.drivers++ - - return st, nil -} - -// Name returns plugin name -func (s *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (s *Plugin) Available() {} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 53fade97..9a19f96c 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -109,6 +109,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the configuration + // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs switch v.(map[string]interface{})[driver] { case memcached: if _, ok := p.constructors[memcached]; !ok { @@ -220,5 +221,4 @@ func (p *Plugin) Name() string { } // Available interface implementation -func (p *Plugin) Available() { -} +func (p *Plugin) Available() {} |