From bca69d968ef76a6260956bc169cb07996fbb7724 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 5 Nov 2020 15:45:18 +0300 Subject: App test --- plugins/rpc/plugin.go | 171 ++++++++++++++++++++++++++++++++++++++++++ plugins/rpc/rpc.go | 172 ------------------------------------------- plugins/rpc/tests/plugin2.go | 3 - 3 files changed, 171 insertions(+), 175 deletions(-) create mode 100755 plugins/rpc/plugin.go delete mode 100755 plugins/rpc/rpc.go (limited to 'plugins/rpc') diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go new file mode 100755 index 00000000..6401c0e2 --- /dev/null +++ b/plugins/rpc/plugin.go @@ -0,0 +1,171 @@ +package rpc + +import ( + "net" + "net/rpc" + "sync/atomic" + + "github.com/spiral/endure" + "github.com/spiral/errors" + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +// Pluggable declares the ability to create set of public RPC methods. +type Pluggable interface { + endure.Named + + // Provides RPC methods for the given service. + RPCService() (interface{}, error) +} + +// ServiceName contains default service name. +const ServiceName = "RPC" + +// Plugin is RPC service. +type Plugin struct { + cfg Config + log log.Logger + rpc *rpc.Server + services []Pluggable + listener net.Listener + closed *uint32 +} + +// Init rpc service. Must return true if service is enabled. +func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("RPC Init") + if !cfg.Has(ServiceName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return err + } + s.cfg.InitDefaults() + + if s.cfg.Disabled { + return errors.E(op, errors.Disabled) + } + + s.log = log + state := uint32(0) + s.closed = &state + atomic.StoreUint32(s.closed, 0) + + return s.cfg.Valid() +} + +// Serve serves the service. +func (s *Plugin) Serve() chan error { + const op = errors.Op("register service") + errCh := make(chan error, 1) + + s.rpc = rpc.NewServer() + + services := make([]string, 0, len(s.services)) + + // Attach all services + for i := 0; i < len(s.services); i++ { + svc, err := s.services[i].RPCService() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + err = s.Register(s.services[i].Name(), svc) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + services = append(services, s.services[i].Name()) + } + + var err error + s.listener, err = s.cfg.Listener() + if err != nil { + errCh <- err + return errCh + } + + s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services) + + go func() { + for { + conn, err := s.listener.Accept() + if err != nil { + if atomic.LoadUint32(s.closed) == 1 { + // just log and continue, this is not a critical issue, we just called Stop + s.log.Error("listener accept error, connection closed", "error", err) + return + } + + s.log.Error("listener accept error", "error", err) + errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) + return + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + }() + + return errCh +} + +// Stop stops the service. +func (s *Plugin) Stop() error { + // store closed state + atomic.StoreUint32(s.closed, 1) + err := s.listener.Close() + if err != nil { + return errors.E(errors.Op("stop RPC socket"), err) + } + return nil +} + +// Name contains service name. +func (s *Plugin) Name() string { + return ServiceName +} + +// Depends declares services to collect for RPC. +func (s *Plugin) Collects() []interface{} { + return []interface{}{ + s.RegisterPlugin, + } +} + +// RegisterPlugin registers RPC service plugin. +func (s *Plugin) RegisterPlugin(p Pluggable) error { + s.services = append(s.services, p) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Plugin) Register(name string, svc interface{}) error { + if s.rpc == nil { + return errors.E("RPC service is not configured") + } + + return s.rpc.RegisterName(name, svc) +} + +// Client creates new RPC client. +func (s *Plugin) Client() (*rpc.Client, error) { + conn, err := s.cfg.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go deleted file mode 100755 index 5203fc65..00000000 --- a/plugins/rpc/rpc.go +++ /dev/null @@ -1,172 +0,0 @@ -package rpc - -import ( - "net" - "net/rpc" - "sync/atomic" - - "go.uber.org/zap" - - "github.com/spiral/endure" - "github.com/spiral/errors" - "github.com/spiral/goridge/v2" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -// Pluggable declares the ability to create set of public RPC methods. -type Pluggable interface { - endure.Named - - // Provides RPC methods for the given service. - RPCService() (interface{}, error) -} - -// ServiceName contains default service name. -const ServiceName = "rpc" - -// Plugin is RPC service. -type Plugin struct { - cfg Config - log *zap.Logger - rpc *rpc.Server - services []Pluggable - listener net.Listener - closed *uint32 -} - -// Init rpc service. Must return true if service is enabled. -func (s *Plugin) Init(cfg config.Configurer, log *zap.Logger) error { - const op = errors.Op("RPC Init") - if !cfg.Has(ServiceName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(ServiceName, &s.cfg) - if err != nil { - return err - } - s.cfg.InitDefaults() - - if s.cfg.Disabled { - return errors.E(op, errors.Disabled) - } - - s.log = log - state := uint32(0) - s.closed = &state - atomic.StoreUint32(s.closed, 0) - - return s.cfg.Valid() -} - -// Serve serves the service. -func (s *Plugin) Serve() chan error { - const op = errors.Op("register service") - errCh := make(chan error, 1) - - s.rpc = rpc.NewServer() - - services := make([]string, 0, len(s.services)) - - // Attach all services - for i := 0; i < len(s.services); i++ { - svc, err := s.services[i].RPCService() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - err = s.Register(s.services[i].Name(), svc) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - services = append(services, s.services[i].Name()) - } - - var err error - s.listener, err = s.cfg.Listener() - if err != nil { - errCh <- err - return errCh - } - - s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", services)) - - go func() { - for { - conn, err := s.listener.Accept() - if err != nil { - if atomic.LoadUint32(s.closed) == 1 { - // just log and continue, this is not a critical issue, we just called Stop - s.log.Error("listener accept error, connection closed", zap.Error(err)) - return - } - - s.log.Error("listener accept error", zap.Error(err)) - errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) - return - } - - go s.rpc.ServeCodec(goridge.NewCodec(conn)) - } - }() - - return errCh -} - -// Stop stops the service. -func (s *Plugin) Stop() error { - // store closed state - atomic.StoreUint32(s.closed, 1) - err := s.listener.Close() - if err != nil { - return errors.E(errors.Op("stop RPC socket"), err) - } - return nil -} - -// Name contains service name. -func (s *Plugin) Name() string { - return ServiceName -} - -// Depends declares services to collect for RPC. -func (s *Plugin) Collects() []interface{} { - return []interface{}{ - s.RegisterPlugin, - } -} - -// RegisterPlugin registers RPC service plugin. -func (s *Plugin) RegisterPlugin(p Pluggable) error { - s.services = append(s.services, p) - return nil -} - -// Register publishes in the server the set of methods of the -// receiver value that satisfy the following conditions: -// - exported method of exported type -// - two arguments, both of exported type -// - the second argument is a pointer -// - one return value, of type error -// It returns an error if the receiver is not an exported type or has -// no suitable methods. It also logs the error using package log. -func (s *Plugin) Register(name string, svc interface{}) error { - if s.rpc == nil { - return errors.E("RPC service is not configured") - } - - return s.rpc.RegisterName(name, svc) -} - -// Client creates new RPC client. -func (s *Plugin) Client() (*rpc.Client, error) { - conn, err := s.cfg.Dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 2a0ae1a8..854bf097 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -1,7 +1,6 @@ package tests import ( - "fmt" "net" "net/rpc" "time" @@ -38,8 +37,6 @@ func (p2 *Plugin2) Serve() chan error { errCh <- err return } - fmt.Println("--------------") - fmt.Println(ret) if ret != "Hello, username: Valery" { errCh <- errors.E("wrong response") return -- cgit v1.2.3