diff options
-rw-r--r-- | .vscode/settings.json | 10 | ||||
-rw-r--r-- | CHANGELOG.md | 3 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 33 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_memory_test.go | 5 |
4 files changed, 36 insertions, 15 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..ca7c7965 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + "cSpell.words": [ + "goridge", + "goroutines", + "jobsv", + "pluggable", + "prefetch", + "stretchr" + ] +} diff --git a/CHANGELOG.md b/CHANGELOG.md index a92b5ca0..9d8b8077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ # CHANGELOG -## v2.4.1 (10.09.2021) +## v2.4.1 (13.09.2021) ## 🩹 Fixes: - 🐛 Fix: bug with not-idempotent call to the `attributes.Init`. +- 🐛 Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the pipeline in the configuration. ## v2.4.0 (02.09.2021) diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..dacc2848 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go index 20cbfb3f..7e39c556 100644 --- a/tests/plugins/jobs/jobs_memory_test.go +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -107,7 +107,7 @@ func TestMemoryInit(t *testing.T) { } }() - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 1) stopCh <- struct{}{} wg.Wait() } @@ -229,7 +229,7 @@ func TestMemoryPauseResume(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -301,7 +301,6 @@ func TestMemoryPauseResume(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("Resume", resumePipes("test-local")) t.Run("Pause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) t.Run("Resume", resumePipes("test-local")) |