diff options
author | Wolfy-J <[email protected]> | 2018-06-05 16:23:14 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-05 16:23:14 +0300 |
commit | 76ff8d1c95e087749d559ee5a4f8f0348feafffa (patch) | |
tree | 112630d2d2cfe41d809065034c13b1066b8e05c2 /rpc | |
parent | 3c86132f90ef6473b4073a8b1500d01b6114fc30 (diff) |
Cs and refactoring
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/config.go | 35 | ||||
-rw-r--r-- | rpc/service.go | 99 |
2 files changed, 134 insertions, 0 deletions
diff --git a/rpc/config.go b/rpc/config.go new file mode 100644 index 00000000..67dc1094 --- /dev/null +++ b/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // Listen string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/rpc/service.go b/rpc/service.go new file mode 100644 index 00000000..61a9a1a3 --- /dev/null +++ b/rpc/service.go @@ -0,0 +1,99 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" +) + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server +} + +// WithConfig must return Service instance configured with the given environment. Must return error in case of +// misconfiguration, might return nil as Service if Service is not enabled. +func (s *Service) WithConfig(cfg service.Config, reg service.Registry) (service.Service, error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return nil, err + } + + if !config.Enable { + return nil, nil + } + + return &Service{cfg: config, rpc: rpc.NewServer()}, nil +} + +// Serve serves Service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.stop = make(chan interface{}) + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + for { + select { + case <-s.stop: + return nil + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + s.rpc.Accept(ln) + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + + return nil +} + +// Stop stop Service Service. +func (s *Service) Stop() error { + close(s.stop) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} |