summaryrefslogtreecommitdiff
path: root/service/rpc
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-10 16:44:41 +0300
committerWolfy-J <[email protected]>2018-06-10 16:44:41 +0300
commit4c292ee46f5505b00b16186e8f30e9bc1be25895 (patch)
tree818dffc7ce5e890875b147b97e298d4c7c48cbd9 /service/rpc
parenta62237fa5afc310453e709837e363f0bb4d7ecf3 (diff)
fs config
Diffstat (limited to 'service/rpc')
-rw-r--r--service/rpc/config.go35
-rw-r--r--service/rpc/config_test.go109
-rw-r--r--service/rpc/service.go122
-rw-r--r--service/rpc/service_test.go95
4 files changed, 361 insertions, 0 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go
new file mode 100644
index 00000000..8a34752a
--- /dev/null
+++ b/service/rpc/config.go
@@ -0,0 +1,35 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+)
+
+type config struct {
+ // Indicates if RPC connection is enabled.
+ Enable bool
+
+ // Listen string
+ Listen string
+}
+
+// listener creates new rpc socket listener.
+func (cfg *config) listener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+// dialer creates rpc socket dialer.
+func (cfg *config) dialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
new file mode 100644
index 00000000..a953e30e
--- /dev/null
+++ b/service/rpc/config_test.go
@@ -0,0 +1,109 @@
+package rpc
+
+import (
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+)
+
+func TestConfig_Listener(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "tcp", ln.Addr().Network())
+ assert.Equal(t, "[::]:18001", ln.Addr().String())
+}
+
+func TestConfig_ListenerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "unix", ln.Addr().Network())
+ assert.Equal(t, "rpc.sock", ln.Addr().String())
+}
+
+func Test_Config_Error(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_ErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func TestConfig_Dialer(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "tcp", conn.RemoteAddr().Network())
+ assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
+}
+
+func TestConfig_DialerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "unix", conn.RemoteAddr().Network())
+ assert.Equal(t, "rpc.sock", conn.RemoteAddr().String())
+}
+
+func Test_Config_DialerError(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_DialerErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
diff --git a/service/rpc/service.go b/service/rpc/service.go
new file mode 100644
index 00000000..ce1e3351
--- /dev/null
+++ b/service/rpc/service.go
@@ -0,0 +1,122 @@
+package rpc
+
+import (
+ "errors"
+ "github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
+ "net/rpc"
+ "sync"
+)
+
+// Name contains default service name.
+const Name = "rpc"
+
+// Service is RPC service.
+type Service struct {
+ cfg *config
+ stop chan interface{}
+ rpc *rpc.Server
+
+ mu sync.Mutex
+ serving bool
+}
+
+// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) {
+ config := &config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ s.cfg = config
+ s.rpc = rpc.NewServer()
+
+ return true, nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ s.mu.Lock()
+ s.serving = true
+ s.stop = make(chan interface{})
+ s.mu.Unlock()
+
+ ln, err := s.cfg.listener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ go func() {
+ for {
+ select {
+ case <-s.stop:
+ break
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ <-s.stop
+
+ s.mu.Lock()
+ s.serving = false
+ s.mu.Unlock()
+
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.serving {
+ close(s.stop)
+ }
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, rcvr interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, rcvr)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.cfg == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.cfg.dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
new file mode 100644
index 00000000..a57ce1bd
--- /dev/null
+++ b/service/rpc/service_test.go
@@ -0,0 +1,95 @@
+package rpc
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type testService struct{}
+
+func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) service.Config { return nil }
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_ConfigError(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":false`}, nil)
+
+ assert.Error(t, err)
+ assert.False(t, ok)
+}
+
+func Test_Disabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil)
+
+ assert.NoError(t, err)
+ assert.False(t, ok)
+}
+
+func Test_RegisterNotConfigured(t *testing.T) {
+ s := &Service{}
+ assert.Error(t, s.Register("test", &testService{}))
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+ assert.Error(t, s.Serve())
+}
+
+func Test_Enabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+}
+
+func Test_StopNonServing(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+ s.Stop()
+}
+
+func Test_Serve_Errors(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ assert.Error(t, s.Serve())
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+}
+
+func Test_Serve_Client(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ defer s.Stop()
+
+ assert.NoError(t, s.Register("test", &testService{}))
+
+ go func() { assert.NoError(t, s.Serve()) }()
+
+ client, err := s.Client()
+ assert.NotNil(t, client)
+ assert.NoError(t, err)
+ defer client.Close()
+
+ var resp string
+ assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
+ assert.Equal(t, "hello world", resp)
+}