diff options
author | Valery Piashchynski <[email protected]> | 2021-06-21 17:01:39 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-21 17:01:39 +0300 |
commit | 41bb9fa5938125217a075c60f1e39dc3a9a27537 (patch) | |
tree | ce2997caa62f90279d85f6aa2397996f80791893 /plugins/jobs/oooold | |
parent | bdcfdd28d705e401973da2beb8a11543e362bda4 (diff) |
- Rework dispatcher, pipeline, job (not completely)
Create a config sample with RR2 support. Progress on root JOBS plugin.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/config.go | 39 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/config_test.go | 27 | ||||
-rw-r--r-- | plugins/jobs/oooold/rpc.go | 1 | ||||
-rw-r--r-- | plugins/jobs/oooold/service.go | 34 |
4 files changed, 17 insertions, 84 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/oooold/broker/amqp/config.go deleted file mode 100644 index 0ed3a50e..00000000 --- a/plugins/jobs/oooold/broker/amqp/config.go +++ /dev/null @@ -1,39 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "time" -) - -// Config defines sqs broker configuration. -type Config struct { - // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). - Addr string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Addr == "" { - return fmt.Errorf("AMQP address is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to redial -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go deleted file mode 100644 index 1abbb55d..00000000 --- a/plugins/jobs/oooold/broker/amqp/config_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package amqp - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"addr":""}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go index 42730a68..cc61fb7d 100644 --- a/plugins/jobs/oooold/rpc.go +++ b/plugins/jobs/oooold/rpc.go @@ -2,7 +2,6 @@ package oooold import ( "fmt" - "github.com/spiral/roadrunner/util" ) type rpcServer struct{ svc *Service } diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go index 4244ed1a..7cfcff31 100644 --- a/plugins/jobs/oooold/service.go +++ b/plugins/jobs/oooold/service.go @@ -91,27 +91,27 @@ func (svc *Service) Init( } // run all brokers in nested container - svc.brokers = service.NewContainer(log) - for name, b := range svc.Brokers { - svc.brokers.Register(name, b) - if ep, ok := b.(EventProvider); ok { - ep.Listen(svc.throw) - } - } + //svc.brokers = service.NewContainer(log) + //for name, b := range svc.Brokers { + // svc.brokers.Register(name, b) + // if ep, ok := b.(EventProvider); ok { + // ep.Listen(svc.throw) + // } + //} // init all broker configs - if err := svc.brokers.Init(svc.cfg); err != nil { - return false, err - } + //if err := svc.brokers.Init(svc.cfg); err != nil { + // return false, err + //} // register all pipelines (per broker) - for name, b := range svc.Brokers { - for _, pipe := range svc.cfg.pipelines.Broker(name) { - if err := b.Register(pipe); err != nil { - return false, err - } - } - } + //for name, b := range svc.Brokers { + // for _, pipe := range svc.cfg.pipelines.Broker(name) { + // if err := b.Register(pipe); err != nil { + // return false, err + // } + // } + //} return true, nil } |