diff options
author | Valery Piashchynski <[email protected]> | 2021-09-12 21:33:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-12 22:13:25 +0300 |
commit | 7e769b06670545b75ff9635e88a06bb5d9f26ac5 (patch) | |
tree | c055b8fef1e33c5183e499da5f04995cf45eeca5 | |
parent | 1b4b286ddb59d5d7d8784ef6f01c0641ec453eb2 (diff) |
change jobs-memory run behavior
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | .vscode/settings.json | 8 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 33 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_memory_test.go | 5 |
3 files changed, 32 insertions, 14 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..98cf97cd --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "cSpell.words": [ + "goridge", + "jobsv", + "prefetch", + "stretchr" + ] +} diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..dacc2848 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro return nil } +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + const op = errors.Op("memory_jobs_run") + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return errors.E(op, errors.Str("listener already in the active state")) + } + + c.consume() + atomic.StoreUint32(&c.listeners, 1) + + return nil +} + func (c *consumer) Pause(_ context.Context, p string) { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) { }) } -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - func (c *consumer) Stop(_ context.Context) error { start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go index 20cbfb3f..7e39c556 100644 --- a/tests/plugins/jobs/jobs_memory_test.go +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -107,7 +107,7 @@ func TestMemoryInit(t *testing.T) { } }() - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 1) stopCh <- struct{}{} wg.Wait() } @@ -229,7 +229,7 @@ func TestMemoryPauseResume(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -301,7 +301,6 @@ func TestMemoryPauseResume(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("Resume", resumePipes("test-local")) t.Run("Pause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) t.Run("Resume", resumePipes("test-local")) |