diff options
author | Valery Piashchynski <[email protected]> | 2021-09-12 22:44:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-12 22:44:34 +0300 |
commit | b7629ed14bb57c770e1e7f6f2da2d967b72c9789 (patch) | |
tree | e537a4843b2acf35fb4d72a149b399cca2636d93 /plugins | |
parent | 52c2330cbd126533d992d4b73b3f8e2f2e27f7bf (diff) | |
parent | fe72d84395970281fe330a2b9423f8c0e4c3b9c8 (diff) |
Merge remote-tracking branch 'origin' into bug/local-and-global-sections-incorrect-parsing
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index bebea3ce..79246063 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -122,6 +122,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -177,17 +199,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) |