summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/settings.json10
-rw-r--r--CHANGELOG.md3
-rw-r--r--plugins/memory/memoryjobs/consumer.go33
-rw-r--r--tests/plugins/jobs/jobs_memory_test.go5
4 files changed, 36 insertions, 15 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..ca7c7965
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,10 @@
+{
+ "cSpell.words": [
+ "goridge",
+ "goroutines",
+ "jobsv",
+ "pluggable",
+ "prefetch",
+ "stretchr"
+ ]
+}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a92b5ca0..9d8b8077 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,11 @@
# CHANGELOG
-## v2.4.1 (10.09.2021)
+## v2.4.1 (13.09.2021)
## 🩹 Fixes:
- 🐛 Fix: bug with not-idempotent call to the `attributes.Init`.
+- 🐛 Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the pipeline in the configuration.
## v2.4.0 (02.09.2021)
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index fbdedefe..dacc2848 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -118,6 +118,28 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
return nil
}
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ const op = errors.Op("memory_jobs_run")
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ l := atomic.LoadUint32(&c.listeners)
+ // listener already active
+ if l == 1 {
+ c.log.Warn("listener already in the active state")
+ return errors.E(op, errors.Str("listener already in the active state"))
+ }
+
+ c.consume()
+ atomic.StoreUint32(&c.listeners, 1)
+
+ return nil
+}
+
func (c *consumer) Pause(_ context.Context, p string) {
start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -173,17 +195,6 @@ func (c *consumer) Resume(_ context.Context, p string) {
})
}
-// Run is no-op for the ephemeral
-func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
func (c *consumer) Stop(_ context.Context) error {
start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go
index 20cbfb3f..7e39c556 100644
--- a/tests/plugins/jobs/jobs_memory_test.go
+++ b/tests/plugins/jobs/jobs_memory_test.go
@@ -107,7 +107,7 @@ func TestMemoryInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 3)
+ time.Sleep(time.Second * 1)
stopCh <- struct{}{}
wg.Wait()
}
@@ -229,7 +229,7 @@ func TestMemoryPauseResume(t *testing.T) {
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -301,7 +301,6 @@ func TestMemoryPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("Resume", resumePipes("test-local"))
t.Run("Pause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
t.Run("Resume", resumePipes("test-local"))