From a3dd9f2a28d91df0e0d1ad229b67297206357c07 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 4 Nov 2020 16:38:54 +0300 Subject: Fix CI issues, add RPC tests to the CI --- plugins/rpc/rpc.go | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) (limited to 'plugins/rpc/rpc.go') diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index cda42aa0..5203fc65 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,7 +1,9 @@ package rpc import ( + "net" "net/rpc" + "sync/atomic" "go.uber.org/zap" @@ -28,7 +30,8 @@ type Plugin struct { log *zap.Logger rpc *rpc.Server services []Pluggable - close chan struct{} + listener net.Listener + closed *uint32 } // Init rpc service. Must return true if service is enabled. @@ -49,7 +52,9 @@ func (s *Plugin) Init(cfg config.Configurer, log *zap.Logger) error { } s.log = log - s.close = make(chan struct{}, 1) + state := uint32(0) + s.closed = &state + atomic.StoreUint32(s.closed, 0) return s.cfg.Valid() } @@ -80,7 +85,8 @@ func (s *Plugin) Serve() chan error { services = append(services, s.services[i].Name()) } - ln, err := s.cfg.Listener() + var err error + s.listener, err = s.cfg.Listener() if err != nil { errCh <- err return errCh @@ -90,22 +96,20 @@ func (s *Plugin) Serve() chan error { go func() { for { - select { - case <-s.close: - // log error - err = ln.Close() - if err != nil { - errCh <- errors.E(errors.Op("close RPC socket"), err) - } - return - default: - conn, err := ln.Accept() - if err != nil { - continue + conn, err := s.listener.Accept() + if err != nil { + if atomic.LoadUint32(s.closed) == 1 { + // just log and continue, this is not a critical issue, we just called Stop + s.log.Error("listener accept error, connection closed", zap.Error(err)) + return } - go s.rpc.ServeCodec(goridge.NewCodec(conn)) + s.log.Error("listener accept error", zap.Error(err)) + errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) + return } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) } }() @@ -114,7 +118,12 @@ func (s *Plugin) Serve() chan error { // Stop stops the service. func (s *Plugin) Stop() error { - s.close <- struct{}{} + // store closed state + atomic.StoreUint32(s.closed, 1) + err := s.listener.Close() + if err != nil { + return errors.E(errors.Op("stop RPC socket"), err) + } return nil } -- cgit v1.2.3