summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-19 16:42:28 +0300
committerValery Piashchynski <[email protected]>2021-04-19 16:42:28 +0300
commitbaa12b092578d41218585d918fb7e1425700272d (patch)
tree91881bd0ac32c609ea01fafe3bbc15a13a67c392 /plugins
parent112b7b60bbc045f4935e1766be9d2266abf68b31 (diff)
- Add tests, update Informer implementation
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/plugin.go27
-rw-r--r--plugins/informer/interface.go6
-rw-r--r--plugins/informer/plugin.go6
-rw-r--r--plugins/informer/rpc.go15
-rw-r--r--plugins/service/config.go14
-rw-r--r--plugins/service/interface.go1
-rw-r--r--plugins/service/plugin.go20
-rw-r--r--plugins/service/process.go34
-rw-r--r--plugins/static/plugin.go2
9 files changed, 78 insertions, 47 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 86fcb329..8c8a86b4 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -17,6 +17,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -332,8 +333,24 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RUnlock()
}
-// Workers returns associated pool workers
-func (s *Plugin) Workers() []worker.BaseProcess {
+// Workers returns slice with the process states for the workers
+func (s *Plugin) Workers() []process.State {
+ workers := s.pool.Workers()
+
+ ps := make([]process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+// internal
+func (s *Plugin) workers() []worker.BaseProcess {
return s.pool.Workers()
}
@@ -395,7 +412,7 @@ func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
// Status return status of the particular plugin
func (s *Plugin) Status() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -409,9 +426,9 @@ func (s *Plugin) Status() status.Status {
}
}
-// Status return status of the particular plugin
+// Ready return readiness status of the particular plugin
func (s *Plugin) Ready() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
// we assume, that plugin's worker pool is ready
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 8e3b922b..45f44691 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,8 +1,10 @@
package informer
-import "github.com/spiral/roadrunner/v2/pkg/worker"
+import (
+ "github.com/spiral/roadrunner/v2/pkg/process"
+)
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []worker.BaseProcess
+ Workers() []process.State
}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 416c0112..98081d34 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,7 +3,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,7 @@ func (p *Plugin) Init(log logger.Logger) error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
+func (p *Plugin) Workers(name string) ([]process.State, error) {
const op = errors.Op("informer_plugin_workers")
svc, ok := p.registry[name]
if !ok {
@@ -49,7 +49,7 @@ func (p *Plugin) Name() string {
return PluginName
}
-// RPCService returns associated rpc service.
+// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
return &rpc{srv: p, log: p.log}
}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 55a9832b..b0b5b1af 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,9 +1,8 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/tools"
)
type rpc struct {
@@ -14,7 +13,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}
// List all resettable services.
@@ -38,15 +37,9 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return err
}
- list.Workers = make([]tools.ProcessState, 0)
- for _, w := range workers {
- ps, err := tools.WorkerProcessState(w.(worker.BaseProcess))
- if err != nil {
- continue
- }
+ // write actual processes
+ list.Workers = workers
- list.Workers = append(list.Workers, ps)
- }
rpc.log.Debug("list of workers", "workers", list.Workers)
rpc.log.Debug("successfully finished Workers method")
return nil
diff --git a/plugins/service/config.go b/plugins/service/config.go
index b1099e06..871c8f76 100644
--- a/plugins/service/config.go
+++ b/plugins/service/config.go
@@ -4,11 +4,11 @@ import "time"
// Service represents particular service configuration
type Service struct {
- Command string `mapstructure:"command"`
- ProcessNum int `mapstructure:"process_num"`
- ExecTimeout time.Duration `mapstructure:"exec_timeout"`
- RestartAfterExit bool `mapstructure:"restart_after_exit"`
- RestartDelay time.Duration `mapstructure:"restart_delay"`
+ Command string `mapstructure:"command"`
+ ProcessNum int `mapstructure:"process_num"`
+ ExecTimeout time.Duration `mapstructure:"exec_timeout"`
+ RemainAfterExit bool `mapstructure:"remain_after_exit"`
+ RestartSec uint64 `mapstructure:"restart_sec"`
}
// Config for the services
@@ -24,9 +24,9 @@ func (c *Config) InitDefault() {
val.ProcessNum = 1
c.Services[k] = val
}
- if v.RestartDelay == 0 {
+ if v.RestartSec == 0 {
val := c.Services[k]
- val.RestartDelay = time.Minute
+ val.RestartSec = 30
c.Services[k] = val
}
}
diff --git a/plugins/service/interface.go b/plugins/service/interface.go
deleted file mode 100644
index 6d43c336..00000000
--- a/plugins/service/interface.go
+++ /dev/null
@@ -1 +0,0 @@
-package service
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 60ed46c3..91e47e86 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,6 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -54,9 +55,9 @@ func (service *Plugin) Serve() chan error {
for i := 0; i < service.cfg.Services[k].ProcessNum; i++ {
// create processor structure, which will process all the services
service.processes = append(service.processes, NewServiceProcess(
- service.cfg.Services[k].RestartAfterExit,
+ service.cfg.Services[k].RemainAfterExit,
service.cfg.Services[k].ExecTimeout,
- service.cfg.Services[k].RestartDelay,
+ service.cfg.Services[k].RestartSec,
service.cfg.Services[k].Command,
service.logger,
errCh,
@@ -64,6 +65,7 @@ func (service *Plugin) Serve() chan error {
}
}
+ // start all processes
for i := 0; i < len(service.processes); i++ {
service.processes[i].start()
}
@@ -72,6 +74,20 @@ func (service *Plugin) Serve() chan error {
return errCh
}
+func (service *Plugin) Workers() []process.State {
+ service.Lock()
+ defer service.Unlock()
+ states := make([]process.State, 0, len(service.processes))
+ for i := 0; i < len(service.processes); i++ {
+ st, err := process.GeneralProcessState(service.processes[i].Pid)
+ if err != nil {
+ continue
+ }
+ states = append(states, st)
+ }
+ return states
+}
+
func (service *Plugin) Stop() error {
service.Lock()
defer service.Unlock()
diff --git a/plugins/service/process.go b/plugins/service/process.go
index 06d0b4c2..49219eb0 100644
--- a/plugins/service/process.go
+++ b/plugins/service/process.go
@@ -20,15 +20,16 @@ type Process struct {
command *exec.Cmd
// rawCmd from the plugin
rawCmd string
+ Pid int
// root plugin error chan
errCh chan error
// logger
log logger.Logger
- ExecTimeout time.Duration
- RestartAfterExit bool
- RestartDelay time.Duration
+ ExecTimeout time.Duration
+ RemainAfterExit bool
+ RestartSec uint64
// process start time
startTime time.Time
@@ -36,14 +37,14 @@ type Process struct {
}
// NewServiceProcess constructs service process structure
-func NewServiceProcess(restartAfterExit bool, execTimeout, restartDelay time.Duration, command string, l logger.Logger, errCh chan error) *Process {
+func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process {
return &Process{
- rawCmd: command,
- RestartDelay: restartDelay,
- ExecTimeout: execTimeout,
- RestartAfterExit: restartAfterExit,
- errCh: errCh,
- log: l,
+ rawCmd: command,
+ RestartSec: restartDelay,
+ ExecTimeout: execTimeout,
+ RemainAfterExit: restartAfterExit,
+ errCh: errCh,
+ log: l,
}
}
@@ -70,10 +71,11 @@ func (p *Process) start() {
// start process waiting routine
go p.wait()
- // startExec
+ // execHandler checks for the execTimeout
go p.execHandler()
// save start time
p.startTime = time.Now()
+ p.Pid = p.command.Process.Pid
}
// create command for the process
@@ -93,12 +95,14 @@ func (p *Process) createProcess() {
// wait process for exit
func (p *Process) wait() {
// Wait error doesn't matter here
- _ = p.command.Wait()
-
+ err := p.command.Wait()
+ if err != nil {
+ p.log.Error("process wait error", "error", err)
+ }
// wait for restart delay
- if p.RestartAfterExit {
+ if p.RemainAfterExit {
// wait for the delay
- time.Sleep(p.RestartDelay)
+ time.Sleep(time.Second * time.Duration(p.RestartSec))
// and start command again
p.start()
}
diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go
index 5f108701..76cb9e68 100644
--- a/plugins/static/plugin.go
+++ b/plugins/static/plugin.go
@@ -57,7 +57,7 @@ func (s *Plugin) Name() string {
return PluginName
}
-// middleware must return true if request/response pair is handled within the middleware.
+// Middleware must return true if request/response pair is handled within the middleware.
func (s *Plugin) Middleware(next http.Handler) http.HandlerFunc {
// Define the http.HandlerFunc
return func(w http.ResponseWriter, r *http.Request) {