diff options
Diffstat (limited to 'plugins/memory/memoryjobs/consumer.go')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..79246063 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -61,6 +61,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return nil, errors.E(op, err) } + if jb.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey)) + } + if jb.cfg.Prefetch == 0 { jb.cfg.Prefetch = 100_000 } @@ -118,6 +122,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -173,17 +199,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) |