diff options
author | Valery Piashchynski <[email protected]> | 2021-09-12 21:33:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-12 21:33:23 +0300 |
commit | a8db14665d22e59db876cc6a52f649b88421055a (patch) | |
tree | 857e126b4f0bab8cdc0a8e0596d2ae6f5335cef0 /plugins | |
parent | fe8ae870ff2bf81a04868d3e42f319d54453a6b1 (diff) |
change jobs-memory run behavior
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..dacc2848 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) |