diff options
author | Valery Piashchynski <[email protected]> | 2021-07-18 11:32:44 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-18 11:32:44 +0300 |
commit | 9c51360f9119a4114bdcc21c8e61f0908a3c876d (patch) | |
tree | ea63a051931bbd8282d64478bbefa2f970fcc955 /plugins | |
parent | f4feb30197843d05eb308081ee579d3a9e3d6206 (diff) |
Started beanstalk driver. Add new Queue impl (not finished yet).
Fix bugs in the AMQP, update proto-api
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/plugin.go | 29 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 28 |
6 files changed, 59 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 20dcef2a..6def138e 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -141,7 +141,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con delayCache: make(map[string]struct{}, 100), } - // if no global section + // only global section if !cfg.Has(pluginName) { return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go new file mode 100644 index 00000000..d034d65c --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -0,0 +1 @@ +package beanstalk diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go new file mode 100644 index 00000000..d034d65c --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -0,0 +1 @@ +package beanstalk diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go new file mode 100644 index 00000000..2fea1c31 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/plugin.go @@ -0,0 +1,29 @@ +package beanstalk + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return nil, nil +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return nil, nil +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index f71c2718..98e7ebf8 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -432,6 +432,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { } // if pipeline initialized to be consumed, call Run on it + // but likely for the dynamic pipelines it should be started manually if _, ok := p.consume[pipeline.Name()]; ok { err = initializedDriver.Run(pipeline) if err != nil { @@ -440,6 +441,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { } } + // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) return nil diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index a2bd9c6d..4333c587 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -3,6 +3,7 @@ package jobs import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) @@ -65,7 +66,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -76,7 +77,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -87,11 +88,32 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) er return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { resp.Pipelines = r.p.List() return nil } +// Declare pipeline used to dynamically declare any type of the pipeline +// Mandatory fields: +// 1. Driver +// 2. Pipeline name +// 3. Options related to the particular pipeline +func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("rcp_declare_pipeline") + pipe := &pipeline.Pipeline{} + + for i := range req.GetPipeline() { + (*pipe)[i] = req.GetPipeline()[i] + } + + err := r.p.Declare(pipe) + if err != nil { + return errors.E(op, err) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} |