diff options
author | Valery Piashchynski <[email protected]> | 2021-08-29 23:46:11 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-29 23:46:11 +0300 |
commit | c23a88a943b53b99d112b63ed121931d1f79436f (patch) | |
tree | 5373bb61fec4ceb5db041f7207cec7ef115388d1 /tests/plugins | |
parent | 22e17a99fe2087f9c11a438e877afbac0096c052 (diff) |
Implement Init, FromPipeline methods
Update receiver in the amqp driver
Add simple (initial) boltdb tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins')
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml | 43 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-no-global.yaml | 47 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 504 |
5 files changed, 642 insertions, 0 deletions
diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml new file mode 100644 index 00000000..1d558a63 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0666 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml new file mode 100644 index 00000000..03a8d67e --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0666 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 2 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml new file mode 100644 index 00000000..79493d96 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml new file mode 100644 index 00000000..1b01eb73 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -0,0 +1,47 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: amqp + priority: 1 + pipeline_size: 100 + queue: test-1-queue + exchange: default + exchange_type: direct + routing_key: test + + test-2: + driver: amqp + priority: 2 + pipeline_size: 100 + queue: test-2-queue + exchange: default + exchange_type: direct + routing_key: test-2 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go new file mode 100644 index 00000000..cf3e5a91 --- /dev/null +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -0,0 +1,504 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/boltdb" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/stretchr/testify/assert" +) + +const ( + rr1db string = "rr1.db" + rr2db string = "rr2.db" +) + +func TestBoltDBInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-init.yaml", + Prefix: "rr", + } + // + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + // + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() + + assert.NoError(t, os.Remove(rr1db)) + assert.NoError(t, os.Remove(rr2db)) +} + +func TestBoltDBDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +// +//func TestAMQPJobsError(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-amqp-jobs-err.yaml", +// Prefix: "rr", +// } +// +// controller := gomock.NewController(t) +// mockLogger := mocks.NewMockLogger(controller) +// +// // general +// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() +// +// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// +// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) +// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// mockLogger, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// ch, err := cont.Serve() +// if err != nil { +// t.Fatal(err) +// } +// +// sig := make(chan os.Signal, 1) +// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// +// stopCh := make(chan struct{}, 1) +// +// go func() { +// defer wg.Done() +// for { +// select { +// case e := <-ch: +// assert.Fail(t, "error", e.Error.Error()) +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// case <-sig: +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// case <-stopCh: +// // timeout +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// } +// } +// }() +// +// time.Sleep(time.Second * 3) +// +// t.Run("DeclareAMQPPipeline", declareAMQPPipe) +// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// time.Sleep(time.Second * 25) +// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) +// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) +// +// time.Sleep(time.Second * 5) +// stopCh <- struct{}{} +// wg.Wait() +//} +// +//func TestAMQPNoGlobalSection(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-no-global.yaml", +// Prefix: "rr", +// } +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// &logger.ZapLogger{}, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// _, err = cont.Serve() +// require.Error(t, err) +//} +// +//func TestAMQPStats(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-amqp-declare.yaml", +// Prefix: "rr", +// } +// +// controller := gomock.NewController(t) +// mockLogger := mocks.NewMockLogger(controller) +// +// // general +// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() +// +// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) +// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// mockLogger, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// ch, err := cont.Serve() +// if err != nil { +// t.Fatal(err) +// } +// +// sig := make(chan os.Signal, 1) +// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// +// stopCh := make(chan struct{}, 1) +// +// go func() { +// defer wg.Done() +// for { +// select { +// case e := <-ch: +// assert.Fail(t, "error", e.Error.Error()) +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// case <-sig: +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// case <-stopCh: +// // timeout +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// } +// } +// }() +// +// time.Sleep(time.Second * 3) +// +// t.Run("DeclareAMQPPipeline", declareAMQPPipe) +// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// time.Sleep(time.Second * 2) +// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) +// time.Sleep(time.Second * 2) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) +// +// out := &jobState.State{} +// t.Run("Stats", stats(out)) +// +// assert.Equal(t, out.Pipeline, "test-3") +// assert.Equal(t, out.Driver, "amqp") +// assert.Equal(t, out.Queue, "default") +// +// assert.Equal(t, int64(1), out.Active) +// assert.Equal(t, int64(1), out.Delayed) +// assert.Equal(t, int64(0), out.Reserved) +// assert.Equal(t, false, out.Ready) +// +// time.Sleep(time.Second) +// t.Run("ResumePipeline", resumePipes("test-3")) +// time.Sleep(time.Second * 7) +// +// out = &jobState.State{} +// t.Run("Stats", stats(out)) +// +// assert.Equal(t, out.Pipeline, "test-3") +// assert.Equal(t, out.Driver, "amqp") +// assert.Equal(t, out.Queue, "default") +// +// assert.Equal(t, int64(0), out.Active) +// assert.Equal(t, int64(0), out.Delayed) +// assert.Equal(t, int64(0), out.Reserved) +// assert.Equal(t, true, out.Ready) +// +// time.Sleep(time.Second) +// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) +// +// time.Sleep(time.Second * 5) +// stopCh <- struct{}{} +// wg.Wait() +//} +// + +func declareBoltDBPipe(file string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "boltdb", + "name": "test-3", + "prefetch": "100", + "priority": "3", + "file": file, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) + } +} |