diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 48 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 10 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 |
3 files changed, 53 insertions, 14 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index e31e3b25..5cf4c633 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -6,21 +6,47 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" ) +type Config struct { + PipelineSize uint64 `mapstructure:"pipeline_size"` +} + type JobBroker struct { - queues sync.Map - pq priorityqueue.Queue + cfg *Config + log logger.Logger + queues sync.Map + pq priorityqueue.Queue + localQueue chan *Item } -func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { + const op = errors.Op("new_ephemeral_pipeline") + jb := &JobBroker{ - queues: sync.Map{}, - pq: q, + log: log, + pq: q, + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.PipelineSize == 0 { + jb.cfg.PipelineSize = 100_000 } + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + return jb, nil } @@ -39,13 +65,14 @@ func (j *JobBroker) Push(job *structs.Job) error { time.Sleep(jj.Options.TimeoutDuration()) // send the item after timeout expired - j.pq.Insert(From(job)) + j.localQueue <- From(job) }(job) return nil } - j.pq.Insert(From(job)) + // insert to the local, limited pipeline + j.localQueue <- From(job) return nil } @@ -53,6 +80,13 @@ func (j *JobBroker) Push(job *structs.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } +func (j *JobBroker) consume() { + // redirect + for item := range j.localQueue { + j.pq.Insert(item) + } +} + func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.queues.Load(pipeline.Name()); ok { diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index e2caa53a..40c6b3e4 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -103,10 +103,12 @@ func (j *Item) Body() []byte { func (j *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout uint64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, ) if err != nil { diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 3d6a95b7..60c6e245 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -3,6 +3,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,10 +13,12 @@ const ( type Plugin struct { log logger.Logger + cfg config.Configurer } -func (p *Plugin) Init(log logger.Logger) error { +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log + p.cfg = cfg return nil } @@ -23,6 +26,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(q) +func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, q) } |