summaryrefslogtreecommitdiff
path: root/plugins/rpc/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-04 16:38:54 +0300
committerValery Piashchynski <[email protected]>2020-11-04 16:38:54 +0300
commita3dd9f2a28d91df0e0d1ad229b67297206357c07 (patch)
treec53893144e9f2228b490ee0f0a66ff875706d36b /plugins/rpc/rpc.go
parent0304f09d7e5fa0c76b86429a654aeb366cec6391 (diff)
Fix CI issues, add RPC tests to the CI
Diffstat (limited to 'plugins/rpc/rpc.go')
-rwxr-xr-xplugins/rpc/rpc.go43
1 files changed, 26 insertions, 17 deletions
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index cda42aa0..5203fc65 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,7 +1,9 @@
package rpc
import (
+ "net"
"net/rpc"
+ "sync/atomic"
"go.uber.org/zap"
@@ -28,7 +30,8 @@ type Plugin struct {
log *zap.Logger
rpc *rpc.Server
services []Pluggable
- close chan struct{}
+ listener net.Listener
+ closed *uint32
}
// Init rpc service. Must return true if service is enabled.
@@ -49,7 +52,9 @@ func (s *Plugin) Init(cfg config.Configurer, log *zap.Logger) error {
}
s.log = log
- s.close = make(chan struct{}, 1)
+ state := uint32(0)
+ s.closed = &state
+ atomic.StoreUint32(s.closed, 0)
return s.cfg.Valid()
}
@@ -80,7 +85,8 @@ func (s *Plugin) Serve() chan error {
services = append(services, s.services[i].Name())
}
- ln, err := s.cfg.Listener()
+ var err error
+ s.listener, err = s.cfg.Listener()
if err != nil {
errCh <- err
return errCh
@@ -90,22 +96,20 @@ func (s *Plugin) Serve() chan error {
go func() {
for {
- select {
- case <-s.close:
- // log error
- err = ln.Close()
- if err != nil {
- errCh <- errors.E(errors.Op("close RPC socket"), err)
- }
- return
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
+ conn, err := s.listener.Accept()
+ if err != nil {
+ if atomic.LoadUint32(s.closed) == 1 {
+ // just log and continue, this is not a critical issue, we just called Stop
+ s.log.Error("listener accept error, connection closed", zap.Error(err))
+ return
}
- go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ s.log.Error("listener accept error", zap.Error(err))
+ errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
+ return
}
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
}
}()
@@ -114,7 +118,12 @@ func (s *Plugin) Serve() chan error {
// Stop stops the service.
func (s *Plugin) Stop() error {
- s.close <- struct{}{}
+ // store closed state
+ atomic.StoreUint32(s.closed, 1)
+ err := s.listener.Close()
+ if err != nil {
+ return errors.E(errors.Op("stop RPC socket"), err)
+ }
return nil
}