diff options
author | Wolfy-J <[email protected]> | 2018-06-10 16:44:41 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 16:44:41 +0300 |
commit | 4c292ee46f5505b00b16186e8f30e9bc1be25895 (patch) | |
tree | 818dffc7ce5e890875b147b97e298d4c7c48cbd9 /service/rpc | |
parent | a62237fa5afc310453e709837e363f0bb4d7ecf3 (diff) |
fs config
Diffstat (limited to 'service/rpc')
-rw-r--r-- | service/rpc/config.go | 35 | ||||
-rw-r--r-- | service/rpc/config_test.go | 109 | ||||
-rw-r--r-- | service/rpc/service.go | 122 | ||||
-rw-r--r-- | service/rpc/service_test.go | 95 |
4 files changed, 361 insertions, 0 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go new file mode 100644 index 00000000..8a34752a --- /dev/null +++ b/service/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // Listen string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go new file mode 100644 index 00000000..a953e30e --- /dev/null +++ b/service/rpc/config_test.go @@ -0,0 +1,109 @@ +package rpc + +import ( + "github.com/stretchr/testify/assert" + "runtime" + "testing" +) + +func TestConfig_Listener(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "tcp", ln.Addr().Network()) + assert.Equal(t, "[::]:18001", ln.Addr().String()) +} + +func TestConfig_ListenerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "unix", ln.Addr().Network()) + assert.Equal(t, "rpc.sock", ln.Addr().String()) +} + +func Test_Config_Error(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_ErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func TestConfig_Dialer(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "tcp", conn.RemoteAddr().Network()) + assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) +} + +func TestConfig_DialerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "unix", conn.RemoteAddr().Network()) + assert.Equal(t, "rpc.sock", conn.RemoteAddr().String()) +} + +func Test_Config_DialerError(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_DialerErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) +} diff --git a/service/rpc/service.go b/service/rpc/service.go new file mode 100644 index 00000000..ce1e3351 --- /dev/null +++ b/service/rpc/service.go @@ -0,0 +1,122 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" + "sync" +) + +// Name contains default service name. +const Name = "rpc" + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server + + mu sync.Mutex + serving bool +} + +// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + s.cfg = config + s.rpc = rpc.NewServer() + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.mu.Lock() + s.serving = true + s.stop = make(chan interface{}) + s.mu.Unlock() + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + go func() { + for { + select { + case <-s.stop: + break + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + <-s.stop + + s.mu.Lock() + s.serving = false + s.mu.Unlock() + + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.serving { + close(s.stop) + } +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go new file mode 100644 index 00000000..a57ce1bd --- /dev/null +++ b/service/rpc/service_test.go @@ -0,0 +1,95 @@ +package rpc + +import ( + "encoding/json" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type testService struct{} + +func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) service.Config { return nil } +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_ConfigError(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":false`}, nil) + + assert.Error(t, err) + assert.False(t, ok) +} + +func Test_Disabled(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil) + + assert.NoError(t, err) + assert.False(t, ok) +} + +func Test_RegisterNotConfigured(t *testing.T) { + s := &Service{} + assert.Error(t, s.Register("test", &testService{})) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) + assert.Error(t, s.Serve()) +} + +func Test_Enabled(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) +} + +func Test_StopNonServing(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) + s.Stop() +} + +func Test_Serve_Errors(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + assert.Error(t, s.Serve()) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) +} + +func Test_Serve_Client(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + defer s.Stop() + + assert.NoError(t, s.Register("test", &testService{})) + + go func() { assert.NoError(t, s.Serve()) }() + + client, err := s.Client() + assert.NotNil(t, client) + assert.NoError(t, err) + defer client.Close() + + var resp string + assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) + assert.Equal(t, "hello world", resp) +} |