diff options
author | Valery Piashchynski <[email protected]> | 2021-09-12 22:10:49 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-12 22:10:49 +0300 |
commit | fe72d84395970281fe330a2b9423f8c0e4c3b9c8 (patch) | |
tree | e797cc898b0a6ae6bb4ac44406e4d765e7db9a83 /plugins/memory/memoryjobs/consumer.go | |
parent | fe8ae870ff2bf81a04868d3e42f319d54453a6b1 (diff) | |
parent | 417857453bea305eeafd11d4acf300fcb9719cf1 (diff) |
[#795]: fix(jobs, memory): start `jobs` `in-memory` driver automatically if the user consumes it
[#795]: fix(jobs, memory): start `jobs` `in-memory` driver automatically if the user consumes it
Diffstat (limited to 'plugins/memory/memoryjobs/consumer.go')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..dacc2848 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) |