diff options
author | Wolfy-J <[email protected]> | 2018-06-10 17:06:06 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 17:06:06 +0300 |
commit | 232aa8f3c20a060e556ab431467f4f7b3f83bfbf (patch) | |
tree | a9dacbc142020cabae6a0708733aadb7e789aea5 /service/rpc | |
parent | 3fe85e9d92f5f98337e8f7fd9a14e6b66b9694bd (diff) |
http service
Diffstat (limited to 'service/rpc')
-rw-r--r-- | service/rpc/config.go | 35 | ||||
-rw-r--r-- | service/rpc/config_test.go | 109 | ||||
-rw-r--r-- | service/rpc/service.go | 122 | ||||
-rw-r--r-- | service/rpc/service_test.go | 95 |
4 files changed, 0 insertions, 361 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go deleted file mode 100644 index 8a34752a..00000000 --- a/service/rpc/config.go +++ /dev/null @@ -1,35 +0,0 @@ -package rpc - -import ( - "errors" - "net" - "strings" -) - -type config struct { - // Indicates if RPC connection is enabled. - Enable bool - - // Listen string - Listen string -} - -// listener creates new rpc socket listener. -func (cfg *config) listener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") - } - - return net.Listen(dsn[0], dsn[1]) -} - -// dialer creates rpc socket dialer. -func (cfg *config) dialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") - } - - return net.Dial(dsn[0], dsn[1]) -} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go deleted file mode 100644 index a953e30e..00000000 --- a/service/rpc/config_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package rpc - -import ( - "github.com/stretchr/testify/assert" - "runtime" - "testing" -) - -func TestConfig_Listener(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} - - ln, err := cfg.listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer ln.Close() - - assert.Equal(t, "tcp", ln.Addr().Network()) - assert.Equal(t, "[::]:18001", ln.Addr().String()) -} - -func TestConfig_ListenerUnix(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("not supported on " + runtime.GOOS) - } - - cfg := &config{Listen: "unix://rpc.sock"} - - ln, err := cfg.listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer ln.Close() - - assert.Equal(t, "unix", ln.Addr().Network()) - assert.Equal(t, "rpc.sock", ln.Addr().String()) -} - -func Test_Config_Error(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("not supported on " + runtime.GOOS) - } - - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.listener() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) -} - -func Test_Config_ErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} - - ln, err := cfg.listener() - assert.Nil(t, ln) - assert.Error(t, err) -} - -func TestConfig_Dialer(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} - - ln, err := cfg.listener() - defer ln.Close() - - conn, err := cfg.dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer conn.Close() - - assert.Equal(t, "tcp", conn.RemoteAddr().Network()) - assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) -} - -func TestConfig_DialerUnix(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("not supported on " + runtime.GOOS) - } - - cfg := &config{Listen: "unix://rpc.sock"} - - ln, err := cfg.listener() - defer ln.Close() - - conn, err := cfg.dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer conn.Close() - - assert.Equal(t, "unix", conn.RemoteAddr().Network()) - assert.Equal(t, "rpc.sock", conn.RemoteAddr().String()) -} - -func Test_Config_DialerError(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("not supported on " + runtime.GOOS) - } - - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.dialer() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) -} - -func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} - - ln, err := cfg.dialer() - assert.Nil(t, ln) - assert.Error(t, err) -} diff --git a/service/rpc/service.go b/service/rpc/service.go deleted file mode 100644 index ce1e3351..00000000 --- a/service/rpc/service.go +++ /dev/null @@ -1,122 +0,0 @@ -package rpc - -import ( - "errors" - "github.com/spiral/goridge" - "github.com/spiral/roadrunner/service" - "net/rpc" - "sync" -) - -// Name contains default service name. -const Name = "rpc" - -// Service is RPC service. -type Service struct { - cfg *config - stop chan interface{} - rpc *rpc.Server - - mu sync.Mutex - serving bool -} - -// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) { - config := &config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { - return false, nil - } - - s.cfg = config - s.rpc = rpc.NewServer() - - return true, nil -} - -// Serve serves the service. -func (s *Service) Serve() error { - if s.rpc == nil { - return errors.New("RPC service is not configured") - } - - s.mu.Lock() - s.serving = true - s.stop = make(chan interface{}) - s.mu.Unlock() - - ln, err := s.cfg.listener() - if err != nil { - return err - } - defer ln.Close() - - go func() { - for { - select { - case <-s.stop: - break - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go s.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - }() - - <-s.stop - - s.mu.Lock() - s.serving = false - s.mu.Unlock() - - return nil -} - -// Stop stops the service. -func (s *Service) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - - if s.serving { - close(s.stop) - } -} - -// Register publishes in the server the set of methods of the -// receiver value that satisfy the following conditions: -// - exported method of exported type -// - two arguments, both of exported type -// - the second argument is a pointer -// - one return value, of type error -// It returns an error if the receiver is not an exported type or has -// no suitable methods. It also logs the error using package log. -func (s *Service) Register(name string, rcvr interface{}) error { - if s.rpc == nil { - return errors.New("RPC service is not configured") - } - - return s.rpc.RegisterName(name, rcvr) -} - -// Client creates new RPC client. -func (s *Service) Client() (*rpc.Client, error) { - if s.cfg == nil { - return nil, errors.New("RPC service is not configured") - } - - conn, err := s.cfg.dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go deleted file mode 100644 index a57ce1bd..00000000 --- a/service/rpc/service_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package rpc - -import ( - "encoding/json" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type testService struct{} - -func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } - -type testCfg struct{ cfg string } - -func (cfg *testCfg) Get(name string) service.Config { return nil } -func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_ConfigError(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":false`}, nil) - - assert.Error(t, err) - assert.False(t, ok) -} - -func Test_Disabled(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil) - - assert.NoError(t, err) - assert.False(t, ok) -} - -func Test_RegisterNotConfigured(t *testing.T) { - s := &Service{} - assert.Error(t, s.Register("test", &testService{})) - - client, err := s.Client() - assert.Nil(t, client) - assert.Error(t, err) - assert.Error(t, s.Serve()) -} - -func Test_Enabled(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) - - assert.NoError(t, err) - assert.True(t, ok) -} - -func Test_StopNonServing(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) - - assert.NoError(t, err) - assert.True(t, ok) - s.Stop() -} - -func Test_Serve_Errors(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) - assert.NoError(t, err) - assert.True(t, ok) - - assert.Error(t, s.Serve()) - - client, err := s.Client() - assert.Nil(t, client) - assert.Error(t, err) -} - -func Test_Serve_Client(t *testing.T) { - s := &Service{} - ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) - assert.NoError(t, err) - assert.True(t, ok) - - defer s.Stop() - - assert.NoError(t, s.Register("test", &testService{})) - - go func() { assert.NoError(t, s.Serve()) }() - - client, err := s.Client() - assert.NotNil(t, client) - assert.NoError(t, err) - defer client.Close() - - var resp string - assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) - assert.Equal(t, "hello world", resp) -} |