diff options
Diffstat (limited to 'service/rpc')
-rw-r--r-- | service/rpc/config.go | 60 | ||||
-rw-r--r-- | service/rpc/config_test.go | 163 | ||||
-rw-r--r-- | service/rpc/service.go | 124 | ||||
-rw-r--r-- | service/rpc/service_test.go | 96 | ||||
-rw-r--r-- | service/rpc/system.go | 18 |
5 files changed, 0 insertions, 461 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go deleted file mode 100644 index cc492622..00000000 --- a/service/rpc/config.go +++ /dev/null @@ -1,60 +0,0 @@ -package rpc - -import ( - "errors" - "net" - "strings" - - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/util" -) - -// Config defines RPC service config. -type Config struct { - // Indicates if RPC connection is enabled. - Enable bool - - // Listen string - Listen string -} - -// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - return c.Valid() -} - -// InitDefaults allows to init blank config with pre-defined set of default values. -func (c *Config) InitDefaults() error { - c.Enable = true - c.Listen = "tcp://127.0.0.1:6001" - - return nil -} - -// Valid returns nil if config is valid. -func (c *Config) Valid() error { - if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { - return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") - } - - return nil -} - -// Listener creates new rpc socket Listener. -func (c *Config) Listener() (net.Listener, error) { - return util.CreateListener(c.Listen) -} - -// Dialer creates rpc socket Dialer. -func (c *Config) Dialer() (net.Conn, error) { - dsn := strings.Split(c.Listen, "://") - if len(dsn) != 2 { - return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") - } - - return net.Dial(dsn[0], dsn[1]) -} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go deleted file mode 100644 index 1ecd71b3..00000000 --- a/service/rpc/config_test.go +++ /dev/null @@ -1,163 +0,0 @@ -package rpc - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type testCfg struct{ cfg string } - -func (cfg *testCfg) Get(name string) service.Config { return nil } -func (cfg *testCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.cfg), out) -} - -func Test_Config_Hydrate(t *testing.T) { - cfg := &testCfg{`{"enable": true, "listen": "tcp://:18001"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &testCfg{`{"enable": true, "listen": "invalid"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &testCfg{`{"enable": true, "listen": "invalid"`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Listener(t *testing.T) { - cfg := &Config{Listen: "tcp://:18001"} - - ln, err := cfg.Listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - assert.Equal(t, "tcp", ln.Addr().Network()) - assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) -} - -func TestConfig_ListenerUnix(t *testing.T) { - cfg := &Config{Listen: "unix://file.sock"} - - ln, err := cfg.Listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - assert.Equal(t, "unix", ln.Addr().Network()) - assert.Equal(t, "file.sock", ln.Addr().String()) -} - -func Test_Config_Error(t *testing.T) { - cfg := &Config{Listen: "uni:unix.sock"} - ln, err := cfg.Listener() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) -} - -func Test_Config_ErrorMethod(t *testing.T) { - cfg := &Config{Listen: "xinu://unix.sock"} - - ln, err := cfg.Listener() - assert.Nil(t, ln) - assert.Error(t, err) -} - -func TestConfig_Dialer(t *testing.T) { - cfg := &Config{Listen: "tcp://:18001"} - - ln, _ := cfg.Listener() - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - conn, err := cfg.Dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer func() { - err := conn.Close() - if err != nil { - t.Errorf("error closing the connection: error %v", err) - } - }() - - assert.Equal(t, "tcp", conn.RemoteAddr().Network()) - assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) -} - -func TestConfig_DialerUnix(t *testing.T) { - cfg := &Config{Listen: "unix://file.sock"} - - ln, _ := cfg.Listener() - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - conn, err := cfg.Dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer func() { - err := conn.Close() - if err != nil { - t.Errorf("error closing the connection: error %v", err) - } - }() - - assert.Equal(t, "unix", conn.RemoteAddr().Network()) - assert.Equal(t, "file.sock", conn.RemoteAddr().String()) -} - -func Test_Config_DialerError(t *testing.T) { - cfg := &Config{Listen: "uni:unix.sock"} - ln, err := cfg.Dialer() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error()) -} - -func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &Config{Listen: "xinu://unix.sock"} - - ln, err := cfg.Dialer() - assert.Nil(t, ln) - assert.Error(t, err) -} - -func Test_Config_Defaults(t *testing.T) { - c := &Config{} - err := c.InitDefaults() - if err != nil { - t.Errorf("error during the InitDefaults: error %v", err) - } - assert.Equal(t, true, c.Enable) - assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen) -} diff --git a/service/rpc/service.go b/service/rpc/service.go deleted file mode 100644 index 7a649f1b..00000000 --- a/service/rpc/service.go +++ /dev/null @@ -1,124 +0,0 @@ -package rpc - -import ( - "errors" - "github.com/spiral/goridge/v2" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/env" - "net/rpc" - "sync" -) - -// ID contains default service name. -const ID = "rpc" - -// Service is RPC service. -type Service struct { - cfg *Config - stop chan interface{} - rpc *rpc.Server - mu sync.Mutex - serving bool -} - -// Init rpc service. Must return true if service is enabled. -func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) { - if !cfg.Enable { - return false, nil - } - - s.cfg = cfg - s.rpc = rpc.NewServer() - - if env != nil { - env.SetEnv("RR_RPC", cfg.Listen) - } - - if err := s.Register("system", &systemService{c}); err != nil { - return false, err - } - - return true, nil -} - -// Serve serves the service. -func (s *Service) Serve() error { - if s.rpc == nil { - return errors.New("RPC service is not configured") - } - - s.mu.Lock() - s.serving = true - s.stop = make(chan interface{}) - s.mu.Unlock() - - ln, err := s.cfg.Listener() - if err != nil { - return err - } - defer ln.Close() - - go func() { - for { - select { - case <-s.stop: - return - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go s.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - }() - - <-s.stop - - s.mu.Lock() - s.serving = false - s.mu.Unlock() - - return nil -} - -// Stop stops the service. -func (s *Service) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - - if s.serving { - close(s.stop) - } -} - -// Register publishes in the server the set of methods of the -// receiver value that satisfy the following conditions: -// - exported method of exported type -// - two arguments, both of exported type -// - the second argument is a pointer -// - one return value, of type error -// It returns an error if the receiver is not an exported type or has -// no suitable methods. It also logs the error using package log. -func (s *Service) Register(name string, svc interface{}) error { - if s.rpc == nil { - return errors.New("RPC service is not configured") - } - - return s.rpc.RegisterName(name, svc) -} - -// Client creates new RPC client. -func (s *Service) Client() (*rpc.Client, error) { - if s.cfg == nil { - return nil, errors.New("RPC service is not configured") - } - - conn, err := s.cfg.Dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go deleted file mode 100644 index 51c1b337..00000000 --- a/service/rpc/service_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package rpc - -import ( - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/env" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -type testService struct{} - -func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } - -func Test_Disabled(t *testing.T) { - s := &Service{} - ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil) - - assert.NoError(t, err) - assert.False(t, ok) -} - -func Test_RegisterNotConfigured(t *testing.T) { - s := &Service{} - assert.Error(t, s.Register("test", &testService{})) - - client, err := s.Client() - assert.Nil(t, client) - assert.Error(t, err) - assert.Error(t, s.Serve()) -} - -func Test_Enabled(t *testing.T) { - s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) - - assert.NoError(t, err) - assert.True(t, ok) -} - -func Test_StopNonServing(t *testing.T) { - s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) - - assert.NoError(t, err) - assert.True(t, ok) - s.Stop() -} - -func Test_Serve_Errors(t *testing.T) { - s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "malformed"}, service.NewContainer(nil), nil) - assert.NoError(t, err) - assert.True(t, ok) - - assert.Error(t, s.Serve()) - - client, err := s.Client() - assert.Nil(t, client) - assert.Error(t, err) -} - -func Test_Serve_Client(t *testing.T) { - s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil) - assert.NoError(t, err) - assert.True(t, ok) - - defer s.Stop() - - assert.NoError(t, s.Register("test", &testService{})) - - go func() { assert.NoError(t, s.Serve()) }() - time.Sleep(time.Second) - - client, err := s.Client() - assert.NotNil(t, client) - assert.NoError(t, err) - - var resp string - assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) - assert.Equal(t, "hello world", resp) - assert.NoError(t, client.Close()) -} - -func TestSetEnv(t *testing.T) { - s := &Service{} - e := env.NewService(map[string]string{}) - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e) - - assert.NoError(t, err) - assert.True(t, ok) - - v, _ := e.GetEnv() - assert.Equal(t, "tcp://localhost:9018", v["RR_RPC"]) -} diff --git a/service/rpc/system.go b/service/rpc/system.go deleted file mode 100644 index ffba3782..00000000 --- a/service/rpc/system.go +++ /dev/null @@ -1,18 +0,0 @@ -package rpc - -import "github.com/spiral/roadrunner/service" - -// systemService service controls rr server. -type systemService struct { - c service.Container -} - -// Detach the underlying c. -func (s *systemService) Stop(stop bool, r *string) error { - if stop { - s.c.Stop() - } - *r = "OK" - - return nil -} |