Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-x  pool/static_pool.go | 61
 1 file changed, 33 insertions(+), 28 deletions(-)
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 7481f84f..019c34b2 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,7 +2,6 @@ package pool
import (
"context"
- "fmt"
"os/exec"
"time"
@@ -13,13 +12,12 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+ "go.uber.org/zap"
)
const (
// StopRequest can be sent by a worker to indicate that a restart is required.
StopRequest = `{"stop":true}`
- // pluginName ...
- pluginName = "pool"
)
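For context, a minimal sketch of how the pool side can honor StopRequest; the exact check lives in StaticPool.Exec and may differ from this assumed shape (rsp, w, and the retry step are illustrative):

    // Hedged sketch: if a worker answers with the StopRequest payload,
    // retire it and let the watcher allocate a replacement.
    if string(rsp.Body) == StopRequest {
        w.State().Set(worker.StateInvalid) // mark the worker for replacement
        return sp.Exec(p)                  // re-run the task on a fresh worker
    }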
// ErrorEncoder encodes an error or makes a decision based on the error type
@@ -32,6 +30,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. The pool uses a fixed number of workers (the stack).
type StaticPool struct {
cfg *Config
+ log *zap.Logger
// worker command creator
cmd Command
@@ -39,9 +38,6 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- events events.EventBus
- eventsID string
-
// manages worker states and TTLs
ww Watcher
@@ -52,8 +48,8 @@ type StaticPool struct {
errEncoder ErrorEncoder
}
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+// NewStaticPool creates a new worker pool and task multiplexer. The StaticPool will initialize with one worker.
+func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
if factory == nil {
return nil, errors.Str("no factory initialized")
}
@@ -64,13 +60,10 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
- eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: eb,
- eventsID: id,
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
}
// add pool options
@@ -78,10 +71,19 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ if p.log == nil {
+ z, err := zap.NewProduction()
+ if err != nil {
+ return nil, err
+ }
+
+ p.log = z
+ }
+
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
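Note the ordering above: user options run first, and only then does the pool fall back to zap.NewProduction(), so WithLogger (added below) suppresses the default. A sketch of the presumed Options type, which this diff uses but does not show:

    // Assumed definition (consistent with WithLogger below); the real
    // declaration lives elsewhere in the pool package.
    type Options func(p *StaticPool)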
@@ -99,7 +101,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if the supervisor config is not nil, assume the pool should be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, p.log, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
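A hedged sketch of opting into supervision; SupervisorConfig and its fields are assumptions, only the cfg.Supervisor != nil check is shown by this diff:

    // Hypothetical config: any non-nil Supervisor enables the wrapper.
    cfg := &pool.Config{
        NumWorkers: 4,
        Supervisor: &pool.SupervisorConfig{
            // TTL / memory limits would go here (field names assumed).
        },
    }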
@@ -108,6 +110,12 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
+func WithLogger(z *zap.Logger) Options {
+ return func(p *StaticPool) {
+ p.log = z
+ }
+}
+
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
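A usage sketch for the new functional option; the worker command, factory, and config values are placeholders, not part of this change:

    z, err := zap.NewDevelopment()
    if err != nil {
        panic(err)
    }

    p, err := pool.NewStaticPool(
        context.Background(),
        func() *exec.Cmd { return exec.Command("php", "worker.php") }, // placeholder worker command
        factory, // any transport.Factory implementation
        &pool.Config{NumWorkers: 4},
        pool.WithLogger(z), // suppresses the zap.NewProduction() fallback
    )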
@@ -158,7 +166,6 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
// Destroy all underlying workers (but let them complete the current task).
func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -183,13 +190,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push an event if a timeout error occurred at any stage
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// if the max jobs limit is exceeded
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
// mark old as invalid and stop
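For illustration, roughly what the SoftJob branch above emits through zap's production JSON encoder (ts and caller keys omitted; the pid, the error text, and the exact String() value of the event name are assumptions):

    {"level":"warn","msg":"worker stopped, and will be restarted",
     "reason":"worker error","pid":12345,
     "internal_event_name":"EventWorkerError","error":"soft job error"}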
@@ -209,15 +215,14 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of a network error we can't stop the worker gracefully, so we kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// kill the worker instead of sending net packet to it
_ = w.Kill()
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
// stop the worker; it might be in a broken state (network)
errS := w.Stop()
if errS != nil {
@@ -268,7 +273,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
+ sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
}
}
@@ -289,7 +294,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means we couldn't get a worker from the stack within the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
+ sp.log.Error("no free workers in the pool, wait timeout exceeded", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err))
return nil, errors.E(op, err)
}
// otherwise, if err is not nil, return the error
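When this NoFreeWorkers path fires under load, the usual levers are the Config fields this diff already relies on (the values here are illustrative):

    cfg := &pool.Config{
        NumWorkers:      8,               // more workers to absorb bursts
        AllocateTimeout: 5 * time.Second, // wait longer before NoFreeWorkers
    }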
@@ -310,7 +315,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
+ sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}
@@ -336,7 +341,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// destroy the worker
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}
@@ -363,7 +368,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}