summaryrefslogtreecommitdiff
path: root/service/rpc
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-10 17:06:06 +0300
committerWolfy-J <[email protected]>2018-06-10 17:06:06 +0300
commit232aa8f3c20a060e556ab431467f4f7b3f83bfbf (patch)
treea9dacbc142020cabae6a0708733aadb7e789aea5 /service/rpc
parent3fe85e9d92f5f98337e8f7fd9a14e6b66b9694bd (diff)
http service
Diffstat (limited to 'service/rpc')
-rw-r--r--service/rpc/config.go35
-rw-r--r--service/rpc/config_test.go109
-rw-r--r--service/rpc/service.go122
-rw-r--r--service/rpc/service_test.go95
4 files changed, 0 insertions, 361 deletions
diff --git a/service/rpc/config.go b/service/rpc/config.go
deleted file mode 100644
index 8a34752a..00000000
--- a/service/rpc/config.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package rpc
-
-import (
- "errors"
- "net"
- "strings"
-)
-
-type config struct {
- // Indicates if RPC connection is enabled.
- Enable bool
-
- // Listen string
- Listen string
-}
-
-// listener creates new rpc socket listener.
-func (cfg *config) listener() (net.Listener, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
- }
-
- return net.Listen(dsn[0], dsn[1])
-}
-
-// dialer creates rpc socket dialer.
-func (cfg *config) dialer() (net.Conn, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
- }
-
- return net.Dial(dsn[0], dsn[1])
-}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
deleted file mode 100644
index a953e30e..00000000
--- a/service/rpc/config_test.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package rpc
-
-import (
- "github.com/stretchr/testify/assert"
- "runtime"
- "testing"
-)
-
-func TestConfig_Listener(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
-
- ln, err := cfg.listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer ln.Close()
-
- assert.Equal(t, "tcp", ln.Addr().Network())
- assert.Equal(t, "[::]:18001", ln.Addr().String())
-}
-
-func TestConfig_ListenerUnix(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("not supported on " + runtime.GOOS)
- }
-
- cfg := &config{Listen: "unix://rpc.sock"}
-
- ln, err := cfg.listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer ln.Close()
-
- assert.Equal(t, "unix", ln.Addr().Network())
- assert.Equal(t, "rpc.sock", ln.Addr().String())
-}
-
-func Test_Config_Error(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("not supported on " + runtime.GOOS)
- }
-
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
-}
-
-func Test_Config_ErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
-
-func TestConfig_Dialer(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
-
- ln, err := cfg.listener()
- defer ln.Close()
-
- conn, err := cfg.dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer conn.Close()
-
- assert.Equal(t, "tcp", conn.RemoteAddr().Network())
- assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
-}
-
-func TestConfig_DialerUnix(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("not supported on " + runtime.GOOS)
- }
-
- cfg := &config{Listen: "unix://rpc.sock"}
-
- ln, err := cfg.listener()
- defer ln.Close()
-
- conn, err := cfg.dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer conn.Close()
-
- assert.Equal(t, "unix", conn.RemoteAddr().Network())
- assert.Equal(t, "rpc.sock", conn.RemoteAddr().String())
-}
-
-func Test_Config_DialerError(t *testing.T) {
- if runtime.GOOS == "windows" {
- t.Skip("not supported on " + runtime.GOOS)
- }
-
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
-}
-
-func Test_Config_DialerErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
diff --git a/service/rpc/service.go b/service/rpc/service.go
deleted file mode 100644
index ce1e3351..00000000
--- a/service/rpc/service.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package rpc
-
-import (
- "errors"
- "github.com/spiral/goridge"
- "github.com/spiral/roadrunner/service"
- "net/rpc"
- "sync"
-)
-
-// Name contains default service name.
-const Name = "rpc"
-
-// Service is RPC service.
-type Service struct {
- cfg *config
- stop chan interface{}
- rpc *rpc.Server
-
- mu sync.Mutex
- serving bool
-}
-
-// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
-// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) {
- config := &config{}
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
- return false, nil
- }
-
- s.cfg = config
- s.rpc = rpc.NewServer()
-
- return true, nil
-}
-
-// Serve serves the service.
-func (s *Service) Serve() error {
- if s.rpc == nil {
- return errors.New("RPC service is not configured")
- }
-
- s.mu.Lock()
- s.serving = true
- s.stop = make(chan interface{})
- s.mu.Unlock()
-
- ln, err := s.cfg.listener()
- if err != nil {
- return err
- }
- defer ln.Close()
-
- go func() {
- for {
- select {
- case <-s.stop:
- break
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
- }
-
- go s.rpc.ServeCodec(goridge.NewCodec(conn))
- }
- }
- }()
-
- <-s.stop
-
- s.mu.Lock()
- s.serving = false
- s.mu.Unlock()
-
- return nil
-}
-
-// Stop stops the service.
-func (s *Service) Stop() {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if s.serving {
- close(s.stop)
- }
-}
-
-// Register publishes in the server the set of methods of the
-// receiver value that satisfy the following conditions:
-// - exported method of exported type
-// - two arguments, both of exported type
-// - the second argument is a pointer
-// - one return value, of type error
-// It returns an error if the receiver is not an exported type or has
-// no suitable methods. It also logs the error using package log.
-func (s *Service) Register(name string, rcvr interface{}) error {
- if s.rpc == nil {
- return errors.New("RPC service is not configured")
- }
-
- return s.rpc.RegisterName(name, rcvr)
-}
-
-// Client creates new RPC client.
-func (s *Service) Client() (*rpc.Client, error) {
- if s.cfg == nil {
- return nil, errors.New("RPC service is not configured")
- }
-
- conn, err := s.cfg.dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
-}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
deleted file mode 100644
index a57ce1bd..00000000
--- a/service/rpc/service_test.go
+++ /dev/null
@@ -1,95 +0,0 @@
-package rpc
-
-import (
- "encoding/json"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type testService struct{}
-
-func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
-
-type testCfg struct{ cfg string }
-
-func (cfg *testCfg) Get(name string) service.Config { return nil }
-func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_ConfigError(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":false`}, nil)
-
- assert.Error(t, err)
- assert.False(t, ok)
-}
-
-func Test_Disabled(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil)
-
- assert.NoError(t, err)
- assert.False(t, ok)
-}
-
-func Test_RegisterNotConfigured(t *testing.T) {
- s := &Service{}
- assert.Error(t, s.Register("test", &testService{}))
-
- client, err := s.Client()
- assert.Nil(t, client)
- assert.Error(t, err)
- assert.Error(t, s.Serve())
-}
-
-func Test_Enabled(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
-
- assert.NoError(t, err)
- assert.True(t, ok)
-}
-
-func Test_StopNonServing(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
-
- assert.NoError(t, err)
- assert.True(t, ok)
- s.Stop()
-}
-
-func Test_Serve_Errors(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
- assert.NoError(t, err)
- assert.True(t, ok)
-
- assert.Error(t, s.Serve())
-
- client, err := s.Client()
- assert.Nil(t, client)
- assert.Error(t, err)
-}
-
-func Test_Serve_Client(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
- assert.NoError(t, err)
- assert.True(t, ok)
-
- defer s.Stop()
-
- assert.NoError(t, s.Register("test", &testService{}))
-
- go func() { assert.NoError(t, s.Serve()) }()
-
- client, err := s.Client()
- assert.NotNil(t, client)
- assert.NoError(t, err)
- defer client.Close()
-
- var resp string
- assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
- assert.Equal(t, "hello world", resp)
-}