summaryrefslogtreecommitdiff
path: root/plugins/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/rpc')
-rwxr-xr-xplugins/rpc/config.go49
-rwxr-xr-xplugins/rpc/config_test.go143
-rwxr-xr-xplugins/rpc/doc/plugin_arch.drawio1
-rwxr-xr-xplugins/rpc/plugin.go165
-rw-r--r--plugins/rpc/tests/.rr-rpc-disabled.yaml6
-rw-r--r--plugins/rpc/tests/.rr.yaml6
-rw-r--r--plugins/rpc/tests/plugin1.go42
-rw-r--r--plugins/rpc/tests/plugin2.go54
-rw-r--r--plugins/rpc/tests/rpc_test.go169
9 files changed, 0 insertions, 635 deletions
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
deleted file mode 100755
index 719fd5e3..00000000
--- a/plugins/rpc/config.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package rpc
-
-import (
- "errors"
- "net"
- "strings"
-
- "github.com/spiral/roadrunner/v2/util"
-)
-
-// Config defines RPC service config.
-type Config struct {
- // Listen string
- Listen string
-
- // Disabled disables RPC service.
- Disabled bool
-}
-
-// InitDefaults allows to init blank config with pre-defined set of default values.
-func (c *Config) InitDefaults() {
- if c.Listen == "" {
- c.Listen = "tcp://127.0.0.1:6001"
- }
-}
-
-// Valid returns nil if config is valid.
-func (c *Config) Valid() error {
- if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
- return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
- }
-
- return nil
-}
-
-// Listener creates new rpc socket Listener.
-func (c *Config) Listener() (net.Listener, error) {
- return util.CreateListener(c.Listen)
-}
-
-// Dialer creates rpc socket Dialer.
-func (c *Config) Dialer() (net.Conn, error) {
- dsn := strings.Split(c.Listen, "://")
- if len(dsn) != 2 {
- return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
- }
-
- return net.Dial(dsn[0], dsn[1])
-}
diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go
deleted file mode 100755
index 67532bc8..00000000
--- a/plugins/rpc/config_test.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package rpc
-
-import (
- "runtime"
- "testing"
-
- j "github.com/json-iterator/go"
- "github.com/stretchr/testify/assert"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-type testCfg struct{ cfg string }
-
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- return json.Unmarshal([]byte(cfg.cfg), out)
-}
-
-func TestConfig_Listener(t *testing.T) {
- cfg := &Config{Listen: "tcp://:18001"}
-
- ln, err := cfg.Listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- assert.Equal(t, "tcp", ln.Addr().Network())
- if runtime.GOOS == "windows" {
- assert.Equal(t, "[::]:18001", ln.Addr().String())
- } else {
- assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
- }
-}
-
-func TestConfig_ListenerUnix(t *testing.T) {
- cfg := &Config{Listen: "unix://file.sock"}
-
- ln, err := cfg.Listener()
- assert.NoError(t, err)
- assert.NotNil(t, ln)
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- assert.Equal(t, "unix", ln.Addr().Network())
- assert.Equal(t, "file.sock", ln.Addr().String())
-}
-
-func Test_Config_Error(t *testing.T) {
- cfg := &Config{Listen: "uni:unix.sock"}
- ln, err := cfg.Listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error())
-}
-
-func Test_Config_ErrorMethod(t *testing.T) {
- cfg := &Config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.Listener()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
-
-func TestConfig_Dialer(t *testing.T) {
- cfg := &Config{Listen: "tcp://:18001"}
-
- ln, _ := cfg.Listener()
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- conn, err := cfg.Dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer func() {
- err := conn.Close()
- if err != nil {
- t.Errorf("error closing the connection: error %v", err)
- }
- }()
-
- assert.Equal(t, "tcp", conn.RemoteAddr().Network())
- assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
-}
-
-func TestConfig_DialerUnix(t *testing.T) {
- cfg := &Config{Listen: "unix://file.sock"}
-
- ln, _ := cfg.Listener()
- defer func() {
- err := ln.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- conn, err := cfg.Dialer()
- assert.NoError(t, err)
- assert.NotNil(t, conn)
- defer func() {
- err := conn.Close()
- if err != nil {
- t.Errorf("error closing the connection: error %v", err)
- }
- }()
-
- assert.Equal(t, "unix", conn.RemoteAddr().Network())
- assert.Equal(t, "file.sock", conn.RemoteAddr().String())
-}
-
-func Test_Config_DialerError(t *testing.T) {
- cfg := &Config{Listen: "uni:unix.sock"}
- ln, err := cfg.Dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
- assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error())
-}
-
-func Test_Config_DialerErrorMethod(t *testing.T) {
- cfg := &Config{Listen: "xinu://unix.sock"}
-
- ln, err := cfg.Dialer()
- assert.Nil(t, ln)
- assert.Error(t, err)
-}
-
-func Test_Config_Defaults(t *testing.T) {
- c := &Config{}
- c.InitDefaults()
- assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen)
-}
diff --git a/plugins/rpc/doc/plugin_arch.drawio b/plugins/rpc/doc/plugin_arch.drawio
deleted file mode 100755
index dec5f0b2..00000000
--- a/plugins/rpc/doc/plugin_arch.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2020-10-19T17:14:19.125Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="2J39x4EyFr1zaE9BXKM4" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
deleted file mode 100755
index d0dc0ff1..00000000
--- a/plugins/rpc/plugin.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package rpc
-
-import (
- "net"
- "net/rpc"
- "sync/atomic"
-
- "github.com/spiral/endure"
- "github.com/spiral/errors"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/interfaces/config"
- "github.com/spiral/roadrunner/v2/interfaces/log"
- rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
-)
-
-// PluginName contains default plugin name.
-const PluginName = "RPC"
-
-type pluggable struct {
- service rpc_.RPCer
- name string
-}
-
-// Plugin is RPC service.
-type Plugin struct {
- cfg Config
- log log.Logger
- rpc *rpc.Server
- services []pluggable
- listener net.Listener
- closed *uint32
-}
-
-// Init rpc service. Must return true if service is enabled.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
- const op = errors.Op("RPC Init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return err
- }
- s.cfg.InitDefaults()
-
- if s.cfg.Disabled {
- return errors.E(op, errors.Disabled)
- }
-
- s.log = log
- state := uint32(0)
- s.closed = &state
- atomic.StoreUint32(s.closed, 0)
-
- return s.cfg.Valid()
-}
-
-// Serve serves the service.
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("register service")
- errCh := make(chan error, 1)
-
- s.rpc = rpc.NewServer()
-
- services := make([]string, 0, len(s.services))
-
- // Attach all services
- for i := 0; i < len(s.services); i++ {
- err := s.Register(s.services[i].name, s.services[i].service.RPC())
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- services = append(services, s.services[i].name)
- }
-
- var err error
- s.listener, err = s.cfg.Listener()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services)
-
- go func() {
- for {
- conn, err := s.listener.Accept()
- if err != nil {
- if atomic.LoadUint32(s.closed) == 1 {
- // just log and continue, this is not a critical issue, we just called Stop
- s.log.Error("listener accept error, connection closed", "error", err)
- return
- }
-
- s.log.Error("listener accept error", "error", err)
- errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
- return
- }
-
- go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn))
- }
- }()
-
- return errCh
-}
-
-// Stop stops the service.
-func (s *Plugin) Stop() error {
- // store closed state
- atomic.StoreUint32(s.closed, 1)
- err := s.listener.Close()
- if err != nil {
- return errors.E(errors.Op("stop RPC socket"), err)
- }
- return nil
-}
-
-// Name contains service name.
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Depends declares services to collect for RPC.
-func (s *Plugin) Collects() []interface{} {
- return []interface{}{
- s.RegisterPlugin,
- }
-}
-
-// RegisterPlugin registers RPC service plugin.
-func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) {
- s.services = append(s.services, pluggable{
- service: p,
- name: name.Name(),
- })
-}
-
-// Register publishes in the server the set of methods of the
-// receiver value that satisfy the following conditions:
-// - exported method of exported type
-// - two arguments, both of exported type
-// - the second argument is a pointer
-// - one return value, of type error
-// It returns an error if the receiver is not an exported type or has
-// no suitable methods. It also logs the error using package log.
-func (s *Plugin) Register(name string, svc interface{}) error {
- if s.rpc == nil {
- return errors.E("RPC service is not configured")
- }
-
- return s.rpc.RegisterName(name, svc)
-}
-
-// Client creates new RPC client.
-func (s *Plugin) Client() (*rpc.Client, error) {
- conn, err := s.cfg.Dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
-}
diff --git a/plugins/rpc/tests/.rr-rpc-disabled.yaml b/plugins/rpc/tests/.rr-rpc-disabled.yaml
deleted file mode 100644
index d5c185e7..00000000
--- a/plugins/rpc/tests/.rr-rpc-disabled.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
- disabled: true
-logs:
- mode: development
- level: error \ No newline at end of file
diff --git a/plugins/rpc/tests/.rr.yaml b/plugins/rpc/tests/.rr.yaml
deleted file mode 100644
index d2cb6c70..00000000
--- a/plugins/rpc/tests/.rr.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
- disabled: false
-logs:
- mode: development
- level: error \ No newline at end of file
diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go
deleted file mode 100644
index 797f821a..00000000
--- a/plugins/rpc/tests/plugin1.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package tests
-
-import (
- "fmt"
-
- "github.com/spiral/roadrunner/v2/interfaces/config"
-)
-
-type Plugin1 struct {
- config config.Configurer
-}
-
-func (p1 *Plugin1) Init(cfg config.Configurer) error {
- p1.config = cfg
- return nil
-}
-
-func (p1 *Plugin1) Serve() chan error {
- errCh := make(chan error, 1)
- return errCh
-}
-
-func (p1 *Plugin1) Stop() error {
- return nil
-}
-
-func (p1 *Plugin1) Name() string {
- return "rpc_test.plugin1"
-}
-
-func (p1 *Plugin1) RPC() interface{} {
- return &PluginRPC{srv: p1}
-}
-
-type PluginRPC struct {
- srv *Plugin1
-}
-
-func (r *PluginRPC) Hello(in string, out *string) error {
- *out = fmt.Sprintf("Hello, username: %s", in)
- return nil
-}
diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go
deleted file mode 100644
index 411b9c54..00000000
--- a/plugins/rpc/tests/plugin2.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package tests
-
-import (
- "net"
- "net/rpc"
- "time"
-
- "github.com/spiral/errors"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
-)
-
-// plugin2 makes a call to the plugin1 via RPC
-// this is just a simulation of external call FOR TEST
-// you don't need to do such things :)
-type Plugin2 struct {
-}
-
-func (p2 *Plugin2) Init() error {
- return nil
-}
-
-func (p2 *Plugin2) Serve() chan error {
- errCh := make(chan error, 1)
-
- go func() {
- time.Sleep(time.Second * 3)
-
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- if err != nil {
- errCh <- errors.E(errors.Serve, err)
- return
- }
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- var ret string
- err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret)
- if err != nil {
- errCh <- err
- return
- }
- if ret != "Hello, username: Valery" {
- errCh <- errors.E("wrong response")
- return
- }
- // to stop exec
- errCh <- errors.E(errors.Disabled)
- return
- }()
-
- return errCh
-}
-
-func (p2 *Plugin2) Stop() error {
- return nil
-}
diff --git a/plugins/rpc/tests/rpc_test.go b/plugins/rpc/tests/rpc_test.go
deleted file mode 100644
index 0344da6b..00000000
--- a/plugins/rpc/tests/rpc_test.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package tests
-
-import (
- "os"
- "os/signal"
- "syscall"
- "testing"
- "time"
-
- "github.com/spiral/endure"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-// graph https://bit.ly/3ensdNb
-func TestRpcInit(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&Plugin1{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&Plugin2{})
- if err != nil {
- t.Fatal(err)
- }
-
- v := &config.Viper{}
- v.Path = ".rr.yaml"
- v.Prefix = "rr"
- err = cont.Register(v)
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&rpc.Plugin{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&logger.ZapLogger{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
-
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
- tt := time.NewTimer(time.Second * 10)
-
- for {
- select {
- case e := <-ch:
- // just stop, this is ok
- if errors.Is(errors.Disabled, e.Error) {
- return
- }
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-tt.C:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- assert.Fail(t, "timeout")
- }
- }
-}
-
-// graph https://bit.ly/3ensdNb
-func TestRpcDisabled(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&Plugin1{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&Plugin2{})
- if err != nil {
- t.Fatal(err)
- }
-
- v := &config.Viper{}
- v.Path = ".rr-rpc-disabled.yaml"
- v.Prefix = "rr"
- err = cont.Register(v)
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&rpc.Plugin{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Register(&logger.ZapLogger{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
-
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
- tt := time.NewTimer(time.Second * 20)
-
- for {
- select {
- case e := <-ch:
- // RPC is turned off, should be and dial error
- if errors.Is(errors.Disabled, e.Error) {
- assert.FailNow(t, "should not be disabled error")
- }
- assert.Error(t, e.Error)
- err = cont.Stop()
- assert.Error(t, err)
- return
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-tt.C:
- // timeout
- return
- }
- }
-}