diff options
author | Wolfy-J <[email protected]> | 2018-06-11 11:28:24 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-11 11:28:24 +0300 |
commit | 2f135b359575cc1625d1461bb6d8e478da8ccf54 (patch) | |
tree | 8561f84318c3a291f4a38df6b347954bda777e41 /service/rpc/service.go | |
parent | 6efaa0aa951240c2bb643761f103ee3f0fafb4d9 (diff) |
refactor
Diffstat (limited to 'service/rpc/service.go')
-rw-r--r-- | service/rpc/service.go | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/service/rpc/service.go b/service/rpc/service.go new file mode 100644 index 00000000..ce1e3351 --- /dev/null +++ b/service/rpc/service.go @@ -0,0 +1,122 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" + "sync" +) + +// Name contains default service name. +const Name = "rpc" + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server + + mu sync.Mutex + serving bool +} + +// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + s.cfg = config + s.rpc = rpc.NewServer() + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.mu.Lock() + s.serving = true + s.stop = make(chan interface{}) + s.mu.Unlock() + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + go func() { + for { + select { + case <-s.stop: + break + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + <-s.stop + + s.mu.Lock() + s.serving = false + s.mu.Unlock() + + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.serving { + close(s.stop) + } +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} |