diff options
-rwxr-xr-x | .github/workflows/ci-build.yml | 4 | ||||
-rw-r--r-- | Makefile | 4 | ||||
-rwxr-xr-x | go.mod | 1 | ||||
-rw-r--r-- | interfaces/informer/interface.go | 7 | ||||
-rw-r--r-- | interfaces/resetter/interface.go | 5 | ||||
-rw-r--r-- | plugins/informer/plugin.go | 56 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 53 | ||||
-rw-r--r-- | plugins/informer/tests/.rr-informer.yaml | 13 | ||||
-rw-r--r-- | plugins/informer/tests/informer_test.go | 97 | ||||
-rw-r--r-- | plugins/informer/tests/test_plugin.go | 58 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 31 | ||||
-rw-r--r-- | plugins/resetter/plugin.go | 54 | ||||
-rw-r--r-- | plugins/resetter/rpc.go | 30 | ||||
-rw-r--r-- | plugins/resetter/tests/.rr-resetter.yaml | 13 | ||||
-rw-r--r-- | plugins/resetter/tests/resetter_test.go | 101 | ||||
-rw-r--r-- | plugins/resetter/tests/test_plugin.go | 66 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 2 |
17 files changed, 568 insertions, 27 deletions
diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index 06e260ce..8b462f81 100755 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -72,12 +72,14 @@ jobs: go test -v -race ./plugins/logger/tests -tags=debug -coverprofile=logger.txt -covermode=atomic go test -v -race ./plugins/server/tests -tags=debug -coverprofile=server.txt -covermode=atomic go test -v -race ./plugins/metrics/tests -tags=debug -coverprofile=metrics.txt -covermode=atomic + go test -v -race ./plugins/informer/tests -tags=debug -coverprofile=informer.txt -covermode=atomic + go test -v -race ./plugins/resetter/tests -tags=debug -coverprofile=informer.txt -covermode=atomic - name: Run code coverage uses: codecov/codecov-action@v1 with: token: ${{ secrets.CODECOV_TOKEN }} - files: lib.txt, rpc_config.txt, rpc.txt, plugin_config.txt, logger.txt, server.txt, metrics.txt + files: lib.txt, rpc_config.txt, rpc.txt, plugin_config.txt, logger.txt, server.txt, metrics.txt, informer.txt flags: unittests name: codecov-umbrella fail_ci_if_error: false @@ -5,4 +5,6 @@ test: go test -v -race -cover ./plugins/config/tests -tags=debug go test -v -race -cover ./plugins/server/tests -tags=debug go test -v -race -cover ./plugins/logger/tests -tags=debug - go test -v -race -cover ./plugins/metrics/tests -tags=debug
\ No newline at end of file + go test -v -race -cover ./plugins/metrics/tests -tags=debug + go test -v -race -cover ./plugins/informer/tests -tags=debug + go test -v -race -cover ./plugins/resetter/tests -tags=debug
\ No newline at end of file @@ -15,7 +15,6 @@ require ( github.com/spiral/endure v1.0.0-beta19 github.com/spiral/errors v1.0.4 github.com/spiral/goridge/v2 v2.4.6 - github.com/spiral/roadrunner v1.8.4 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a go.uber.org/multierr v1.6.0 diff --git a/interfaces/informer/interface.go b/interfaces/informer/interface.go new file mode 100644 index 00000000..42e82338 --- /dev/null +++ b/interfaces/informer/interface.go @@ -0,0 +1,7 @@ +package informer + +import "github.com/spiral/roadrunner/v2" + +type Informer interface { + Workers() []roadrunner.WorkerBase +} diff --git a/interfaces/resetter/interface.go b/interfaces/resetter/interface.go new file mode 100644 index 00000000..3fa48cf3 --- /dev/null +++ b/interfaces/resetter/interface.go @@ -0,0 +1,5 @@ +package resetter + +type Resetter interface { + Reset() error +} diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go new file mode 100644 index 00000000..09d933fd --- /dev/null +++ b/plugins/informer/plugin.go @@ -0,0 +1,56 @@ +package informer + +import ( + "github.com/spiral/endure" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/informer" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +const PluginName = "informer" + +type Plugin struct { + registry map[string]informer.Informer + log log.Logger +} + +func (p *Plugin) Init(log log.Logger) error { + p.registry = make(map[string]informer.Informer) + p.log = log + return nil +} + +// Reset named service. +func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) { + const op = errors.Op("get workers") + svc, ok := p.registry[name] + if !ok { + return nil, errors.E(op, errors.Errorf("no such service: %s", name)) + } + + return svc.Workers(), nil +} + +// CollectTarget resettable service. +func (p *Plugin) CollectTarget(name endure.Named, r informer.Informer) error { + p.registry[name.Name()] = r + return nil +} + +// Collects declares services to be collected. +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectTarget, + } +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// RPCService returns associated rpc service. +func (p *Plugin) RPC() interface{} { + return &rpc{srv: p, log: p.log} +} diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go new file mode 100644 index 00000000..de47a739 --- /dev/null +++ b/plugins/informer/rpc.go @@ -0,0 +1,53 @@ +package informer + +import ( + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +type rpc struct { + srv *Plugin + log log.Logger +} + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []roadrunner.ProcessState `json:"workers"` +} + +// List all resettable services. +func (rpc *rpc) List(_ bool, list *[]string) error { + rpc.log.Info("Started List method") + *list = make([]string, 0, len(rpc.srv.registry)) + + for name := range rpc.srv.registry { + *list = append(*list, name) + } + rpc.log.Debug("list of services", "list", *list) + + rpc.log.Info("successfully finished List method") + return nil +} + +// Workers state of a given service. +func (rpc *rpc) Workers(service string, list *WorkerList) error { + rpc.log.Info("started Workers method", "service", service) + workers, err := rpc.srv.Workers(service) + if err != nil { + return err + } + + list.Workers = make([]roadrunner.ProcessState, 0) + for _, w := range workers { + ps, err := roadrunner.WorkerProcessState(w) + if err != nil { + continue + } + + list.Workers = append(list.Workers, ps) + } + rpc.log.Debug("list of workers", "workers", list.Workers) + rpc.log.Info("successfully finished Workers method") + return nil +} diff --git a/plugins/informer/tests/.rr-informer.yaml b/plugins/informer/tests/.rr-informer.yaml new file mode 100644 index 00000000..83ecd582 --- /dev/null +++ b/plugins/informer/tests/.rr-informer.yaml @@ -0,0 +1,13 @@ +server: + command: "php ../../../tests/client.php echo pipes" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s" + +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false
\ No newline at end of file diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go new file mode 100644 index 00000000..5f221305 --- /dev/null +++ b/plugins/informer/tests/informer_test.go @@ -0,0 +1,97 @@ +package tests + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestInformerInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{ + Path: ".rr-informer.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &logger.ZapLogger{}, + &informer.Plugin{}, + &rpcPlugin.Plugin{}, + &Plugin1{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Second * 15) + + t.Run("InformerRpcTest", informerRpcTest) + + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } +} + +func informerRpcTest(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + // WorkerList contains list of workers. + list := struct { + // Workers is list of workers. + Workers []roadrunner.ProcessState `json:"workers"` + }{} + + err = client.Call("informer.Workers", "informer.plugin1", &list) + assert.NoError(t, err) + assert.Len(t, list.Workers, 10) +} diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go new file mode 100644 index 00000000..473b6de7 --- /dev/null +++ b/plugins/informer/tests/test_plugin.go @@ -0,0 +1,58 @@ +package tests + +import ( + "context" + "time" + + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +var testPoolConfig = roadrunner.PoolConfig{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: 10, + MaxWorkerMemory: 1000, + }, +} + +// Gauge ////////////// +type Plugin1 struct { + config config.Configurer + server server.Server +} + +func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error { + p1.config = cfg + p1.server = server + return nil +} + +func (p1 *Plugin1) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (p1 *Plugin1) Stop() error { + return nil +} + +func (p1 *Plugin1) Name() string { + return "informer.plugin1" +} + +func (p1 *Plugin1) Workers() []roadrunner.WorkerBase { + pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + panic(err) + } + + return pool.Workers() +} diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go index ed5d085a..1d0796b3 100644 --- a/plugins/metrics/tests/metrics_test.go +++ b/plugins/metrics/tests/metrics_test.go @@ -54,29 +54,14 @@ func TestMetricsInit(t *testing.T) { cfg.Prefix = "rr" cfg.Path = ".rr-test.yaml" - err = cont.Register(cfg) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&metrics.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&rpcPlugin.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - err = cont.Register(&Plugin1{}) - if err != nil { - t.Fatal(err) - } + err = cont.RegisterAll( + cfg, + &metrics.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &Plugin1{}, + ) + assert.NoError(t, err) err = cont.Init() if err != nil { diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go new file mode 100644 index 00000000..99e02aef --- /dev/null +++ b/plugins/resetter/plugin.go @@ -0,0 +1,54 @@ +package resetter + +import ( + "github.com/spiral/endure" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/resetter" +) + +const PluginName = "resetter" + +type Plugin struct { + registry map[string]resetter.Resetter + log log.Logger +} + +func (p *Plugin) Init(log log.Logger) error { + p.registry = make(map[string]resetter.Resetter) + p.log = log + return nil +} + +// Reset named service. +func (p *Plugin) Reset(name string) error { + svc, ok := p.registry[name] + if !ok { + return errors.E("no such service", errors.Str(name)) + } + + return svc.Reset() +} + +// RegisterTarget resettable service. +func (p *Plugin) RegisterTarget(name endure.Named, r resetter.Resetter) error { + p.registry[name.Name()] = r + return nil +} + +// Collects declares services to be collected. +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.RegisterTarget, + } +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// RPCService returns associated rpc service. +func (p *Plugin) RPC() interface{} { + return &rpc{srv: p, log: p.log} +} diff --git a/plugins/resetter/rpc.go b/plugins/resetter/rpc.go new file mode 100644 index 00000000..157aa8c6 --- /dev/null +++ b/plugins/resetter/rpc.go @@ -0,0 +1,30 @@ +package resetter + +import "github.com/spiral/roadrunner/v2/interfaces/log" + +type rpc struct { + srv *Plugin + log log.Logger +} + +// List all resettable services. +func (rpc *rpc) List(_ bool, list *[]string) error { + rpc.log.Info("started List method") + *list = make([]string, 0) + + for name := range rpc.srv.registry { + *list = append(*list, name) + } + rpc.log.Debug("services list", "services", *list) + + rpc.log.Info("finished List method") + return nil +} + +// Reset named service. +func (rpc *rpc) Reset(service string, done *bool) error { + rpc.log.Info("started Reset method for the service", "service", service) + defer rpc.log.Info("finished Reset method for the service", "service", service) + *done = true + return rpc.srv.Reset(service) +} diff --git a/plugins/resetter/tests/.rr-resetter.yaml b/plugins/resetter/tests/.rr-resetter.yaml new file mode 100644 index 00000000..83ecd582 --- /dev/null +++ b/plugins/resetter/tests/.rr-resetter.yaml @@ -0,0 +1,13 @@ +server: + command: "php ../../../tests/client.php echo pipes" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s" + +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false
\ No newline at end of file diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go new file mode 100644 index 00000000..ff5a7847 --- /dev/null +++ b/plugins/resetter/tests/resetter_test.go @@ -0,0 +1,101 @@ +package tests + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestInformerInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{ + Path: ".rr-resetter.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &logger.ZapLogger{}, + &resetter.Plugin{}, + &rpcPlugin.Plugin{}, + &Plugin1{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Second * 15) + + t.Run("InformerRpcTest", resetterRpcTest) + + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } +} + +func resetterRpcTest(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + // WorkerList contains list of workers. + + var ret bool + err = client.Call("resetter.Reset", "resetter.plugin1", &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + var services []string + err = client.Call("resetter.List", nil, &services) + assert.NoError(t, err) + if services[0] != "resetter.plugin1" { + t.Fatal("no enough services") + } +} diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go new file mode 100644 index 00000000..9f48a43f --- /dev/null +++ b/plugins/resetter/tests/test_plugin.go @@ -0,0 +1,66 @@ +package tests + +import ( + "context" + "time" + + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +var testPoolConfig = roadrunner.PoolConfig{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: 10, + MaxWorkerMemory: 1000, + }, +} + +// Gauge ////////////// +type Plugin1 struct { + config config.Configurer + server server.Server +} + +func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error { + p1.config = cfg + p1.server = server + return nil +} + +func (p1 *Plugin1) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (p1 *Plugin1) Stop() error { + return nil +} + +func (p1 *Plugin1) Name() string { + return "resetter.plugin1" +} + +func (p1 *Plugin1) Reset() error { + pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + panic(err) + } + pool.Destroy(context.Background()) + + pool, err = p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + panic(err) + } + + _ = pool + + return nil +} diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index 4d31138e..fbd37e12 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -11,7 +11,7 @@ import ( plugin "github.com/spiral/roadrunner/v2/plugins/server" ) -const ConfigSection = "app" +const ConfigSection = "server" const Response = "test" var testPoolConfig = roadrunner.PoolConfig{ |