diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 14:28:05 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 14:28:05 +0300 |
commit | 05956485a121ee47a2f8281a8a0dffd7eecc68aa (patch) | |
tree | 1fd0ffd60c1731ea6934300506c3061f6e65d1d7 /plugins/jobs/brokers/ephemeral | |
parent | c7becb2fc51fc09523f6640eb72f360a6b4681f5 (diff) |
Add pipeline and job plugin options...
Skeleton for the amqp plugin.
Add Timeout and Pipeline to the job.Context() method.
Implement queue limits for the ephemeral driver with main priority queue
limits.
Update configuration, add pipeline_size for every pipeline and jobs
priority queue size.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 48 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 10 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 |
3 files changed, 53 insertions, 14 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index e31e3b25..5cf4c633 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -6,21 +6,47 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" ) +type Config struct { + PipelineSize uint64 `mapstructure:"pipeline_size"` +} + type JobBroker struct { - queues sync.Map - pq priorityqueue.Queue + cfg *Config + log logger.Logger + queues sync.Map + pq priorityqueue.Queue + localQueue chan *Item } -func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { + const op = errors.Op("new_ephemeral_pipeline") + jb := &JobBroker{ - queues: sync.Map{}, - pq: q, + log: log, + pq: q, + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.PipelineSize == 0 { + jb.cfg.PipelineSize = 100_000 } + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + return jb, nil } @@ -39,13 +65,14 @@ func (j *JobBroker) Push(job *structs.Job) error { time.Sleep(jj.Options.TimeoutDuration()) // send the item after timeout expired - j.pq.Insert(From(job)) + j.localQueue <- From(job) }(job) return nil } - j.pq.Insert(From(job)) + // insert to the local, limited pipeline + j.localQueue <- From(job) return nil } @@ -53,6 +80,13 @@ func (j *JobBroker) Push(job *structs.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } +func (j *JobBroker) consume() { + // redirect + for item := range j.localQueue { + j.pq.Insert(item) + } +} + func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.queues.Load(pipeline.Name()); ok { diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index e2caa53a..40c6b3e4 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -103,10 +103,12 @@ func (j *Item) Body() []byte { func (j *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout uint64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, ) if err != nil { diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 3d6a95b7..60c6e245 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -3,6 +3,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,10 +13,12 @@ const ( type Plugin struct { log logger.Logger + cfg config.Configurer } -func (p *Plugin) Init(log logger.Logger) error { +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log + p.cfg = cfg return nil } @@ -23,6 +26,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(q) +func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, q) } |