summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linux.yml1
-rwxr-xr-xMakefile64
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go4
-rw-r--r--pkg/process/state.go (renamed from tools/process.go)39
-rw-r--r--pkg/transport/interface.go2
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/state.go10
-rwxr-xr-xpkg/worker/worker.go6
-rw-r--r--plugins/http/plugin.go27
-rw-r--r--plugins/informer/interface.go6
-rw-r--r--plugins/informer/plugin.go6
-rw-r--r--plugins/informer/rpc.go15
-rw-r--r--plugins/service/config.go14
-rw-r--r--plugins/service/interface.go1
-rw-r--r--plugins/service/plugin.go20
-rw-r--r--plugins/service/process.go34
-rw-r--r--plugins/static/plugin.go2
-rw-r--r--tests/plugins/http/http_plugin_test.go8
-rw-r--r--tests/plugins/informer/informer_test.go4
-rw-r--r--tests/plugins/informer/test_plugin.go16
-rw-r--r--tests/plugins/service/configs/.rr-service-error.yaml16
-rw-r--r--tests/plugins/service/configs/.rr-service-init.yaml8
-rw-r--r--tests/plugins/service/configs/.rr-service-restarts.yaml16
-rw-r--r--tests/plugins/service/service_plugin_test.go180
-rw-r--r--tools/worker_table.go45
26 files changed, 396 insertions, 156 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index a478726d..9d59cd53 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -66,6 +66,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server.txt -covermode=atomic ./tests/plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/service.txt -covermode=atomic ./tests/plugins/service
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/status.txt -covermode=atomic ./tests/plugins/status
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/config.txt -covermode=atomic ./tests/plugins/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/gzip.txt -covermode=atomic ./tests/plugins/gzip
diff --git a/Makefile b/Makefile
index 84735f67..3ff16e7a 100755
--- a/Makefile
+++ b/Makefile
@@ -36,6 +36,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server.out -covermode=atomic ./tests/plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/service.out -covermode=atomic ./tests/plugins/service
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/status.out -covermode=atomic ./tests/plugins/status
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/config.out -covermode=atomic ./tests/plugins/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/gzip.out -covermode=atomic ./tests/plugins/gzip
@@ -68,6 +69,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
go test -v -race -tags=debug ./tests/plugins/server
+ go test -v -race -tags=debug ./tests/plugins/service
go test -v -race -tags=debug ./tests/plugins/status
go test -v -race -tags=debug ./tests/plugins/config
go test -v -race -tags=debug ./tests/plugins/gzip
@@ -85,65 +87,3 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/kv/memory
go test -v -race -tags=debug ./tests/plugins/kv/memcached
docker-compose -f tests/docker-compose.yaml down
-
-test_1.14: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
- go1.14.14 test -v -race -tags=debug ./pkg/transport/pipe
- go1.14.14 test -v -race -tags=debug ./pkg/transport/socket
- go1.14.14 test -v -race -tags=debug ./pkg/pool
- go1.14.14 test -v -race -tags=debug ./pkg/worker
- go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher
- go1.14.14 test -v -race -tags=debug ./tests/plugins/http
- go1.14.14 test -v -race -tags=debug ./plugins/http/config
- go1.14.14 test -v -race -tags=debug ./tests/plugins/informer
- go1.14.14 test -v -race -tags=debug ./tests/plugins/reload
- go1.14.14 test -v -race -tags=debug ./tests/plugins/server
- go1.14.14 test -v -race -tags=debug ./tests/plugins/status
- go1.14.14 test -v -race -tags=debug ./tests/plugins/config
- go1.14.14 test -v -race -tags=debug ./tests/plugins/gzip
- go1.14.14 test -v -race -tags=debug ./tests/plugins/headers
- go1.14.14 test -v -race -tags=debug ./tests/plugins/logger
- go1.14.14 test -v -race -tags=debug ./tests/plugins/metrics
- go1.14.14 test -v -race -tags=debug ./tests/plugins/redis
- go1.14.14 test -v -race -tags=debug ./tests/plugins/resetter
- go1.14.14 test -v -race -tags=debug ./tests/plugins/rpc
- go1.14.14 test -v -race -tags=debug ./tests/plugins/static
- go1.14.14 test -v -race -tags=debug ./plugins/kv/boltdb
- go1.14.14 test -v -race -tags=debug ./plugins/kv/memory
- go1.14.14 test -v -race -tags=debug ./plugins/kv/memcached
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/boltdb
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memory
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached
- docker-compose -f tests/docker-compose.yaml down
-
-test_1.16: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
- go1.16rc1 test -v -race -tags=debug ./pkg/transport/pipe
- go1.16rc1 test -v -race -tags=debug ./pkg/transport/socket
- go1.16rc1 test -v -race -tags=debug ./pkg/pool
- go1.16rc1 test -v -race -tags=debug ./pkg/worker
- go1.16rc1 test -v -race -tags=debug ./pkg/worker_watcher
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/http
- go1.16rc1 test -v -race -tags=debug ./plugins/http/config
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/informer
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/reload
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/server
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/status
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/config
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/gzip
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/headers
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/logger
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/metrics
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/redis
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/resetter
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/rpc
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/static
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/boltdb
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/memory
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/memcached
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/boltdb
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memory
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memcached
- docker-compose -f tests/docker-compose.yaml down
-
-test_pipeline: test_1.14 test
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index c8e45b82..129c6f7d 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -119,7 +119,7 @@ func (sp *StaticPool) addListener(listener events.Listener) {
sp.events.AddListener(listener)
}
-// Config returns associated pool configuration. Immutable.
+// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
}
@@ -134,7 +134,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return nil
}
-// Be careful, sync Exec with ExecWithContext
+// Exec executes provided payload on the worker
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 273adc30..40903db3 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,8 +8,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/tools"
)
const MB = 1024 * 1024
@@ -176,7 +176,7 @@ func (sp *supervised) control() {
continue
}
- s, err := tools.WorkerProcessState(workers[i])
+ s, err := process.WorkerProcessState(workers[i])
if err != nil {
// worker not longer valid for supervision
continue
diff --git a/tools/process.go b/pkg/process/state.go
index a6eb1139..462dd47e 100644
--- a/tools/process.go
+++ b/pkg/process/state.go
@@ -1,4 +1,4 @@
-package tools
+package process
import (
"github.com/shirou/gopsutil/process"
@@ -6,8 +6,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
-// ProcessState provides information about specific worker.
-type ProcessState struct {
+// State provides information about specific worker.
+type State struct {
// Pid contains process id.
Pid int `json:"pid"`
@@ -23,18 +23,26 @@ type ProcessState struct {
// MemoryUsage holds the information about worker memory usage in bytes.
// Values might vary for different operating systems and based on RSS.
MemoryUsage uint64 `json:"memoryUsage"`
+
+ CPUPercent float64
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) {
+func WorkerProcessState(w worker.BaseProcess) (State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return ProcessState{}, errors.E(op, err)
+ return State{}, errors.E(op, err)
+ }
+
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
}
- return ProcessState{
+ return State{
+ CPUPercent: percent,
Pid: int(w.Pid()),
Status: w.State().String(),
NumJobs: w.State().NumExecs(),
@@ -42,3 +50,22 @@ func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) {
MemoryUsage: i.RSS,
}, nil
}
+
+func GeneralProcessState(pid int) (State, error) {
+ const op = errors.Op("process_state")
+ p, _ := process.NewProcess(int32(pid))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return State{}, errors.E(op, err)
+ }
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
+ }
+
+ return State{
+ CPUPercent: percent,
+ Pid: pid,
+ MemoryUsage: i.RSS,
+ }, nil
+}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
index 299ac95f..7e3e5350 100644
--- a/pkg/transport/interface.go
+++ b/pkg/transport/interface.go
@@ -10,7 +10,7 @@ import (
// Factory is responsible of wrapping given command into tasks WorkerProcess.
type Factory interface {
- // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 2b68717a..d2cfe2cd 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -16,7 +16,7 @@ type State interface {
Value() int64
// Set sets the StateImpl
Set(value int64)
- // NumJobs shows how many times WorkerProcess was invoked
+ // NumExecs shows how many times WorkerProcess was invoked
NumExecs() uint64
// IsActive returns true if WorkerProcess not Inactive or Stopped
IsActive() bool
@@ -69,6 +69,6 @@ type SyncWorker interface {
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
- // ExecWithContext used to handle Exec with TTL
+ // ExecWithTTL used to handle Exec with TTL
ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 502f8199..9c4543c8 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -24,10 +24,10 @@ const (
// StateKilling - process is being forcibly stopped
StateKilling
- // State of worker, when no need to allocate new one
+ // StateDestroyed State of worker, when no need to allocate new one
StateDestroyed
- // State of worker, when it reached executions limit
+ // StateMaxJobsReached State of worker, when it reached executions limit
StateMaxJobsReached
// StateStopped - process has been terminated.
@@ -91,17 +91,17 @@ func (s *StateImpl) IsActive() bool {
return val == StateWorking || val == StateReady
}
-// change StateImpl value (status)
+// Set change StateImpl value (status)
func (s *StateImpl) Set(value int64) {
atomic.StoreInt64(&s.value, value)
}
-// register new execution atomically
+// RegisterExec register new execution atomically
func (s *StateImpl) RegisterExec() {
atomic.AddUint64(&s.numExecs, 1)
}
-// Update last used time
+// SetLastUsed Update last used time
func (s *StateImpl) SetLastUsed(lu uint64) {
atomic.StoreUint64(&s.lastUsed, lu)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b04e1363..69c438b0 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -96,14 +96,12 @@ func (w *Process) State() State {
return w.state
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// AttachRelay attaches relay to the worker
func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// Relay returns relay attached to the worker
func (w *Process) Relay() relay.Relay {
return w.relay
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 86fcb329..8c8a86b4 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -17,6 +17,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -332,8 +333,24 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RUnlock()
}
-// Workers returns associated pool workers
-func (s *Plugin) Workers() []worker.BaseProcess {
+// Workers returns slice with the process states for the workers
+func (s *Plugin) Workers() []process.State {
+ workers := s.pool.Workers()
+
+ ps := make([]process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+// internal
+func (s *Plugin) workers() []worker.BaseProcess {
return s.pool.Workers()
}
@@ -395,7 +412,7 @@ func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
// Status return status of the particular plugin
func (s *Plugin) Status() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -409,9 +426,9 @@ func (s *Plugin) Status() status.Status {
}
}
-// Status return status of the particular plugin
+// Ready return readiness status of the particular plugin
func (s *Plugin) Ready() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
// we assume, that plugin's worker pool is ready
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 8e3b922b..45f44691 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,8 +1,10 @@
package informer
-import "github.com/spiral/roadrunner/v2/pkg/worker"
+import (
+ "github.com/spiral/roadrunner/v2/pkg/process"
+)
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []worker.BaseProcess
+ Workers() []process.State
}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 416c0112..98081d34 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,7 +3,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,7 @@ func (p *Plugin) Init(log logger.Logger) error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
+func (p *Plugin) Workers(name string) ([]process.State, error) {
const op = errors.Op("informer_plugin_workers")
svc, ok := p.registry[name]
if !ok {
@@ -49,7 +49,7 @@ func (p *Plugin) Name() string {
return PluginName
}
-// RPCService returns associated rpc service.
+// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
return &rpc{srv: p, log: p.log}
}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 55a9832b..b0b5b1af 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,9 +1,8 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/tools"
)
type rpc struct {
@@ -14,7 +13,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}
// List all resettable services.
@@ -38,15 +37,9 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return err
}
- list.Workers = make([]tools.ProcessState, 0)
- for _, w := range workers {
- ps, err := tools.WorkerProcessState(w.(worker.BaseProcess))
- if err != nil {
- continue
- }
+ // write actual processes
+ list.Workers = workers
- list.Workers = append(list.Workers, ps)
- }
rpc.log.Debug("list of workers", "workers", list.Workers)
rpc.log.Debug("successfully finished Workers method")
return nil
diff --git a/plugins/service/config.go b/plugins/service/config.go
index b1099e06..871c8f76 100644
--- a/plugins/service/config.go
+++ b/plugins/service/config.go
@@ -4,11 +4,11 @@ import "time"
// Service represents particular service configuration
type Service struct {
- Command string `mapstructure:"command"`
- ProcessNum int `mapstructure:"process_num"`
- ExecTimeout time.Duration `mapstructure:"exec_timeout"`
- RestartAfterExit bool `mapstructure:"restart_after_exit"`
- RestartDelay time.Duration `mapstructure:"restart_delay"`
+ Command string `mapstructure:"command"`
+ ProcessNum int `mapstructure:"process_num"`
+ ExecTimeout time.Duration `mapstructure:"exec_timeout"`
+ RemainAfterExit bool `mapstructure:"remain_after_exit"`
+ RestartSec uint64 `mapstructure:"restart_sec"`
}
// Config for the services
@@ -24,9 +24,9 @@ func (c *Config) InitDefault() {
val.ProcessNum = 1
c.Services[k] = val
}
- if v.RestartDelay == 0 {
+ if v.RestartSec == 0 {
val := c.Services[k]
- val.RestartDelay = time.Minute
+ val.RestartSec = 30
c.Services[k] = val
}
}
diff --git a/plugins/service/interface.go b/plugins/service/interface.go
deleted file mode 100644
index 6d43c336..00000000
--- a/plugins/service/interface.go
+++ /dev/null
@@ -1 +0,0 @@
-package service
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 60ed46c3..91e47e86 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,6 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -54,9 +55,9 @@ func (service *Plugin) Serve() chan error {
for i := 0; i < service.cfg.Services[k].ProcessNum; i++ {
// create processor structure, which will process all the services
service.processes = append(service.processes, NewServiceProcess(
- service.cfg.Services[k].RestartAfterExit,
+ service.cfg.Services[k].RemainAfterExit,
service.cfg.Services[k].ExecTimeout,
- service.cfg.Services[k].RestartDelay,
+ service.cfg.Services[k].RestartSec,
service.cfg.Services[k].Command,
service.logger,
errCh,
@@ -64,6 +65,7 @@ func (service *Plugin) Serve() chan error {
}
}
+ // start all processes
for i := 0; i < len(service.processes); i++ {
service.processes[i].start()
}
@@ -72,6 +74,20 @@ func (service *Plugin) Serve() chan error {
return errCh
}
+func (service *Plugin) Workers() []process.State {
+ service.Lock()
+ defer service.Unlock()
+ states := make([]process.State, 0, len(service.processes))
+ for i := 0; i < len(service.processes); i++ {
+ st, err := process.GeneralProcessState(service.processes[i].Pid)
+ if err != nil {
+ continue
+ }
+ states = append(states, st)
+ }
+ return states
+}
+
func (service *Plugin) Stop() error {
service.Lock()
defer service.Unlock()
diff --git a/plugins/service/process.go b/plugins/service/process.go
index 06d0b4c2..49219eb0 100644
--- a/plugins/service/process.go
+++ b/plugins/service/process.go
@@ -20,15 +20,16 @@ type Process struct {
command *exec.Cmd
// rawCmd from the plugin
rawCmd string
+ Pid int
// root plugin error chan
errCh chan error
// logger
log logger.Logger
- ExecTimeout time.Duration
- RestartAfterExit bool
- RestartDelay time.Duration
+ ExecTimeout time.Duration
+ RemainAfterExit bool
+ RestartSec uint64
// process start time
startTime time.Time
@@ -36,14 +37,14 @@ type Process struct {
}
// NewServiceProcess constructs service process structure
-func NewServiceProcess(restartAfterExit bool, execTimeout, restartDelay time.Duration, command string, l logger.Logger, errCh chan error) *Process {
+func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process {
return &Process{
- rawCmd: command,
- RestartDelay: restartDelay,
- ExecTimeout: execTimeout,
- RestartAfterExit: restartAfterExit,
- errCh: errCh,
- log: l,
+ rawCmd: command,
+ RestartSec: restartDelay,
+ ExecTimeout: execTimeout,
+ RemainAfterExit: restartAfterExit,
+ errCh: errCh,
+ log: l,
}
}
@@ -70,10 +71,11 @@ func (p *Process) start() {
// start process waiting routine
go p.wait()
- // startExec
+ // execHandler checks for the execTimeout
go p.execHandler()
// save start time
p.startTime = time.Now()
+ p.Pid = p.command.Process.Pid
}
// create command for the process
@@ -93,12 +95,14 @@ func (p *Process) createProcess() {
// wait process for exit
func (p *Process) wait() {
// Wait error doesn't matter here
- _ = p.command.Wait()
-
+ err := p.command.Wait()
+ if err != nil {
+ p.log.Error("process wait error", "error", err)
+ }
// wait for restart delay
- if p.RestartAfterExit {
+ if p.RemainAfterExit {
// wait for the delay
- time.Sleep(p.RestartDelay)
+ time.Sleep(time.Second * time.Duration(p.RestartSec))
// and start command again
p.start()
}
diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go
index 5f108701..76cb9e68 100644
--- a/plugins/static/plugin.go
+++ b/plugins/static/plugin.go
@@ -57,7 +57,7 @@ func (s *Plugin) Name() string {
return PluginName
}
-// middleware must return true if request/response pair is handled within the middleware.
+// Middleware must return true if request/response pair is handled within the middleware.
func (s *Plugin) Middleware(next http.Handler) http.HandlerFunc {
// Define the http.HandlerFunc
return func(w http.ResponseWriter, r *http.Request) {
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index 51c5fda2..73d6d102 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -21,13 +21,13 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/tests/mocks"
- "github.com/spiral/roadrunner/v2/tools"
"github.com/yookoala/gofast"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -288,7 +288,7 @@ func informerTest(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "http", &list)
@@ -1336,7 +1336,7 @@ func informerTestBefore(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "http", &list)
@@ -1353,7 +1353,7 @@ func informerTestAfter(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
assert.NotZero(t, workerPid)
diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go
index 31e14ff4..b6f50fd5 100644
--- a/tests/plugins/informer/informer_test.go
+++ b/tests/plugins/informer/informer_test.go
@@ -12,12 +12,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
@@ -100,7 +100,7 @@ func informerWorkersRPCTest(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "informer.plugin1", &list)
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 8a1fb933..0c9065a3 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -49,11 +49,21 @@ func (p1 *Plugin1) Name() string {
return "informer.plugin1"
}
-func (p1 *Plugin1) Workers() []worker.BaseProcess {
+func (p1 *Plugin1) Workers() []process.State {
p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
}
- return p.Workers()
+ ps := make([]process.State, 0, len(p.Workers()))
+ workers := p.Workers()
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
}
diff --git a/tests/plugins/service/configs/.rr-service-error.yaml b/tests/plugins/service/configs/.rr-service-error.yaml
new file mode 100644
index 00000000..3b0f1eb9
--- /dev/null
+++ b/tests/plugins/service/configs/.rr-service-error.yaml
@@ -0,0 +1,16 @@
+service:
+ some_service_1:
+ command: "php test_files/loopo.php"
+ process_num: 1
+ exec_timeout: 5s # s,m,h (seconds, minutes, hours)
+ remain_after_exit: true
+ restart_sec: 1
+
+logs:
+ level: info
+ mode: raw
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/service/configs/.rr-service-init.yaml b/tests/plugins/service/configs/.rr-service-init.yaml
index 1b9cf754..e32f2eda 100644
--- a/tests/plugins/service/configs/.rr-service-init.yaml
+++ b/tests/plugins/service/configs/.rr-service-init.yaml
@@ -2,13 +2,13 @@ service:
some_service_1:
command: "php test_files/loop.php"
process_num: 1
- exec_timeout: 5s
- restart_after_exit: true
- restart_delay: 1s
+ exec_timeout: 5s # s,m,h (seconds, minutes, hours)
+ remain_after_exit: true
+ restart_sec: 1
some_service_2:
command: "test_files/test_binary"
process_num: 1
- restart_after_exit: true
+ remain_after_exit: true
restart_delay: 1s
exec_timeout: 5s
diff --git a/tests/plugins/service/configs/.rr-service-restarts.yaml b/tests/plugins/service/configs/.rr-service-restarts.yaml
new file mode 100644
index 00000000..9095a92f
--- /dev/null
+++ b/tests/plugins/service/configs/.rr-service-restarts.yaml
@@ -0,0 +1,16 @@
+service:
+ some_service_2:
+ command: "test_files/test_binary"
+ process_num: 1
+ remain_after_exit: true
+ restart_delay: 1s
+ exec_timeout: 1s
+
+logs:
+ level: info
+ mode: raw
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go
index add6d374..2c4e53fe 100644
--- a/tests/plugins/service/service_plugin_test.go
+++ b/tests/plugins/service/service_plugin_test.go
@@ -8,15 +8,16 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/service"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
)
func TestServiceInit(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -24,9 +25,182 @@ func TestServiceInit(t *testing.T) {
Prefix: "rr",
}
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("The number is: 0\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 1\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 2\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 3\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 4\n").AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ mockLogger.EXPECT().Info("Hello 0\n The number is: 0\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 1\n The number is: 1\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 2\n The number is: 2\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 3\n The number is: 3\n").MinTimes(1)
+
+ mockLogger.EXPECT().Info("Hello 0").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 1").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 2").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 3").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 4").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &service.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 10)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestServiceError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-service-error.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &service.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ assert.NoError(t, err)
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 10)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestServiceRestarts(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-service-restarts.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ // should not be more than Hello 0, because of restarts
+ mockLogger.EXPECT().Info("Hello 0").MinTimes(1)
+
err = cont.RegisterAll(
cfg,
- &logger.ZapLogger{},
+ mockLogger,
&service.Plugin{},
)
assert.NoError(t, err)
diff --git a/tools/worker_table.go b/tools/worker_table.go
index 157aa972..7887a478 100644
--- a/tools/worker_table.go
+++ b/tools/worker_table.go
@@ -8,31 +8,58 @@ import (
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/olekukonko/tablewriter"
+ "github.com/spiral/roadrunner/v2/pkg/process"
)
// WorkerTable renders table with information about rr server workers.
-func WorkerTable(writer io.Writer, workers []ProcessState) *tablewriter.Table {
+func WorkerTable(writer io.Writer, workers []process.State) *tablewriter.Table {
tw := tablewriter.NewWriter(writer)
- tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"})
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"})
tw.SetColMinWidth(0, 7)
tw.SetColMinWidth(1, 9)
tw.SetColMinWidth(2, 7)
tw.SetColMinWidth(3, 7)
- tw.SetColMinWidth(4, 18)
+ tw.SetColMinWidth(4, 7)
+ tw.SetColMinWidth(5, 18)
- for key := range workers {
+ for i := 0; i < len(workers); i++ {
tw.Append([]string{
- strconv.Itoa(workers[key].Pid),
- renderStatus(workers[key].Status),
- renderJobs(workers[key].NumJobs),
- humanize.Bytes(workers[key].MemoryUsage),
- renderAlive(time.Unix(0, workers[key].Created)),
+ strconv.Itoa(workers[i].Pid),
+ renderStatus(workers[i].Status),
+ renderJobs(workers[i].NumJobs),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ renderAlive(time.Unix(0, workers[i].Created)),
})
}
return tw
}
+// ServiceWorkerTable renders table with information about rr server workers.
+func ServiceWorkerTable(writer io.Writer, workers []process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetHeader([]string{"PID", "Memory", "CPU%"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 7)
+ tw.SetColMinWidth(2, 7)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ })
+ }
+
+ return tw
+}
+
+//go:inline
+func renderCPU(cpu float64) string {
+ return strconv.FormatFloat(cpu, 'f', 2, 64)
+}
+
func renderStatus(status string) string {
switch status {
case "inactive":