From baa12b092578d41218585d918fb7e1425700272d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 19 Apr 2021 16:42:28 +0300 Subject: - Add tests, update Informer implementation Signed-off-by: Valery Piashchynski --- plugins/http/plugin.go | 27 ++++++++++++++++++++++----- plugins/informer/interface.go | 6 ++++-- plugins/informer/plugin.go | 6 +++--- plugins/informer/rpc.go | 15 ++++----------- plugins/service/config.go | 14 +++++++------- plugins/service/interface.go | 1 - plugins/service/plugin.go | 20 ++++++++++++++++++-- plugins/service/process.go | 34 +++++++++++++++++++--------------- plugins/static/plugin.go | 2 +- 9 files changed, 78 insertions(+), 47 deletions(-) delete mode 100644 plugins/service/interface.go (limited to 'plugins') diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 86fcb329..8c8a86b4 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -17,6 +17,7 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -332,8 +333,24 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.RUnlock() } -// Workers returns associated pool workers -func (s *Plugin) Workers() []worker.BaseProcess { +// Workers returns slice with the process states for the workers +func (s *Plugin) Workers() []process.State { + workers := s.pool.Workers() + + ps := make([]process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +// internal +func (s *Plugin) workers() []worker.BaseProcess { return s.pool.Workers() } @@ -395,7 +412,7 @@ func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { // Status return status of the particular plugin func (s *Plugin) Status() status.Status { - workers := s.Workers() + workers := s.workers() for i := 0; i < len(workers); i++ { if workers[i].State().IsActive() { return status.Status{ @@ -409,9 +426,9 @@ func (s *Plugin) Status() status.Status { } } -// Status return status of the particular plugin +// Ready return readiness status of the particular plugin func (s *Plugin) Ready() status.Status { - workers := s.Workers() + workers := s.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) // we assume, that plugin's worker pool is ready diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index 8e3b922b..45f44691 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -1,8 +1,10 @@ package informer -import "github.com/spiral/roadrunner/v2/pkg/worker" +import ( + "github.com/spiral/roadrunner/v2/pkg/process" +) // Informer used to get workers from particular plugin or set of plugins type Informer interface { - Workers() []worker.BaseProcess + Workers() []process.State } diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index 416c0112..98081d34 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -3,7 +3,7 @@ package informer import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -21,7 +21,7 @@ func (p *Plugin) Init(log logger.Logger) error { } // Workers provides BaseProcess slice with workers for the requested plugin -func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) { +func (p *Plugin) Workers(name string) ([]process.State, error) { const op = errors.Op("informer_plugin_workers") svc, ok := p.registry[name] if !ok { @@ -49,7 +49,7 @@ func (p *Plugin) Name() string { return PluginName } -// RPCService returns associated rpc service. +// RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { return &rpc{srv: p, log: p.log} } diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 55a9832b..b0b5b1af 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,9 +1,8 @@ package informer import ( - "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/tools" ) type rpc struct { @@ -14,7 +13,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { // Workers is list of workers. - Workers []tools.ProcessState `json:"workers"` + Workers []process.State `json:"workers"` } // List all resettable services. @@ -38,15 +37,9 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error { return err } - list.Workers = make([]tools.ProcessState, 0) - for _, w := range workers { - ps, err := tools.WorkerProcessState(w.(worker.BaseProcess)) - if err != nil { - continue - } + // write actual processes + list.Workers = workers - list.Workers = append(list.Workers, ps) - } rpc.log.Debug("list of workers", "workers", list.Workers) rpc.log.Debug("successfully finished Workers method") return nil diff --git a/plugins/service/config.go b/plugins/service/config.go index b1099e06..871c8f76 100644 --- a/plugins/service/config.go +++ b/plugins/service/config.go @@ -4,11 +4,11 @@ import "time" // Service represents particular service configuration type Service struct { - Command string `mapstructure:"command"` - ProcessNum int `mapstructure:"process_num"` - ExecTimeout time.Duration `mapstructure:"exec_timeout"` - RestartAfterExit bool `mapstructure:"restart_after_exit"` - RestartDelay time.Duration `mapstructure:"restart_delay"` + Command string `mapstructure:"command"` + ProcessNum int `mapstructure:"process_num"` + ExecTimeout time.Duration `mapstructure:"exec_timeout"` + RemainAfterExit bool `mapstructure:"remain_after_exit"` + RestartSec uint64 `mapstructure:"restart_sec"` } // Config for the services @@ -24,9 +24,9 @@ func (c *Config) InitDefault() { val.ProcessNum = 1 c.Services[k] = val } - if v.RestartDelay == 0 { + if v.RestartSec == 0 { val := c.Services[k] - val.RestartDelay = time.Minute + val.RestartSec = 30 c.Services[k] = val } } diff --git a/plugins/service/interface.go b/plugins/service/interface.go deleted file mode 100644 index 6d43c336..00000000 --- a/plugins/service/interface.go +++ /dev/null @@ -1 +0,0 @@ -package service diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go index 60ed46c3..91e47e86 100644 --- a/plugins/service/plugin.go +++ b/plugins/service/plugin.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -54,9 +55,9 @@ func (service *Plugin) Serve() chan error { for i := 0; i < service.cfg.Services[k].ProcessNum; i++ { // create processor structure, which will process all the services service.processes = append(service.processes, NewServiceProcess( - service.cfg.Services[k].RestartAfterExit, + service.cfg.Services[k].RemainAfterExit, service.cfg.Services[k].ExecTimeout, - service.cfg.Services[k].RestartDelay, + service.cfg.Services[k].RestartSec, service.cfg.Services[k].Command, service.logger, errCh, @@ -64,6 +65,7 @@ func (service *Plugin) Serve() chan error { } } + // start all processes for i := 0; i < len(service.processes); i++ { service.processes[i].start() } @@ -72,6 +74,20 @@ func (service *Plugin) Serve() chan error { return errCh } +func (service *Plugin) Workers() []process.State { + service.Lock() + defer service.Unlock() + states := make([]process.State, 0, len(service.processes)) + for i := 0; i < len(service.processes); i++ { + st, err := process.GeneralProcessState(service.processes[i].Pid) + if err != nil { + continue + } + states = append(states, st) + } + return states +} + func (service *Plugin) Stop() error { service.Lock() defer service.Unlock() diff --git a/plugins/service/process.go b/plugins/service/process.go index 06d0b4c2..49219eb0 100644 --- a/plugins/service/process.go +++ b/plugins/service/process.go @@ -20,15 +20,16 @@ type Process struct { command *exec.Cmd // rawCmd from the plugin rawCmd string + Pid int // root plugin error chan errCh chan error // logger log logger.Logger - ExecTimeout time.Duration - RestartAfterExit bool - RestartDelay time.Duration + ExecTimeout time.Duration + RemainAfterExit bool + RestartSec uint64 // process start time startTime time.Time @@ -36,14 +37,14 @@ type Process struct { } // NewServiceProcess constructs service process structure -func NewServiceProcess(restartAfterExit bool, execTimeout, restartDelay time.Duration, command string, l logger.Logger, errCh chan error) *Process { +func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process { return &Process{ - rawCmd: command, - RestartDelay: restartDelay, - ExecTimeout: execTimeout, - RestartAfterExit: restartAfterExit, - errCh: errCh, - log: l, + rawCmd: command, + RestartSec: restartDelay, + ExecTimeout: execTimeout, + RemainAfterExit: restartAfterExit, + errCh: errCh, + log: l, } } @@ -70,10 +71,11 @@ func (p *Process) start() { // start process waiting routine go p.wait() - // startExec + // execHandler checks for the execTimeout go p.execHandler() // save start time p.startTime = time.Now() + p.Pid = p.command.Process.Pid } // create command for the process @@ -93,12 +95,14 @@ func (p *Process) createProcess() { // wait process for exit func (p *Process) wait() { // Wait error doesn't matter here - _ = p.command.Wait() - + err := p.command.Wait() + if err != nil { + p.log.Error("process wait error", "error", err) + } // wait for restart delay - if p.RestartAfterExit { + if p.RemainAfterExit { // wait for the delay - time.Sleep(p.RestartDelay) + time.Sleep(time.Second * time.Duration(p.RestartSec)) // and start command again p.start() } diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go index 5f108701..76cb9e68 100644 --- a/plugins/static/plugin.go +++ b/plugins/static/plugin.go @@ -57,7 +57,7 @@ func (s *Plugin) Name() string { return PluginName } -// middleware must return true if request/response pair is handled within the middleware. +// Middleware must return true if request/response pair is handled within the middleware. func (s *Plugin) Middleware(next http.Handler) http.HandlerFunc { // Define the http.HandlerFunc return func(w http.ResponseWriter, r *http.Request) { -- cgit v1.2.3