summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/service_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/service_test.go')
-rw-r--r--plugins/jobs/oooold/service_test.go458
1 files changed, 0 insertions, 458 deletions
diff --git a/plugins/jobs/oooold/service_test.go b/plugins/jobs/oooold/service_test.go
deleted file mode 100644
index a8e0e56d..00000000
--- a/plugins/jobs/oooold/service_test.go
+++ /dev/null
@@ -1,458 +0,0 @@
-package oooold
-
-import (
- "bytes"
- "github.com/sirupsen/logrus"
- "github.com/spf13/viper"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- "github.com/stretchr/testify/assert"
- "io/ioutil"
- "syscall"
- "testing"
-)
-
-func viperConfig(cfg string) service.Config {
- v := viper.New()
- v.SetConfigType("json")
-
- err := v.ReadConfig(bytes.NewBuffer([]byte(cfg)))
- if err != nil {
- panic(err)
- }
-
- return &configWrapper{v}
-}
-
-// configWrapper provides interface bridge between v configs and service.Config.
-type configWrapper struct {
- v *viper.Viper
-}
-
-// Get nested config section (sub-map), returns nil if section not found.
-func (w *configWrapper) Get(key string) service.Config {
- sub := w.v.Sub(key)
- if sub == nil {
- return nil
- }
-
- return &configWrapper{sub}
-}
-
-// Unmarshal unmarshal config data into given struct.
-func (w *configWrapper) Unmarshal(out interface{}) error {
- return w.v.Unmarshal(out)
-}
-
-func jobs(container service.Container) *Service {
- svc, _ := container.Get("jobs")
- return svc.(*Service)
-}
-
-func TestService_Init(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-}
-
-func TestService_ServeStop(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("env", &env.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- <-ready
- c.Stop()
-}
-
-func TestService_ServeError(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/bad-consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- assert.Error(t, c.Serve())
-}
-
-func TestService_GetPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- assert.Equal(t, "ephemeral", jobs(c).cfg.pipelines.Get("default").Broker())
-}
-
-func TestService_StatPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
- pipe := svc.cfg.pipelines.Get("default")
-
- stat, err := svc.Stat(pipe)
- assert.NoError(t, err)
-
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, true, stat.Consuming)
-}
-
-func TestService_StatNonConsumingPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
- pipe := svc.cfg.pipelines.Get("default")
-
- stat, err := svc.Stat(pipe)
- assert.NoError(t, err)
-
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, false, stat.Consuming)
-}
-
-func TestService_DoJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventJobOK {
- close(jobReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- id, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.job",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.NoError(t, err)
-
- <-jobReady
-
- data, err := ioutil.ReadFile("tests/local.job")
- assert.NoError(t, err)
- defer syscall.Unlink("tests/local.job")
-
- assert.Contains(t, string(data), id)
-}
-
-func TestService_DoUndefinedJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.undefined",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.Error(t, err)
-}
-
-func TestService_DoJobIntoInvalidBroker(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.job",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.Error(t, err)
-}
-
-func TestService_DoStatInvalidBroker(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Stat(svc.cfg.pipelines.Get("default"))
- assert.Error(t, err)
-}
-
-func TestService_DoErrorJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobReady := make(chan interface{})
-
- var jobErr error
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventJobError {
- jobErr = ctx.(error)
- close(jobReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.errorJob",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.NoError(t, err)
-
- <-jobReady
- assert.Error(t, jobErr)
- assert.Contains(t, jobErr.Error(), "something is wrong")
-}