summaryrefslogtreecommitdiff
path: root/service/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'service/rpc')
-rw-r--r--service/rpc/config.go38
-rw-r--r--service/rpc/config_test.go38
-rw-r--r--service/rpc/service.go30
-rw-r--r--service/rpc/service_test.go25
4 files changed, 67 insertions, 64 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go
index e3168945..0485fdf6 100644
--- a/service/rpc/config.go
+++ b/service/rpc/config.go
@@ -5,9 +5,11 @@ import (
"net"
"strings"
"syscall"
+ "github.com/spiral/roadrunner/service"
)
-type config struct {
+// Config defines RPC service config.
+type Config struct {
// Indicates if RPC connection is enabled.
Enable bool
@@ -15,9 +17,31 @@ type config struct {
Listen string
}
-// listener creates new rpc socket listener.
-func (cfg *config) listener() (net.Listener, error) {
- dsn := strings.Split(cfg.Listen, "://")
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ return c.Valid()
+}
+
+// Valid returns nil if config is valid.
+func (c *Config) Valid() error {
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return nil
+}
+
+// Listener creates new rpc socket Listener.
+func (c *Config) Listener() (net.Listener, error) {
+ dsn := strings.Split(c.Listen, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
}
@@ -29,9 +53,9 @@ func (cfg *config) listener() (net.Listener, error) {
return net.Listen(dsn[0], dsn[1])
}
-// dialer creates rpc socket dialer.
-func (cfg *config) dialer() (net.Conn, error) {
- dsn := strings.Split(cfg.Listen, "://")
+// Dialer creates rpc socket Dialer.
+func (c *Config) Dialer() (net.Conn, error) {
+ dsn := strings.Split(c.Listen, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
index a953e30e..87a89a2b 100644
--- a/service/rpc/config_test.go
+++ b/service/rpc/config_test.go
@@ -6,10 +6,12 @@ import (
"testing"
)
+// todo: test hydrate
+
func TestConfig_Listener(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
+ cfg := &Config{Listen: "tcp://:18001"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
defer ln.Close()
@@ -23,9 +25,9 @@ func TestConfig_ListenerUnix(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "unix://rpc.sock"}
+ cfg := &Config{Listen: "unix://rpc.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
defer ln.Close()
@@ -39,28 +41,28 @@ func Test_Config_Error(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.listener()
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Listener()
assert.Nil(t, ln)
assert.Error(t, err)
assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
}
func Test_Config_ErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
+ cfg := &Config{Listen: "xinu://unix.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.Nil(t, ln)
assert.Error(t, err)
}
func TestConfig_Dialer(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
+ cfg := &Config{Listen: "tcp://:18001"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
defer ln.Close()
- conn, err := cfg.dialer()
+ conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()
@@ -74,12 +76,12 @@ func TestConfig_DialerUnix(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "unix://rpc.sock"}
+ cfg := &Config{Listen: "unix://rpc.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
defer ln.Close()
- conn, err := cfg.dialer()
+ conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()
@@ -93,17 +95,17 @@ func Test_Config_DialerError(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.dialer()
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Dialer()
assert.Nil(t, ln)
assert.Error(t, err)
assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
}
func Test_Config_DialerErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
+ cfg := &Config{Listen: "xinu://unix.sock"}
- ln, err := cfg.dialer()
+ ln, err := cfg.Dialer()
assert.Nil(t, ln)
assert.Error(t, err)
}
diff --git a/service/rpc/service.go b/service/rpc/service.go
index 82f26407..621348e8 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -3,7 +3,6 @@ package rpc
import (
"errors"
"github.com/spiral/goridge"
- "github.com/spiral/roadrunner/service"
"net/rpc"
"sync"
)
@@ -13,27 +12,20 @@ const ID = "rpc"
// Service is RPC service.
type Service struct {
- cfg *config
- stop chan interface{}
- rpc *rpc.Server
-
+ cfg *Config
+ stop chan interface{}
+ rpc *rpc.Server
mu sync.Mutex
serving bool
}
-// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
-// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) {
- config := &config{}
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
+// Init rpc service. Must return true if service is enabled.
+func (s *Service) Init(cfg *Config) (enabled bool, err error) {
+ if !cfg.Enable {
return false, nil
}
- s.cfg = config
+ s.cfg = cfg
s.rpc = rpc.NewServer()
return true, nil
@@ -50,7 +42,7 @@ func (s *Service) Serve() error {
s.stop = make(chan interface{})
s.mu.Unlock()
- ln, err := s.cfg.listener()
+ ln, err := s.cfg.Listener()
if err != nil {
return err
}
@@ -99,12 +91,12 @@ func (s *Service) Stop() {
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
-func (s *Service) Register(name string, rcvr interface{}) error {
+func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
return errors.New("RPC service is not configured")
}
- return s.rpc.RegisterName(name, rcvr)
+ return s.rpc.RegisterName(name, svc)
}
// Client creates new RPC client.
@@ -113,7 +105,7 @@ func (s *Service) Client() (*rpc.Client, error) {
return nil, errors.New("RPC service is not configured")
}
- conn, err := s.cfg.dialer()
+ conn, err := s.cfg.Dialer()
if err != nil {
return nil, err
}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
index d4734bb5..fc88d38d 100644
--- a/service/rpc/service_test.go
+++ b/service/rpc/service_test.go
@@ -1,8 +1,6 @@
package rpc
import (
- "encoding/json"
- "github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
"testing"
"time"
@@ -12,22 +10,9 @@ type testService struct{}
func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
-type testCfg struct{ cfg string }
-
-func (cfg *testCfg) Get(name string) service.Config { return nil }
-func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_ConfigError(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":false`}, nil)
-
- assert.Error(t, err)
- assert.False(t, ok)
-}
-
func Test_Disabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":false}`}, nil)
+ ok, err := s.Init(&Config{Enable: false})
assert.NoError(t, err)
assert.False(t, ok)
@@ -45,7 +30,7 @@ func Test_RegisterNotConfigured(t *testing.T) {
func Test_Enabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -53,7 +38,7 @@ func Test_Enabled(t *testing.T) {
func Test_StopNonServing(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -62,7 +47,7 @@ func Test_StopNonServing(t *testing.T) {
func Test_Serve_Errors(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -75,7 +60,7 @@ func Test_Serve_Errors(t *testing.T) {
func Test_Serve_Client(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"})
assert.NoError(t, err)
assert.True(t, ok)