summaryrefslogtreecommitdiff
path: root/plugins/memory/memoryjobs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-12 22:10:49 +0300
committerGitHub <[email protected]>2021-09-12 22:10:49 +0300
commitfe72d84395970281fe330a2b9423f8c0e4c3b9c8 (patch)
treee797cc898b0a6ae6bb4ac44406e4d765e7db9a83 /plugins/memory/memoryjobs/consumer.go
parentfe8ae870ff2bf81a04868d3e42f319d54453a6b1 (diff)
parent417857453bea305eeafd11d4acf300fcb9719cf1 (diff)
[#795]: fix(jobs, memory): start `jobs` `in-memory` driver automatically if the user consumes it
[#795]: fix(jobs, memory): start `jobs` `in-memory` driver automatically if the user consumes it
Diffstat (limited to 'plugins/memory/memoryjobs/consumer.go')
-rw-r--r--plugins/memory/memoryjobs/consumer.go33
1 files changed, 22 insertions, 11 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index fbdedefe..dacc2848 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
return nil
}
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ const op = errors.Op("memory_jobs_run")
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ l := atomic.LoadUint32(&c.listeners)
+ // listener already active
+ if l == 1 {
+ c.log.Warn("listener already in the active state")
+ return errors.E(op, errors.Str("listener already in the active state"))
+ }
+
+ c.consume()
+ atomic.StoreUint32(&c.listeners, 1)
+
+ return nil
+}
+
func (c *consumer) Pause(_ context.Context, p string) {
start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) {
})
}
-// Run is no-op for the ephemeral
-func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
func (c *consumer) Stop(_ context.Context) error {
start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)