diff options
author | Valery Piashchynski <[email protected]> | 2021-04-19 16:42:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-19 16:42:28 +0300 |
commit | baa12b092578d41218585d918fb7e1425700272d (patch) | |
tree | 91881bd0ac32c609ea01fafe3bbc15a13a67c392 /pkg | |
parent | 112b7b60bbc045f4935e1766be9d2266abf68b31 (diff) |
- Add tests, update Informer implementation
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 4 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 4 | ||||
-rw-r--r-- | pkg/process/state.go | 71 | ||||
-rw-r--r-- | pkg/transport/interface.go | 2 | ||||
-rw-r--r-- | pkg/worker/interface.go | 4 | ||||
-rwxr-xr-x | pkg/worker/state.go | 10 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 6 |
7 files changed, 85 insertions, 16 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index c8e45b82..129c6f7d 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -119,7 +119,7 @@ func (sp *StaticPool) addListener(listener events.Listener) { sp.events.AddListener(listener) } -// Config returns associated pool configuration. Immutable. +// GetConfig returns associated pool configuration. Immutable. func (sp *StaticPool) GetConfig() interface{} { return sp.cfg } @@ -134,7 +134,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return nil } -// Be careful, sync Exec with ExecWithContext +// Exec executes provided payload on the worker func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("static_pool_exec") if sp.cfg.Debug { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 273adc30..40903db3 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -8,8 +8,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/tools" ) const MB = 1024 * 1024 @@ -176,7 +176,7 @@ func (sp *supervised) control() { continue } - s, err := tools.WorkerProcessState(workers[i]) + s, err := process.WorkerProcessState(workers[i]) if err != nil { // worker not longer valid for supervision continue diff --git a/pkg/process/state.go b/pkg/process/state.go new file mode 100644 index 00000000..462dd47e --- /dev/null +++ b/pkg/process/state.go @@ -0,0 +1,71 @@ +package process + +import ( + "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// State provides information about specific worker. +type State struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumJobs uint64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` + + // MemoryUsage holds the information about worker memory usage in bytes. + // Values might vary for different operating systems and based on RSS. + MemoryUsage uint64 `json:"memoryUsage"` + + CPUPercent float64 +} + +// WorkerProcessState creates new worker state definition. +func WorkerProcessState(w worker.BaseProcess) (State, error) { + const op = errors.Op("worker_process_state") + p, _ := process.NewProcess(int32(w.Pid())) + i, err := p.MemoryInfo() + if err != nil { + return State{}, errors.E(op, err) + } + + percent, err := p.CPUPercent() + if err != nil { + return State{}, err + } + + return State{ + CPUPercent: percent, + Pid: int(w.Pid()), + Status: w.State().String(), + NumJobs: w.State().NumExecs(), + Created: w.Created().UnixNano(), + MemoryUsage: i.RSS, + }, nil +} + +func GeneralProcessState(pid int) (State, error) { + const op = errors.Op("process_state") + p, _ := process.NewProcess(int32(pid)) + i, err := p.MemoryInfo() + if err != nil { + return State{}, errors.E(op, err) + } + percent, err := p.CPUPercent() + if err != nil { + return State{}, err + } + + return State{ + CPUPercent: percent, + Pid: pid, + MemoryUsage: i.RSS, + }, nil +} diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go index 299ac95f..7e3e5350 100644 --- a/pkg/transport/interface.go +++ b/pkg/transport/interface.go @@ -10,7 +10,7 @@ import ( // Factory is responsible of wrapping given command into tasks WorkerProcess. type Factory interface { - // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 2b68717a..d2cfe2cd 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -16,7 +16,7 @@ type State interface { Value() int64 // Set sets the StateImpl Set(value int64) - // NumJobs shows how many times WorkerProcess was invoked + // NumExecs shows how many times WorkerProcess was invoked NumExecs() uint64 // IsActive returns true if WorkerProcess not Inactive or Stopped IsActive() bool @@ -69,6 +69,6 @@ type SyncWorker interface { BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs payload.Payload) (payload.Payload, error) - // ExecWithContext used to handle Exec with TTL + // ExecWithTTL used to handle Exec with TTL ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/pkg/worker/state.go b/pkg/worker/state.go index 502f8199..9c4543c8 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -24,10 +24,10 @@ const ( // StateKilling - process is being forcibly stopped StateKilling - // State of worker, when no need to allocate new one + // StateDestroyed State of worker, when no need to allocate new one StateDestroyed - // State of worker, when it reached executions limit + // StateMaxJobsReached State of worker, when it reached executions limit StateMaxJobsReached // StateStopped - process has been terminated. @@ -91,17 +91,17 @@ func (s *StateImpl) IsActive() bool { return val == StateWorking || val == StateReady } -// change StateImpl value (status) +// Set change StateImpl value (status) func (s *StateImpl) Set(value int64) { atomic.StoreInt64(&s.value, value) } -// register new execution atomically +// RegisterExec register new execution atomically func (s *StateImpl) RegisterExec() { atomic.AddUint64(&s.numExecs, 1) } -// Update last used time +// SetLastUsed Update last used time func (s *StateImpl) SetLastUsed(lu uint64) { atomic.StoreUint64(&s.lastUsed, lu) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index b04e1363..69c438b0 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -96,14 +96,12 @@ func (w *Process) State() State { return w.state } -// State return receive-only Process state object, state can be used to safely access -// Process status, time when status changed and number of Process executions. +// AttachRelay attaches relay to the worker func (w *Process) AttachRelay(rl relay.Relay) { w.relay = rl } -// State return receive-only Process state object, state can be used to safely access -// Process status, time when status changed and number of Process executions. +// Relay returns relay attached to the worker func (w *Process) Relay() relay.Relay { return w.relay } |