summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go4
-rw-r--r--pkg/process/state.go76
-rw-r--r--pkg/transport/interface.go2
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/state.go10
-rwxr-xr-xpkg/worker/worker.go6
7 files changed, 90 insertions, 16 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index c8e45b82..129c6f7d 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -119,7 +119,7 @@ func (sp *StaticPool) addListener(listener events.Listener) {
sp.events.AddListener(listener)
}
-// Config returns associated pool configuration. Immutable.
+// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
}
@@ -134,7 +134,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return nil
}
-// Be careful, sync Exec with ExecWithContext
+// Exec executes provided payload on the worker
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 273adc30..40903db3 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,8 +8,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/tools"
)
const MB = 1024 * 1024
@@ -176,7 +176,7 @@ func (sp *supervised) control() {
continue
}
- s, err := tools.WorkerProcessState(workers[i])
+ s, err := process.WorkerProcessState(workers[i])
if err != nil {
// worker not longer valid for supervision
continue
diff --git a/pkg/process/state.go b/pkg/process/state.go
new file mode 100644
index 00000000..652ec77c
--- /dev/null
+++ b/pkg/process/state.go
@@ -0,0 +1,76 @@
+package process
+
+import (
+ "github.com/shirou/gopsutil/process"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// State provides information about specific worker.
+type State struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumJobs uint64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // MemoryUsage holds the information about worker memory usage in bytes.
+ // Values might vary for different operating systems and based on RSS.
+ MemoryUsage uint64 `json:"memoryUsage"`
+
+ // CPU_Percent returns how many percent of the CPU time this process uses
+ CPUPercent float64
+
+ // Command used in the service plugin and shows a command for the particular service
+ Command string
+}
+
+// WorkerProcessState creates new worker state definition.
+func WorkerProcessState(w worker.BaseProcess) (State, error) {
+ const op = errors.Op("worker_process_state")
+ p, _ := process.NewProcess(int32(w.Pid()))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return State{}, errors.E(op, err)
+ }
+
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
+ }
+
+ return State{
+ CPUPercent: percent,
+ Pid: int(w.Pid()),
+ Status: w.State().String(),
+ NumJobs: w.State().NumExecs(),
+ Created: w.Created().UnixNano(),
+ MemoryUsage: i.RSS,
+ }, nil
+}
+
+func GeneralProcessState(pid int, command string) (State, error) {
+ const op = errors.Op("process_state")
+ p, _ := process.NewProcess(int32(pid))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return State{}, errors.E(op, err)
+ }
+ percent, err := p.CPUPercent()
+ if err != nil {
+ return State{}, err
+ }
+
+ return State{
+ CPUPercent: percent,
+ Pid: pid,
+ MemoryUsage: i.RSS,
+ Command: command,
+ }, nil
+}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
index 299ac95f..7e3e5350 100644
--- a/pkg/transport/interface.go
+++ b/pkg/transport/interface.go
@@ -10,7 +10,7 @@ import (
// Factory is responsible of wrapping given command into tasks WorkerProcess.
type Factory interface {
- // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 2b68717a..d2cfe2cd 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -16,7 +16,7 @@ type State interface {
Value() int64
// Set sets the StateImpl
Set(value int64)
- // NumJobs shows how many times WorkerProcess was invoked
+ // NumExecs shows how many times WorkerProcess was invoked
NumExecs() uint64
// IsActive returns true if WorkerProcess not Inactive or Stopped
IsActive() bool
@@ -69,6 +69,6 @@ type SyncWorker interface {
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
- // ExecWithContext used to handle Exec with TTL
+ // ExecWithTTL used to handle Exec with TTL
ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 502f8199..9c4543c8 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -24,10 +24,10 @@ const (
// StateKilling - process is being forcibly stopped
StateKilling
- // State of worker, when no need to allocate new one
+ // StateDestroyed State of worker, when no need to allocate new one
StateDestroyed
- // State of worker, when it reached executions limit
+ // StateMaxJobsReached State of worker, when it reached executions limit
StateMaxJobsReached
// StateStopped - process has been terminated.
@@ -91,17 +91,17 @@ func (s *StateImpl) IsActive() bool {
return val == StateWorking || val == StateReady
}
-// change StateImpl value (status)
+// Set change StateImpl value (status)
func (s *StateImpl) Set(value int64) {
atomic.StoreInt64(&s.value, value)
}
-// register new execution atomically
+// RegisterExec register new execution atomically
func (s *StateImpl) RegisterExec() {
atomic.AddUint64(&s.numExecs, 1)
}
-// Update last used time
+// SetLastUsed Update last used time
func (s *StateImpl) SetLastUsed(lu uint64) {
atomic.StoreUint64(&s.lastUsed, lu)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b04e1363..69c438b0 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -96,14 +96,12 @@ func (w *Process) State() State {
return w.state
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// AttachRelay attaches relay to the worker
func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
+// Relay returns relay attached to the worker
func (w *Process) Relay() relay.Relay {
return w.relay
}