summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-21 17:01:39 +0300
committerValery Piashchynski <[email protected]>2021-06-21 17:01:39 +0300
commit41bb9fa5938125217a075c60f1e39dc3a9a27537 (patch)
treece2997caa62f90279d85f6aa2397996f80791893 /plugins/jobs/oooold
parentbdcfdd28d705e401973da2beb8a11543e362bda4 (diff)
- Rework dispatcher, pipeline, job (not completely)
Create a config sample with RR2 support. Progress on root JOBS plugin. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold')
-rw-r--r--plugins/jobs/oooold/broker/amqp/config.go39
-rw-r--r--plugins/jobs/oooold/broker/amqp/config_test.go27
-rw-r--r--plugins/jobs/oooold/rpc.go1
-rw-r--r--plugins/jobs/oooold/service.go34
4 files changed, 17 insertions, 84 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/oooold/broker/amqp/config.go
deleted file mode 100644
index 0ed3a50e..00000000
--- a/plugins/jobs/oooold/broker/amqp/config.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "time"
-)
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("AMQP address is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to redial
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go
deleted file mode 100644
index 1abbb55d..00000000
--- a/plugins/jobs/oooold/broker/amqp/config_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package amqp
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go
index 42730a68..cc61fb7d 100644
--- a/plugins/jobs/oooold/rpc.go
+++ b/plugins/jobs/oooold/rpc.go
@@ -2,7 +2,6 @@ package oooold
import (
"fmt"
- "github.com/spiral/roadrunner/util"
)
type rpcServer struct{ svc *Service }
diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go
index 4244ed1a..7cfcff31 100644
--- a/plugins/jobs/oooold/service.go
+++ b/plugins/jobs/oooold/service.go
@@ -91,27 +91,27 @@ func (svc *Service) Init(
}
// run all brokers in nested container
- svc.brokers = service.NewContainer(log)
- for name, b := range svc.Brokers {
- svc.brokers.Register(name, b)
- if ep, ok := b.(EventProvider); ok {
- ep.Listen(svc.throw)
- }
- }
+ //svc.brokers = service.NewContainer(log)
+ //for name, b := range svc.Brokers {
+ // svc.brokers.Register(name, b)
+ // if ep, ok := b.(EventProvider); ok {
+ // ep.Listen(svc.throw)
+ // }
+ //}
// init all broker configs
- if err := svc.brokers.Init(svc.cfg); err != nil {
- return false, err
- }
+ //if err := svc.brokers.Init(svc.cfg); err != nil {
+ // return false, err
+ //}
// register all pipelines (per broker)
- for name, b := range svc.Brokers {
- for _, pipe := range svc.cfg.pipelines.Broker(name) {
- if err := b.Register(pipe); err != nil {
- return false, err
- }
- }
- }
+ //for name, b := range svc.Brokers {
+ // for _, pipe := range svc.cfg.pipelines.Broker(name) {
+ // if err := b.Register(pipe); err != nil {
+ // return false, err
+ // }
+ // }
+ //}
return true, nil
}