summaryrefslogtreecommitdiff
path: root/service/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'service/rpc')
-rw-r--r--service/rpc/config.go60
-rw-r--r--service/rpc/config_test.go163
-rw-r--r--service/rpc/service.go124
-rw-r--r--service/rpc/service_test.go96
-rw-r--r--service/rpc/system.go18
5 files changed, 0 insertions, 461 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go
deleted file mode 100644
index cc492622..00000000
--- a/service/rpc/config.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package rpc
-
-import (
- "errors"
- "net"
- "strings"
-
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/util"
-)
-
-// Config defines RPC service config.
-type Config struct {
- // Indicates if RPC connection is enabled.
- Enable bool
-
- // Listen string
- Listen string
-}
-
-// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- return c.Valid()
-}
-
-// InitDefaults allows to init blank config with pre-defined set of default values.
-func (c *Config) InitDefaults() error {
- c.Enable = true
- c.Listen = "tcp://127.0.0.1:6001"
-
- return nil
-}
-
-// Valid returns nil if config is valid.
-func (c *Config) Valid() error {
- if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
- return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
- }
-
- return nil
-}
-
-// Listener creates new rpc socket Listener.
-func (c *Config) Listener() (net.Listener, error) {
- return util.CreateListener(c.Listen)
-}
-
-// Dialer creates rpc socket Dialer.
-func (c *Config) Dialer() (net.Conn, error) {
- dsn := strings.Split(c.Listen, "://")
- if len(dsn) != 2 {
- return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
- }
-
- return net.Dial(dsn[0], dsn[1])
-}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
deleted file mode 100644
index 1ecd71b3..00000000
--- a/service/rpc/config_test.go
+++ /dev/null
@@ -1,163 +0,0 @@
-package rpc
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type testCfg struct{ cfg string }
-
-func (cfg *testCfg) Get(name string) service.Config { return nil }
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- j := json.ConfigCompatibleWithStandardLibrary
- return j.Unmarshal([]byte(cfg.cfg), out)
-}
-
-func Test_Config_Hydrate(t *testing.T) {
- cfg := &testCfg{`{"enable": true, "listen": "tcp://:18001"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &testCfg{`{"enable": true, "listen": "invalid"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &testCfg{`{"enable": true, "listen": "invalid"`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Listener(t *testing.T) {
- cfg := &Config{Listen: "tcp://:18001"}
-
- ln, err := cfg.Listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- assert.Equal(t, "tcp", ln.Addr().Network())
- assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
-}
-
-func TestConfig_ListenerUnix(t *testing.T) {
- cfg := &Config{Listen: "unix://file.sock"}
-
- ln, err := cfg.Listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- assert.Equal(t, "unix", ln.Addr().Network())
- assert.Equal(t, "file.sock", ln.Addr().String())
-}
-
-func Test_Config_Error(t *testing.T) {
- cfg := &Config{Listen: "uni:unix.sock"}
- ln, err := cfg.Listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error())
-}
-
-func Test_Config_ErrorMethod(t *testing.T) {
- cfg := &Config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.Listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
-
-func TestConfig_Dialer(t *testing.T) {
- cfg := &Config{Listen: "tcp://:18001"}
-
- ln, _ := cfg.Listener()
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- conn, err := cfg.Dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer func() {
- err := conn.Close()
- if err != nil {
- t.Errorf("error closing the connection: error %v", err)
- }
- }()
-
- assert.Equal(t, "tcp", conn.RemoteAddr().Network())
- assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
-}
-
-func TestConfig_DialerUnix(t *testing.T) {
- cfg := &Config{Listen: "unix://file.sock"}
-
- ln, _ := cfg.Listener()
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- conn, err := cfg.Dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer func() {
- err := conn.Close()
- if err != nil {
- t.Errorf("error closing the connection: error %v", err)
- }
- }()
-
- assert.Equal(t, "unix", conn.RemoteAddr().Network())
- assert.Equal(t, "file.sock", conn.RemoteAddr().String())
-}
-
-func Test_Config_DialerError(t *testing.T) {
- cfg := &Config{Listen: "uni:unix.sock"}
- ln, err := cfg.Dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error())
-}
-
-func Test_Config_DialerErrorMethod(t *testing.T) {
- cfg := &Config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.Dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
-
-func Test_Config_Defaults(t *testing.T) {
- c := &Config{}
- err := c.InitDefaults()
- if err != nil {
- t.Errorf("error during the InitDefaults: error %v", err)
- }
- assert.Equal(t, true, c.Enable)
- assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen)
-}
diff --git a/service/rpc/service.go b/service/rpc/service.go
deleted file mode 100644
index 7a649f1b..00000000
--- a/service/rpc/service.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package rpc
-
-import (
- "errors"
- "github.com/spiral/goridge/v2"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- "net/rpc"
- "sync"
-)
-
-// ID contains default service name.
-const ID = "rpc"
-
-// Service is RPC service.
-type Service struct {
- cfg *Config
- stop chan interface{}
- rpc *rpc.Server
- mu sync.Mutex
- serving bool
-}
-
-// Init rpc service. Must return true if service is enabled.
-func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) {
- if !cfg.Enable {
- return false, nil
- }
-
- s.cfg = cfg
- s.rpc = rpc.NewServer()
-
- if env != nil {
- env.SetEnv("RR_RPC", cfg.Listen)
- }
-
- if err := s.Register("system", &systemService{c}); err != nil {
- return false, err
- }
-
- return true, nil
-}
-
-// Serve serves the service.
-func (s *Service) Serve() error {
- if s.rpc == nil {
- return errors.New("RPC service is not configured")
- }
-
- s.mu.Lock()
- s.serving = true
- s.stop = make(chan interface{})
- s.mu.Unlock()
-
- ln, err := s.cfg.Listener()
- if err != nil {
- return err
- }
- defer ln.Close()
-
- go func() {
- for {
- select {
- case <-s.stop:
- return
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
- }
-
- go s.rpc.ServeCodec(goridge.NewCodec(conn))
- }
- }
- }()
-
- <-s.stop
-
- s.mu.Lock()
- s.serving = false
- s.mu.Unlock()
-
- return nil
-}
-
-// Stop stops the service.
-func (s *Service) Stop() {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if s.serving {
- close(s.stop)
- }
-}
-
-// Register publishes in the server the set of methods of the
-// receiver value that satisfy the following conditions:
-// - exported method of exported type
-// - two arguments, both of exported type
-// - the second argument is a pointer
-// - one return value, of type error
-// It returns an error if the receiver is not an exported type or has
-// no suitable methods. It also logs the error using package log.
-func (s *Service) Register(name string, svc interface{}) error {
- if s.rpc == nil {
- return errors.New("RPC service is not configured")
- }
-
- return s.rpc.RegisterName(name, svc)
-}
-
-// Client creates new RPC client.
-func (s *Service) Client() (*rpc.Client, error) {
- if s.cfg == nil {
- return nil, errors.New("RPC service is not configured")
- }
-
- conn, err := s.cfg.Dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
-}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
deleted file mode 100644
index 51c1b337..00000000
--- a/service/rpc/service_test.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package rpc
-
-import (
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-type testService struct{}
-
-func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
-
-func Test_Disabled(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil)
-
- assert.NoError(t, err)
- assert.False(t, ok)
-}
-
-func Test_RegisterNotConfigured(t *testing.T) {
- s := &Service{}
- assert.Error(t, s.Register("test", &testService{}))
-
- client, err := s.Client()
- assert.Nil(t, client)
- assert.Error(t, err)
- assert.Error(t, s.Serve())
-}
-
-func Test_Enabled(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
-
- assert.NoError(t, err)
- assert.True(t, ok)
-}
-
-func Test_StopNonServing(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
-
- assert.NoError(t, err)
- assert.True(t, ok)
- s.Stop()
-}
-
-func Test_Serve_Errors(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "malformed"}, service.NewContainer(nil), nil)
- assert.NoError(t, err)
- assert.True(t, ok)
-
- assert.Error(t, s.Serve())
-
- client, err := s.Client()
- assert.Nil(t, client)
- assert.Error(t, err)
-}
-
-func Test_Serve_Client(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil)
- assert.NoError(t, err)
- assert.True(t, ok)
-
- defer s.Stop()
-
- assert.NoError(t, s.Register("test", &testService{}))
-
- go func() { assert.NoError(t, s.Serve()) }()
- time.Sleep(time.Second)
-
- client, err := s.Client()
- assert.NotNil(t, client)
- assert.NoError(t, err)
-
- var resp string
- assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
- assert.Equal(t, "hello world", resp)
- assert.NoError(t, client.Close())
-}
-
-func TestSetEnv(t *testing.T) {
- s := &Service{}
- e := env.NewService(map[string]string{})
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e)
-
- assert.NoError(t, err)
- assert.True(t, ok)
-
- v, _ := e.GetEnv()
- assert.Equal(t, "tcp://localhost:9018", v["RR_RPC"])
-}
diff --git a/service/rpc/system.go b/service/rpc/system.go
deleted file mode 100644
index ffba3782..00000000
--- a/service/rpc/system.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package rpc
-
-import "github.com/spiral/roadrunner/service"
-
-// systemService service controls rr server.
-type systemService struct {
- c service.Container
-}
-
-// Detach the underlying c.
-func (s *systemService) Stop(stop bool, r *string) error {
- if stop {
- s.c.Stop()
- }
- *r = "OK"
-
- return nil
-}