Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-x | pool/static_pool.go | 61 |
1 file changed, 33 insertions, 28 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 7481f84f..019c34b2 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,7 +2,6 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"os/exec"
 	"time"
 
@@ -13,13 +12,12 @@ import (
 	"github.com/spiral/roadrunner/v2/utils"
 	"github.com/spiral/roadrunner/v2/worker"
 	workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+	"go.uber.org/zap"
 )
 
 const (
 	// StopRequest can be sent by worker to indicate that restart is required.
 	StopRequest = `{"stop":true}`
-	// pluginName ...
-	pluginName = "pool"
 )
 
 // ErrorEncoder encode error or make a decision based on the error type
@@ -32,6 +30,7 @@ type Command func() *exec.Cmd
 // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
 type StaticPool struct {
 	cfg *Config
+	log *zap.Logger
 
 	// worker command creator
 	cmd Command
@@ -39,9 +38,6 @@ type StaticPool struct {
 	// creates and connects to stack
 	factory transport.Factory
 
-	events   events.EventBus
-	eventsID string
-
 	// manages worker states and TTLs
 	ww Watcher
 
@@ -52,8 +48,8 @@ type StaticPool struct {
 	errEncoder ErrorEncoder
 }
 
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
 	if factory == nil {
 		return nil, errors.Str("no factory initialized")
 	}
@@ -64,13 +60,10 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 		cfg.MaxJobs = 1
 	}
 
-	eb, id := events.Bus()
 	p := &StaticPool{
-		cfg:      cfg,
-		cmd:      cmd,
-		factory:  factory,
-		events:   eb,
-		eventsID: id,
+		cfg:     cfg,
+		cmd:     cmd,
+		factory: factory,
 	}
 
 	// add pool options
@@ -78,10 +71,19 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 		options[i](p)
 	}
 
+	if p.log == nil {
+		z, err := zap.NewProduction()
+		if err != nil {
+			return nil, err
+		}
+
+		p.log = z
+	}
+
 	// set up workers allocator
 	p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
 	// set up workers watcher
-	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
+	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
 
 	// allocate requested number of workers
 	workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -99,7 +101,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 	// if supervised config not nil, guess, that pool wanted to be supervised
 	if cfg.Supervisor != nil {
-		sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
+		sp := supervisorWrapper(p, p.log, p.cfg.Supervisor)
 		// start watcher timer
 		sp.Start()
 		return sp, nil
 	}
@@ -108,6 +110,12 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 	return p, nil
 }
 
+func WithLogger(z *zap.Logger) Options {
+	return func(p *StaticPool) {
+		p.log = z
+	}
+}
+
 // GetConfig returns associated pool configuration. Immutable.
 func (sp *StaticPool) GetConfig() interface{} {
 	return sp.cfg
@@ -158,7 +166,6 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
 
 // Destroy all underlying stack (but let them complete the task).
 func (sp *StaticPool) Destroy(ctx context.Context) {
-	sp.events.Unsubscribe(sp.eventsID)
 	sp.ww.Destroy(ctx)
 }
 
@@ -183,13 +190,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		// just push event if on any stage was timeout error
 		switch {
 		case errors.Is(errors.ExecTTL, err):
-			sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+			sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
 			w.State().Set(worker.StateInvalid)
 			return nil, err
 
 		case errors.Is(errors.SoftJob, err):
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+			sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 			// if max jobs exceed
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
 				// mark old as invalid and stop
@@ -209,15 +215,14 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		case errors.Is(errors.Network, err):
 			// in case of network error, we can't stop the worker, we should kill it
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+			sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 			// kill the worker instead of sending net packet to it
 			_ = w.Kill()
 
 			return nil, err
 		default:
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+			sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
 			// stop the worker, worker here might be in the broken state (network)
 			errS := w.Stop()
 			if errS != nil {
@@ -268,7 +273,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
 	w.State().Set(worker.StateInvalid)
 	err := w.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
+		sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 	}
 }
 
@@ -289,7 +294,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
 	if err != nil {
 		// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
 		if errors.Is(errors.NoFreeWorkers, err) {
-			sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
+			sp.log.Error("no free workers in the pool, wait timeout exceed", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err))
 			return nil, errors.E(op, err)
 		}
 		// else if err not nil - return error
@@ -310,7 +315,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 		// wrap sync worker
 		sw := worker.From(w)
 
-		sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
+		sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
 		return sw, nil
 	}
 }
@@ -336,7 +341,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
 	// destroy the worker
 	err = sw.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+		sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 		return nil, err
 	}
 
@@ -363,7 +368,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
 
 	err = sw.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+		sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 		return nil, err
 	}
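For callers, the visible API change is that Initialize is renamed to NewStaticPool, the events bus wiring is gone, and a logger is now injected through the new WithLogger option; when no option is passed, the constructor falls back to zap.NewProduction(). The following is a minimal sketch of wiring this up, not code from this commit: the pipe.NewPipeFactory() transport, the worker script name, and the Config values are assumptions for illustration, while NewStaticPool, WithLogger, Command, and the Config fields referenced in the diff are taken from it.

package main

import (
	"context"
	"log"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pool"
	"github.com/spiral/roadrunner/v2/transport/pipe" // assumption: pipes transport from the same module
	"go.uber.org/zap"
)

func main() {
	// Inject a custom zap logger; without WithLogger the pool builds
	// zap.NewProduction() itself (see the nil check added in NewStaticPool).
	z, err := zap.NewDevelopment()
	if err != nil {
		log.Fatal(err)
	}

	// Command is func() *exec.Cmd per the diff; the PHP worker script
	// name here is a placeholder.
	cmd := func() *exec.Cmd {
		return exec.Command("php", "psr-worker.php")
	}

	p, err := pool.NewStaticPool(
		context.Background(),
		cmd,
		pipe.NewPipeFactory(),
		&pool.Config{
			NumWorkers:      4,
			AllocateTimeout: 5 * time.Second,
			DestroyTimeout:  5 * time.Second,
		},
		pool.WithLogger(z), // replaces the removed events bus subscription
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy(context.Background())
}

The design trade-off in the diff itself: where the old code serialized everything into events.NewEvent payload strings via fmt.Sprintf, the zap calls carry pid, reason, and internal_event_name as structured fields, so log consumers can filter on them without string parsing.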