summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linux.yml1
-rwxr-xr-xMakefile64
-rw-r--r--go.sum2
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go4
-rw-r--r--pkg/process/state.go (renamed from tools/process.go)39
-rw-r--r--pkg/transport/interface.go2
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/state.go10
-rwxr-xr-xpkg/worker/worker.go6
-rw-r--r--plugins/http/plugin.go27
-rw-r--r--plugins/informer/interface.go6
-rw-r--r--plugins/informer/plugin.go6
-rw-r--r--plugins/informer/rpc.go15
-rw-r--r--plugins/logger/config.go2
-rw-r--r--plugins/logger/plugin.go3
-rw-r--r--plugins/service/config.go34
-rw-r--r--plugins/service/plugin.go106
-rw-r--r--plugins/service/process.go153
-rw-r--r--plugins/static/plugin.go2
-rw-r--r--tests/plugins/http/http_plugin_test.go8
-rw-r--r--tests/plugins/informer/informer_test.go4
-rw-r--r--tests/plugins/informer/test_plugin.go16
-rw-r--r--tests/plugins/service/configs/.rr-service-error.yaml16
-rw-r--r--tests/plugins/service/configs/.rr-service-init.yaml22
-rw-r--r--tests/plugins/service/configs/.rr-service-restarts.yaml16
-rw-r--r--tests/plugins/service/service_plugin_test.go255
-rw-r--r--tests/plugins/service/test_files/loop.php6
-rwxr-xr-xtests/plugins/service/test_files/test_binarybin0 -> 1363968 bytes
-rw-r--r--tools/worker_table.go45
30 files changed, 752 insertions, 126 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index a478726d..9d59cd53 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -66,6 +66,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server.txt -covermode=atomic ./tests/plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/service.txt -covermode=atomic ./tests/plugins/service
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/status.txt -covermode=atomic ./tests/plugins/status
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/config.txt -covermode=atomic ./tests/plugins/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/gzip.txt -covermode=atomic ./tests/plugins/gzip
diff --git a/Makefile b/Makefile
index 84735f67..3ff16e7a 100755
--- a/Makefile
+++ b/Makefile
@@ -36,6 +36,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server.out -covermode=atomic ./tests/plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/service.out -covermode=atomic ./tests/plugins/service
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/status.out -covermode=atomic ./tests/plugins/status
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/config.out -covermode=atomic ./tests/plugins/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/gzip.out -covermode=atomic ./tests/plugins/gzip
@@ -68,6 +69,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
go test -v -race -tags=debug ./tests/plugins/server
+ go test -v -race -tags=debug ./tests/plugins/service
go test -v -race -tags=debug ./tests/plugins/status
go test -v -race -tags=debug ./tests/plugins/config
go test -v -race -tags=debug ./tests/plugins/gzip
@@ -85,65 +87,3 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/kv/memory
go test -v -race -tags=debug ./tests/plugins/kv/memcached
docker-compose -f tests/docker-compose.yaml down
-
-test_1.14: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
- go1.14.14 test -v -race -tags=debug ./pkg/transport/pipe
- go1.14.14 test -v -race -tags=debug ./pkg/transport/socket
- go1.14.14 test -v -race -tags=debug ./pkg/pool
- go1.14.14 test -v -race -tags=debug ./pkg/worker
- go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher
- go1.14.14 test -v -race -tags=debug ./tests/plugins/http
- go1.14.14 test -v -race -tags=debug ./plugins/http/config
- go1.14.14 test -v -race -tags=debug ./tests/plugins/informer
- go1.14.14 test -v -race -tags=debug ./tests/plugins/reload
- go1.14.14 test -v -race -tags=debug ./tests/plugins/server
- go1.14.14 test -v -race -tags=debug ./tests/plugins/status
- go1.14.14 test -v -race -tags=debug ./tests/plugins/config
- go1.14.14 test -v -race -tags=debug ./tests/plugins/gzip
- go1.14.14 test -v -race -tags=debug ./tests/plugins/headers
- go1.14.14 test -v -race -tags=debug ./tests/plugins/logger
- go1.14.14 test -v -race -tags=debug ./tests/plugins/metrics
- go1.14.14 test -v -race -tags=debug ./tests/plugins/redis
- go1.14.14 test -v -race -tags=debug ./tests/plugins/resetter
- go1.14.14 test -v -race -tags=debug ./tests/plugins/rpc
- go1.14.14 test -v -race -tags=debug ./tests/plugins/static
- go1.14.14 test -v -race -tags=debug ./plugins/kv/boltdb
- go1.14.14 test -v -race -tags=debug ./plugins/kv/memory
- go1.14.14 test -v -race -tags=debug ./plugins/kv/memcached
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/boltdb
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memory
- go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached
- docker-compose -f tests/docker-compose.yaml down
-
-test_1.16: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
- go1.16rc1 test -v -race -tags=debug ./pkg/transport/pipe
- go1.16rc1 test -v -race -tags=debug ./pkg/transport/socket
- go1.16rc1 test -v -race -tags=debug ./pkg/pool
- go1.16rc1 test -v -race -tags=debug ./pkg/worker
- go1.16rc1 test -v -race -tags=debug ./pkg/worker_watcher
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/http
- go1.16rc1 test -v -race -tags=debug ./plugins/http/config
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/informer
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/reload
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/server
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/status
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/config
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/gzip
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/headers
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/logger
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/metrics
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/redis
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/resetter
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/rpc
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/static
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/boltdb
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/memory
- go1.16rc1 test -v -race -tags=debug ./plugins/kv/memcached
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/boltdb
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memory
- go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memcached
- docker-compose -f tests/docker-compose.yaml down
-
-test_pipeline: test_1.14 test
diff --git a/go.sum b/go.sum
index eb6661b5..abf05933 100644
--- a/go.sum
+++ b/go.sum
@@ -116,6 +116,8 @@ github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis/v8 v8.8.2 h1:O/NcHqobw7SEptA0yA6up6spZVFtwE06SXM8rgLtsP8=
github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
+github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
+github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index c8e45b82..129c6f7d 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -119,7 +119,7 @@ func (sp *StaticPool) addListener(listener events.Listener) {
sp.events.AddListener(listener)
}
-// Config returns associated pool configuration. Immutable.
+// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
}
@@ -134,7 +134,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return nil
}
-// Be careful, sync Exec with ExecWithContext
+// Exec executes provided payload on the worker
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 273adc30..40903db3 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,8 +8,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/tools"
)
const MB = 1024 * 1024
@@ -176,7 +176,7 @@ func (sp *supervised) control() {
continue
}
- s, err := tools.WorkerProcessState(workers[i])
+ s, err := process.WorkerProcessState(workers[i])
if err != nil {
// worker not longer valid for supervision
continue
diff --git a/tools/process.go b/pkg/process/state.go
index a6eb1139..462dd47e 100644
--- a/tools/process.go
+++ b/pkg/process/state.go
@@ -1,4 +1,4 @@
-package tools
+package process
import (
"github.com/shirou/gopsutil/process"
@@ -6,8 +6,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
-// ProcessState provides information about specific worker.
-type ProcessState struct {
+// State provides information about specific worker.
+type State struct {
// Pid contains process id.
Pid int `json:"pid"`
@@ -23,18 +23,26 @@ type ProcessState struct {
// MemoryUsage holds the information about worker memory usage in bytes.
// Values might vary for different operating systems and based on RSS.
MemoryUsage uint64 `json:"memoryUsage"`
+
+ CPUPercent float64
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) {
+func WorkerProcessState(w worker.BaseProcess) (State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return ProcessState{}, errors.E(op, err)
+ return State{}, errors.E(op, err)
+ }
+
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
}
- return ProcessState{
+ return State{
+ CPUPercent: percent,
Pid: int(w.Pid()),
Status: w.State().String(),
NumJobs: w.State().NumExecs(),
@@ -42,3 +50,22 @@ func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) {
MemoryUsage: i.RSS,
}, nil
}
+
+func GeneralProcessState(pid int) (State, error) {
+ const op = errors.Op("process_state")
+ p, _ := process.NewProcess(int32(pid))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return State{}, errors.E(op, err)
+ }
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
+ }
+
+ return State{
+ CPUPercent: percent,
+ Pid: pid,
+ MemoryUsage: i.RSS,
+ }, nil
+}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
index 299ac95f..7e3e5350 100644
--- a/pkg/transport/interface.go
+++ b/pkg/transport/interface.go
@@ -10,7 +10,7 @@ import (
// Factory is responsible of wrapping given command into tasks WorkerProcess.
type Factory interface {
- // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 2b68717a..d2cfe2cd 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -16,7 +16,7 @@ type State interface {
Value() int64
// Set sets the StateImpl
Set(value int64)
- // NumJobs shows how many times WorkerProcess was invoked
+ // NumExecs shows how many times WorkerProcess was invoked
NumExecs() uint64
// IsActive returns true if WorkerProcess not Inactive or Stopped
IsActive() bool
@@ -69,6 +69,6 @@ type SyncWorker interface {
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
- // ExecWithContext used to handle Exec with TTL
+ // ExecWithTTL used to handle Exec with TTL
ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 502f8199..9c4543c8 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -24,10 +24,10 @@ const (
// StateKilling - process is being forcibly stopped
StateKilling
- // State of worker, when no need to allocate new one
+ // StateDestroyed State of worker, when no need to allocate new one
StateDestroyed
- // State of worker, when it reached executions limit
+ // StateMaxJobsReached State of worker, when it reached executions limit
StateMaxJobsReached
// StateStopped - process has been terminated.
@@ -91,17 +91,17 @@ func (s *StateImpl) IsActive() bool {
return val == StateWorking || val == StateReady
}
-// change StateImpl value (status)
+// Set change StateImpl value (status)
func (s *StateImpl) Set(value int64) {
atomic.StoreInt64(&s.value, value)
}
-// register new execution atomically
+// RegisterExec register new execution atomically
func (s *StateImpl) RegisterExec() {
atomic.AddUint64(&s.numExecs, 1)
}
-// Update last used time
+// SetLastUsed Update last used time
func (s *StateImpl) SetLastUsed(lu uint64) {
atomic.StoreUint64(&s.lastUsed, lu)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b04e1363..69c438b0 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -96,14 +96,12 @@ func (w *Process) State() State {
return w.state
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// AttachRelay attaches relay to the worker
func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// Relay returns relay attached to the worker
func (w *Process) Relay() relay.Relay {
return w.relay
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 86fcb329..8c8a86b4 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -17,6 +17,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -332,8 +333,24 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RUnlock()
}
-// Workers returns associated pool workers
-func (s *Plugin) Workers() []worker.BaseProcess {
+// Workers returns slice with the process states for the workers
+func (s *Plugin) Workers() []process.State {
+ workers := s.pool.Workers()
+
+ ps := make([]process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+// internal
+func (s *Plugin) workers() []worker.BaseProcess {
return s.pool.Workers()
}
@@ -395,7 +412,7 @@ func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
// Status return status of the particular plugin
func (s *Plugin) Status() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -409,9 +426,9 @@ func (s *Plugin) Status() status.Status {
}
}
-// Status return status of the particular plugin
+// Ready return readiness status of the particular plugin
func (s *Plugin) Ready() status.Status {
- workers := s.Workers()
+ workers := s.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
// we assume, that plugin's worker pool is ready
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 8e3b922b..45f44691 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,8 +1,10 @@
package informer
-import "github.com/spiral/roadrunner/v2/pkg/worker"
+import (
+ "github.com/spiral/roadrunner/v2/pkg/process"
+)
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []worker.BaseProcess
+ Workers() []process.State
}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 416c0112..98081d34 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,7 +3,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,7 @@ func (p *Plugin) Init(log logger.Logger) error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
+func (p *Plugin) Workers(name string) ([]process.State, error) {
const op = errors.Op("informer_plugin_workers")
svc, ok := p.registry[name]
if !ok {
@@ -49,7 +49,7 @@ func (p *Plugin) Name() string {
return PluginName
}
-// RPCService returns associated rpc service.
+// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
return &rpc{srv: p, log: p.log}
}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 55a9832b..b0b5b1af 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,9 +1,8 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/tools"
)
type rpc struct {
@@ -14,7 +13,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}
// List all resettable services.
@@ -38,15 +37,9 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return err
}
- list.Workers = make([]tools.ProcessState, 0)
- for _, w := range workers {
- ps, err := tools.WorkerProcessState(w.(worker.BaseProcess))
- if err != nil {
- continue
- }
+ // write actual processes
+ list.Workers = workers
- list.Workers = append(list.Workers, ps)
- }
rpc.log.Debug("list of workers", "workers", list.Workers)
rpc.log.Debug("successfully finished Workers method")
return nil
diff --git a/plugins/logger/config.go b/plugins/logger/config.go
index eee5fb71..c435e8be 100644
--- a/plugins/logger/config.go
+++ b/plugins/logger/config.go
@@ -22,7 +22,7 @@ type Config struct {
// level of all loggers descended from this config.
Level string `mapstructure:"level"`
- // Encoding sets the logger's encoding. Valid values are "json" and
+ // Encoding sets the logger's encoding. InitDefault values are "json" and
// "console", as well as any third-party encodings registered via
// RegisterEncoder.
Encoding string `mapstructure:"encoding"`
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go
index 08fc2454..e1066cba 100644
--- a/plugins/logger/plugin.go
+++ b/plugins/logger/plugin.go
@@ -69,7 +69,7 @@ func (z *ZapLogger) NamedLogger(name string) (Logger, error) {
return NewZapAdapter(z.base.Named(name)), nil
}
-// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
+// ServiceLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
func (z *ZapLogger) ServiceLogger(n endure.Named) (Logger, error) {
return z.NamedLogger(n.Name())
}
@@ -78,5 +78,6 @@ func (z *ZapLogger) ServiceLogger(n endure.Named) (Logger, error) {
func (z *ZapLogger) Provides() []interface{} {
return []interface{}{
z.ServiceLogger,
+ z.DefaultLogger,
}
}
diff --git a/plugins/service/config.go b/plugins/service/config.go
new file mode 100644
index 00000000..871c8f76
--- /dev/null
+++ b/plugins/service/config.go
@@ -0,0 +1,34 @@
+package service
+
+import "time"
+
+// Service represents particular service configuration
+type Service struct {
+ Command string `mapstructure:"command"`
+ ProcessNum int `mapstructure:"process_num"`
+ ExecTimeout time.Duration `mapstructure:"exec_timeout"`
+ RemainAfterExit bool `mapstructure:"remain_after_exit"`
+ RestartSec uint64 `mapstructure:"restart_sec"`
+}
+
+// Config for the services
+type Config struct {
+ Services map[string]Service `mapstructure:"service"`
+}
+
+func (c *Config) InitDefault() {
+ if len(c.Services) > 0 {
+ for k, v := range c.Services {
+ if v.ProcessNum == 0 {
+ val := c.Services[k]
+ val.ProcessNum = 1
+ c.Services[k] = val
+ }
+ if v.RestartSec == 0 {
+ val := c.Services[k]
+ val.RestartSec = 30
+ c.Services[k] = val
+ }
+ }
+ }
+}
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
new file mode 100644
index 00000000..91e47e86
--- /dev/null
+++ b/plugins/service/plugin.go
@@ -0,0 +1,106 @@
+package service
+
+import (
+ "sync"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName string = "service"
+
+type Plugin struct {
+ sync.Mutex
+
+ logger logger.Logger
+ cfg Config
+
+ // all processes attached to the service
+ processes []*Process
+}
+
+func (service *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("service_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(errors.Disabled)
+ }
+ err := cfg.UnmarshalKey(PluginName, &service.cfg.Services)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // init default parameters if not set by user
+ service.cfg.InitDefault()
+ // save the logger
+ service.logger = log
+
+ return nil
+}
+
+func (service *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ // start processing
+ go func() {
+ // lock here, because Stop command might be invoked during the Serve
+ service.Lock()
+ defer service.Unlock()
+
+ service.processes = make([]*Process, 0, len(service.cfg.Services))
+ // for the every service
+ for k := range service.cfg.Services {
+ // create needed number of the processes
+ for i := 0; i < service.cfg.Services[k].ProcessNum; i++ {
+ // create processor structure, which will process all the services
+ service.processes = append(service.processes, NewServiceProcess(
+ service.cfg.Services[k].RemainAfterExit,
+ service.cfg.Services[k].ExecTimeout,
+ service.cfg.Services[k].RestartSec,
+ service.cfg.Services[k].Command,
+ service.logger,
+ errCh,
+ ))
+ }
+ }
+
+ // start all processes
+ for i := 0; i < len(service.processes); i++ {
+ service.processes[i].start()
+ }
+ }()
+
+ return errCh
+}
+
+func (service *Plugin) Workers() []process.State {
+ service.Lock()
+ defer service.Unlock()
+ states := make([]process.State, 0, len(service.processes))
+ for i := 0; i < len(service.processes); i++ {
+ st, err := process.GeneralProcessState(service.processes[i].Pid)
+ if err != nil {
+ continue
+ }
+ states = append(states, st)
+ }
+ return states
+}
+
+func (service *Plugin) Stop() error {
+ service.Lock()
+ defer service.Unlock()
+
+ if len(service.processes) > 0 {
+ for i := 0; i < len(service.processes); i++ {
+ service.processes[i].stop()
+ }
+ }
+ return nil
+}
+
+// Name contains service name.
+func (service *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/service/process.go b/plugins/service/process.go
new file mode 100644
index 00000000..49219eb0
--- /dev/null
+++ b/plugins/service/process.go
@@ -0,0 +1,153 @@
+package service
+
+import (
+ "os/exec"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+ "unsafe"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Process structure contains an information about process, restart information, log, errors, etc
+type Process struct {
+ sync.Mutex
+ // command to execute
+ command *exec.Cmd
+ // rawCmd from the plugin
+ rawCmd string
+ Pid int
+
+ // root plugin error chan
+ errCh chan error
+ // logger
+ log logger.Logger
+
+ ExecTimeout time.Duration
+ RemainAfterExit bool
+ RestartSec uint64
+
+ // process start time
+ startTime time.Time
+ stopped uint64
+}
+
+// NewServiceProcess constructs service process structure
+func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restartDelay uint64, command string, l logger.Logger, errCh chan error) *Process {
+ return &Process{
+ rawCmd: command,
+ RestartSec: restartDelay,
+ ExecTimeout: execTimeout,
+ RemainAfterExit: restartAfterExit,
+ errCh: errCh,
+ log: l,
+ }
+}
+
+// write message to the log (stderr)
+func (p *Process) Write(b []byte) (int, error) {
+ p.log.Info(toString(b))
+ return len(b), nil
+}
+
+func (p *Process) start() {
+ p.Lock()
+ defer p.Unlock()
+ const op = errors.Op("processor_start")
+
+ // crate fat-process here
+ p.createProcess()
+
+ // non blocking process start
+ err := p.command.Start()
+ if err != nil {
+ p.errCh <- errors.E(op, err)
+ return
+ }
+
+ // start process waiting routine
+ go p.wait()
+ // execHandler checks for the execTimeout
+ go p.execHandler()
+ // save start time
+ p.startTime = time.Now()
+ p.Pid = p.command.Process.Pid
+}
+
+// create command for the process
+func (p *Process) createProcess() {
+ // cmdArgs contain command arguments if the command in form of: php <command> or ls <command> -i -b
+ var cmdArgs []string
+ cmdArgs = append(cmdArgs, strings.Split(p.rawCmd, " ")...)
+ if len(cmdArgs) < 2 {
+ p.command = exec.Command(p.rawCmd) //nolint:gosec
+ } else {
+ p.command = exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
+ }
+ // redirect stderr into the Write function of the process.go
+ p.command.Stderr = p
+}
+
+// wait process for exit
+func (p *Process) wait() {
+ // Wait error doesn't matter here
+ err := p.command.Wait()
+ if err != nil {
+ p.log.Error("process wait error", "error", err)
+ }
+ // wait for restart delay
+ if p.RemainAfterExit {
+ // wait for the delay
+ time.Sleep(time.Second * time.Duration(p.RestartSec))
+ // and start command again
+ p.start()
+ }
+}
+
+// stop can be only sent by the Endure when plugin stopped
+func (p *Process) stop() {
+ atomic.StoreUint64(&p.stopped, 1)
+}
+
+func (p *Process) execHandler() {
+ tt := time.NewTicker(time.Second)
+ for range tt.C {
+ // lock here, because p.startTime could be changed during the check
+ p.Lock()
+ // if the exec timeout is set
+ if p.ExecTimeout != 0 {
+ // if stopped -> kill the process (SIGINT-> SIGKILL) and exit
+ if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) {
+ err := p.command.Process.Signal(syscall.SIGINT)
+ if err != nil {
+ _ = p.command.Process.Signal(syscall.SIGKILL)
+ }
+ tt.Stop()
+ p.Unlock()
+ return
+ }
+
+ // check the running time for the script
+ if time.Now().After(p.startTime.Add(p.ExecTimeout)) {
+ err := p.command.Process.Signal(syscall.SIGINT)
+ if err != nil {
+ _ = p.command.Process.Signal(syscall.SIGKILL)
+ }
+ p.Unlock()
+ tt.Stop()
+ return
+ }
+ }
+ p.Unlock()
+ }
+}
+
+// unsafe and fast []byte to string convert
+//go:inline
+func toString(data []byte) string {
+ return *(*string)(unsafe.Pointer(&data))
+}
diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go
index 5f108701..76cb9e68 100644
--- a/plugins/static/plugin.go
+++ b/plugins/static/plugin.go
@@ -57,7 +57,7 @@ func (s *Plugin) Name() string {
return PluginName
}
-// middleware must return true if request/response pair is handled within the middleware.
+// Middleware must return true if request/response pair is handled within the middleware.
func (s *Plugin) Middleware(next http.Handler) http.HandlerFunc {
// Define the http.HandlerFunc
return func(w http.ResponseWriter, r *http.Request) {
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index 51c5fda2..73d6d102 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -21,13 +21,13 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/tests/mocks"
- "github.com/spiral/roadrunner/v2/tools"
"github.com/yookoala/gofast"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -288,7 +288,7 @@ func informerTest(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "http", &list)
@@ -1336,7 +1336,7 @@ func informerTestBefore(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "http", &list)
@@ -1353,7 +1353,7 @@ func informerTestAfter(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
assert.NotZero(t, workerPid)
diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go
index 31e14ff4..b6f50fd5 100644
--- a/tests/plugins/informer/informer_test.go
+++ b/tests/plugins/informer/informer_test.go
@@ -12,12 +12,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
@@ -100,7 +100,7 @@ func informerWorkersRPCTest(t *testing.T) {
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
+ Workers []process.State `json:"workers"`
}{}
err = client.Call("informer.Workers", "informer.plugin1", &list)
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 8a1fb933..0c9065a3 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -49,11 +49,21 @@ func (p1 *Plugin1) Name() string {
return "informer.plugin1"
}
-func (p1 *Plugin1) Workers() []worker.BaseProcess {
+func (p1 *Plugin1) Workers() []process.State {
p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
}
- return p.Workers()
+ ps := make([]process.State, 0, len(p.Workers()))
+ workers := p.Workers()
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
}
diff --git a/tests/plugins/service/configs/.rr-service-error.yaml b/tests/plugins/service/configs/.rr-service-error.yaml
new file mode 100644
index 00000000..3b0f1eb9
--- /dev/null
+++ b/tests/plugins/service/configs/.rr-service-error.yaml
@@ -0,0 +1,16 @@
+service:
+ some_service_1:
+ command: "php test_files/loopo.php"
+ process_num: 1
+ exec_timeout: 5s # s,m,h (seconds, minutes, hours)
+ remain_after_exit: true
+ restart_sec: 1
+
+logs:
+ level: info
+ mode: raw
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/service/configs/.rr-service-init.yaml b/tests/plugins/service/configs/.rr-service-init.yaml
new file mode 100644
index 00000000..e32f2eda
--- /dev/null
+++ b/tests/plugins/service/configs/.rr-service-init.yaml
@@ -0,0 +1,22 @@
+service:
+ some_service_1:
+ command: "php test_files/loop.php"
+ process_num: 1
+ exec_timeout: 5s # s,m,h (seconds, minutes, hours)
+ remain_after_exit: true
+ restart_sec: 1
+ some_service_2:
+ command: "test_files/test_binary"
+ process_num: 1
+ remain_after_exit: true
+ restart_delay: 1s
+ exec_timeout: 5s
+
+logs:
+ level: info
+ mode: raw
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/service/configs/.rr-service-restarts.yaml b/tests/plugins/service/configs/.rr-service-restarts.yaml
new file mode 100644
index 00000000..9095a92f
--- /dev/null
+++ b/tests/plugins/service/configs/.rr-service-restarts.yaml
@@ -0,0 +1,16 @@
+service:
+ some_service_2:
+ command: "test_files/test_binary"
+ process_num: 1
+ remain_after_exit: true
+ restart_delay: 1s
+ exec_timeout: 1s
+
+logs:
+ level: info
+ mode: raw
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go
new file mode 100644
index 00000000..2c4e53fe
--- /dev/null
+++ b/tests/plugins/service/service_plugin_test.go
@@ -0,0 +1,255 @@
+package service
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/golang/mock/gomock"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/service"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestServiceInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-service-init.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("The number is: 0\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 1\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 2\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 3\n").MinTimes(1)
+ mockLogger.EXPECT().Info("The number is: 4\n").AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ mockLogger.EXPECT().Info("Hello 0\n The number is: 0\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 1\n The number is: 1\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 2\n The number is: 2\n").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 3\n The number is: 3\n").MinTimes(1)
+
+ mockLogger.EXPECT().Info("Hello 0").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 1").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 2").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 3").MinTimes(1)
+ mockLogger.EXPECT().Info("Hello 4").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &service.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 10)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestServiceError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-service-error.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &service.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ assert.NoError(t, err)
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 10)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestServiceRestarts(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-service-restarts.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+
+ // process interrupt error
+ mockLogger.EXPECT().Error("process wait error", gomock.Any()).MinTimes(2)
+
+ // should not be more than Hello 0, because of restarts
+ mockLogger.EXPECT().Info("Hello 0").MinTimes(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &service.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 10)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
diff --git a/tests/plugins/service/test_files/loop.php b/tests/plugins/service/test_files/loop.php
new file mode 100644
index 00000000..6ba488ef
--- /dev/null
+++ b/tests/plugins/service/test_files/loop.php
@@ -0,0 +1,6 @@
+<?php
+for ($x = 0; $x <= 1000; $x++) {
+ sleep(1);
+ error_log("The number is: $x", 0);
+}
+?>
diff --git a/tests/plugins/service/test_files/test_binary b/tests/plugins/service/test_files/test_binary
new file mode 100755
index 00000000..480fb7e2
--- /dev/null
+++ b/tests/plugins/service/test_files/test_binary
Binary files differ
diff --git a/tools/worker_table.go b/tools/worker_table.go
index 157aa972..7887a478 100644
--- a/tools/worker_table.go
+++ b/tools/worker_table.go
@@ -8,31 +8,58 @@ import (
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/olekukonko/tablewriter"
+ "github.com/spiral/roadrunner/v2/pkg/process"
)
// WorkerTable renders table with information about rr server workers.
-func WorkerTable(writer io.Writer, workers []ProcessState) *tablewriter.Table {
+func WorkerTable(writer io.Writer, workers []process.State) *tablewriter.Table {
tw := tablewriter.NewWriter(writer)
- tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"})
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"})
tw.SetColMinWidth(0, 7)
tw.SetColMinWidth(1, 9)
tw.SetColMinWidth(2, 7)
tw.SetColMinWidth(3, 7)
- tw.SetColMinWidth(4, 18)
+ tw.SetColMinWidth(4, 7)
+ tw.SetColMinWidth(5, 18)
- for key := range workers {
+ for i := 0; i < len(workers); i++ {
tw.Append([]string{
- strconv.Itoa(workers[key].Pid),
- renderStatus(workers[key].Status),
- renderJobs(workers[key].NumJobs),
- humanize.Bytes(workers[key].MemoryUsage),
- renderAlive(time.Unix(0, workers[key].Created)),
+ strconv.Itoa(workers[i].Pid),
+ renderStatus(workers[i].Status),
+ renderJobs(workers[i].NumJobs),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ renderAlive(time.Unix(0, workers[i].Created)),
})
}
return tw
}
+// ServiceWorkerTable renders table with information about rr server workers.
+func ServiceWorkerTable(writer io.Writer, workers []process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetHeader([]string{"PID", "Memory", "CPU%"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 7)
+ tw.SetColMinWidth(2, 7)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ })
+ }
+
+ return tw
+}
+
+//go:inline
+func renderCPU(cpu float64) string {
+ return strconv.FormatFloat(cpu, 'f', 2, 64)
+}
+
func renderStatus(status string) string {
switch status {
case "inactive":