diff options
author | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
commit | cee4bc46097506d6e892b6af194751434700621a (patch) | |
tree | e542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/rpc_test.go | |
parent | d4c92e48bada7593b6fbec612a742c599de6e736 (diff) |
- Update jobs sources
- Update Arch diagramm
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/rpc_test.go')
-rw-r--r-- | plugins/jobs/oooold/rpc_test.go | 657 |
1 files changed, 657 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/rpc_test.go b/plugins/jobs/oooold/rpc_test.go new file mode 100644 index 00000000..a63b9ea2 --- /dev/null +++ b/plugins/jobs/oooold/rpc_test.go @@ -0,0 +1,657 @@ +package oooold + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" + "io/ioutil" + "syscall" + "testing" +) + +func TestRPC_StatPipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_StatNonActivePipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_StatPipelineWithUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.Error(t, cl.Call("jobs.Stat", true, &list)) +} + +func TestRPC_EnableConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeActive { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.Resume", "default", nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_EnableConsumingUndefined(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.Resume", "undefined", &ok)) +} + +func TestRPC_EnableConsumingUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.Resume", "default", &ok)) +} + +func TestRPC_EnableConsumingAllUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.ResumeAll", true, &ok)) +} + +func TestRPC_DisableConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeStopped { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.Stop", "default", nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_DisableConsumingUndefined(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + ok := "" + assert.Error(t, cl.Call("jobs.Stop", "undefined", &ok)) +} + +func TestRPC_EnableAllConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeActive { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.ResumeAll", true, nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_DisableAllConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeStopped { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.StopAll", true, nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_DoJob(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventJobOK { + close(jobReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + id := "" + assert.NoError(t, cl.Call("jobs.Push", &Job{ + Job: "spiral.jobs.tests.local.job", + Payload: `{"data":100}`, + Options: &Options{}, + }, &id)) + assert.NoError(t, err) + + <-jobReady + + data, err := ioutil.ReadFile("tests/local.job") + assert.NoError(t, err) + defer syscall.Unlink("tests/local.job") + + assert.Contains(t, string(data), id) +} + +func TestRPC_NoOperationOnDeadServer(t *testing.T) { + rc := &rpcServer{nil} + + assert.Error(t, rc.Push(&Job{}, nil)) + assert.Error(t, rc.Reset(true, nil)) + + assert.Error(t, rc.Stop("default", nil)) + assert.Error(t, rc.StopAll(true, nil)) + + assert.Error(t, rc.Resume("default", nil)) + assert.Error(t, rc.ResumeAll(true, nil)) + + assert.Error(t, rc.Workers(true, nil)) + assert.Error(t, rc.Stat(true, nil)) +} + +func TestRPC_Workers(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &WorkerList{} + assert.NoError(t, cl.Call("jobs.Workers", true, &list)) + + assert.Len(t, list.Workers, 1) + + pid := list.Workers[0].Pid + assert.NotEqual(t, 0, pid) + + // reset + ok := "" + assert.NoError(t, cl.Call("jobs.Reset", true, &ok)) + + list = &WorkerList{} + assert.NoError(t, cl.Call("jobs.Workers", true, &list)) + + assert.Len(t, list.Workers, 1) + + assert.NotEqual(t, list.Workers[0].Pid, pid) +} |