diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 15:31:30 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-31 15:31:30 +0300 |
commit | 83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch) | |
tree | 884dd2991acf12826752632b8321410e7cc923ce /tests/plugins | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) | |
parent | 31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff) |
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'tests/plugins')
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-no-global.yaml | 3 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml | 43 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-no-global.yaml | 41 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 506 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 9 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 16 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_with_toxics_test.go | 6 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 4 |
14 files changed, 664 insertions, 20 deletions
diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml index 87f46069..92d090d4 100644 --- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -29,3 +29,6 @@ jobs: reserve_timeout: 10s consume: [ "test-1" ] + +endure: + log_level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml new file mode 100644 index 00000000..cdc2655f --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml new file mode 100644 index 00000000..804db543 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 2 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml new file mode 100644 index 00000000..d375a9a5 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml new file mode 100644 index 00000000..54aaf3c6 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -0,0 +1,41 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 1 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5067ef9f..6c2d05ca 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -95,7 +95,7 @@ func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ Job: "some/php/namespace", - Id: "1", + Id: "2", Payload: `{"hello":"world"}`, Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, Options: &jobsv1beta.Options{ diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 48d6515d..949698ec 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 8e74c7cc..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go new file mode 100644 index 00000000..ab36ffa4 --- /dev/null +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -0,0 +1,506 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/boltdb" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + rr1db string = "rr1.db" + rr2db string = "rr2.db" +) + +func TestBoltDBInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("boltdb listener stopped").Times(4) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() + + assert.NoError(t, os.Remove(rr1db)) + assert.NoError(t, os.Remove(rr2db)) +} + +func TestBoltDBDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func TestBoltDBStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func declareBoltDBPipe(file string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "boltdb", + "name": "test-3", + "prefetch": "100", + "priority": "3", + "file": file, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) + } +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 98590a96..2890aa9d 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index f0b5697b..951d6227 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -12,11 +12,11 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -171,9 +171,12 @@ func TestJOBSMetrics(t *testing.T) { signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) tt := time.NewTimer(time.Minute * 3) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { defer tt.Stop() + defer wg.Done() for { select { case e := <-ch: @@ -220,7 +223,7 @@ func TestJOBSMetrics(t *testing.T) { assert.Contains(t, genericOut, "workers_memory_bytes") close(sig) - time.Sleep(time.Second * 2) + wg.Wait() } const getAddr = "http://127.0.0.1:2112/metrics" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 630a059a..2dd2c8db 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -17,11 +17,11 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" @@ -437,15 +437,15 @@ func TestSQSStat(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareSQSPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} @@ -474,7 +474,7 @@ func TestSQSStat(t *testing.T) { assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index f6521e8d..80fed8eb 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -11,15 +11,15 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index ced1c5fe..e757a9e6 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,11 +12,11 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached" "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" |