summaryrefslogtreecommitdiff
path: root/rpc
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-05 16:23:14 +0300
committerWolfy-J <[email protected]>2018-06-05 16:23:14 +0300
commit76ff8d1c95e087749d559ee5a4f8f0348feafffa (patch)
tree112630d2d2cfe41d809065034c13b1066b8e05c2 /rpc
parent3c86132f90ef6473b4073a8b1500d01b6114fc30 (diff)
Cs and refactoring
Diffstat (limited to 'rpc')
-rw-r--r--rpc/config.go35
-rw-r--r--rpc/service.go99
2 files changed, 134 insertions, 0 deletions
diff --git a/rpc/config.go b/rpc/config.go
new file mode 100644
index 00000000..67dc1094
--- /dev/null
+++ b/rpc/config.go
@@ -0,0 +1,35 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+)
+
+type config struct {
+ // Indicates if RPC connection is enabled.
+ Enable bool
+
+ // Listen string
+ Listen string
+}
+
+// listener creates new rpc socket listener.
+func (cfg *config) listener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+// dialer creates rpc socket dialer.
+func (cfg *config) dialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/rpc/service.go b/rpc/service.go
new file mode 100644
index 00000000..61a9a1a3
--- /dev/null
+++ b/rpc/service.go
@@ -0,0 +1,99 @@
+package rpc
+
+import (
+ "errors"
+ "github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
+ "net/rpc"
+)
+
+// Service is RPC service.
+type Service struct {
+ cfg *config
+ stop chan interface{}
+ rpc *rpc.Server
+}
+
+// WithConfig must return Service instance configured with the given environment. Must return error in case of
+// misconfiguration, might return nil as Service if Service is not enabled.
+func (s *Service) WithConfig(cfg service.Config, reg service.Registry) (service.Service, error) {
+ config := &config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return nil, err
+ }
+
+ if !config.Enable {
+ return nil, nil
+ }
+
+ return &Service{cfg: config, rpc: rpc.NewServer()}, nil
+}
+
+// Serve serves Service.
+func (s *Service) Serve() error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ s.stop = make(chan interface{})
+
+ ln, err := s.cfg.listener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ for {
+ select {
+ case <-s.stop:
+ return nil
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ s.rpc.Accept(ln)
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+
+ return nil
+}
+
+// Stop stop Service Service.
+func (s *Service) Stop() error {
+ close(s.stop)
+ return nil
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, rcvr interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, rcvr)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.cfg == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.cfg.dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}