diff options
Diffstat (limited to 'service/rpc')
-rw-r--r-- | service/rpc/config.go | 38 | ||||
-rw-r--r-- | service/rpc/config_test.go | 38 | ||||
-rw-r--r-- | service/rpc/service.go | 30 | ||||
-rw-r--r-- | service/rpc/service_test.go | 25 |
4 files changed, 67 insertions, 64 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go index e3168945..0485fdf6 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -5,9 +5,11 @@ import ( "net" "strings" "syscall" + "github.com/spiral/roadrunner/service" ) -type config struct { +// Config defines RPC service config. +type Config struct { // Indicates if RPC connection is enabled. Enable bool @@ -15,9 +17,31 @@ type config struct { Listen string } -// listener creates new rpc socket listener. -func (cfg *config) listener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + return c.Valid() +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } @@ -29,9 +53,9 @@ func (cfg *config) listener() (net.Listener, error) { return net.Listen(dsn[0], dsn[1]) } -// dialer creates rpc socket dialer. -func (cfg *config) dialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index a953e30e..87a89a2b 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -6,10 +6,12 @@ import ( "testing" ) +// todo: test hydrate + func TestConfig_Listener(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -23,9 +25,9 @@ func TestConfig_ListenerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -39,28 +41,28 @@ func Test_Config_Error(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.listener() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_ErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) } func TestConfig_Dialer(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -74,12 +76,12 @@ func TestConfig_DialerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -93,17 +95,17 @@ func Test_Config_DialerError(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.dialer() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.dialer() + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) } diff --git a/service/rpc/service.go b/service/rpc/service.go index 82f26407..621348e8 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -3,7 +3,6 @@ package rpc import ( "errors" "github.com/spiral/goridge" - "github.com/spiral/roadrunner/service" "net/rpc" "sync" ) @@ -13,27 +12,20 @@ const ID = "rpc" // Service is RPC service. type Service struct { - cfg *config - stop chan interface{} - rpc *rpc.Server - + cfg *Config + stop chan interface{} + rpc *rpc.Server mu sync.Mutex serving bool } -// Init must return configure service and return true if service hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) { - config := &config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg *Config) (enabled bool, err error) { + if !cfg.Enable { return false, nil } - s.cfg = config + s.cfg = cfg s.rpc = rpc.NewServer() return true, nil @@ -50,7 +42,7 @@ func (s *Service) Serve() error { s.stop = make(chan interface{}) s.mu.Unlock() - ln, err := s.cfg.listener() + ln, err := s.cfg.Listener() if err != nil { return err } @@ -99,12 +91,12 @@ func (s *Service) Stop() { // - one return value, of type error // It returns an error if the receiver is not an exported type or has // no suitable methods. It also logs the error using package log. -func (s *Service) Register(name string, rcvr interface{}) error { +func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { return errors.New("RPC service is not configured") } - return s.rpc.RegisterName(name, rcvr) + return s.rpc.RegisterName(name, svc) } // Client creates new RPC client. @@ -113,7 +105,7 @@ func (s *Service) Client() (*rpc.Client, error) { return nil, errors.New("RPC service is not configured") } - conn, err := s.cfg.dialer() + conn, err := s.cfg.Dialer() if err != nil { return nil, err } diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go index d4734bb5..fc88d38d 100644 --- a/service/rpc/service_test.go +++ b/service/rpc/service_test.go @@ -1,8 +1,6 @@ package rpc import ( - "encoding/json" - "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "testing" "time" @@ -12,22 +10,9 @@ type testService struct{} func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } -type testCfg struct{ cfg string } - -func (cfg *testCfg) Get(name string) service.Config { return nil } -func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_ConfigError(t *testing.T) { - s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false`}, nil) - - assert.Error(t, err) - assert.False(t, ok) -} - func Test_Disabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false}`}, nil) + ok, err := s.Init(&Config{Enable: false}) assert.NoError(t, err) assert.False(t, ok) @@ -45,7 +30,7 @@ func Test_RegisterNotConfigured(t *testing.T) { func Test_Enabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -53,7 +38,7 @@ func Test_Enabled(t *testing.T) { func Test_StopNonServing(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -62,7 +47,7 @@ func Test_StopNonServing(t *testing.T) { func Test_Serve_Errors(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}) assert.NoError(t, err) assert.True(t, ok) @@ -75,7 +60,7 @@ func Test_Serve_Errors(t *testing.T) { func Test_Serve_Client(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}) assert.NoError(t, err) assert.True(t, ok) |