diff options
-rw-r--r-- | cmd/rr/.rr.yaml | 4 | ||||
-rw-r--r-- | debug/service.go | 22 | ||||
-rw-r--r-- | static_pool.go | 37 |
3 files changed, 35 insertions, 28 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index 9ee75856..33bfd14a 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -39,10 +39,10 @@ http: maxJobs: 0 # for how long worker is allowed to be bootstrapped. - allocateTimeout: 6000000 + allocateTimeout: 60000000 # amount of time given to worker to gracefully destruct itself. - destroyTimeout: 6000000 + destroyTimeout: 60000000 # static file serving. static: diff --git a/debug/service.go b/debug/service.go index 5838b75d..3664e91a 100644 --- a/debug/service.go +++ b/debug/service.go @@ -51,14 +51,14 @@ func (s *Service) listener(event int, ctx interface{}) { switch event { case http.EventResponse: log := ctx.(*http.Log) - s.Logger.Print(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) case http.EventError: log := ctx.(*http.Log) if _, ok := log.Error.(roadrunner.JobError); ok { - s.Logger.Print(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) } else { - s.Logger.Print(utils.Sprintf( + s.Logger.Info(utils.Sprintf( "%s <white+hb>%s</reset> %s <red>%s</reset>", statusColor(log.Status), log.Method, @@ -71,15 +71,15 @@ func (s *Service) listener(event int, ctx interface{}) { switch event { case roadrunner.EventWorkerKill: w := ctx.(*roadrunner.Worker) - s.Logger.Print(utils.Sprintf( - "<white+hb>worker.%v</reset> <red>killed</red>", + s.Logger.Warning(utils.Sprintf( + "<white+hb>worker.%v</reset> <yellow>killed</red>", *w.Pid, )) case roadrunner.EventWorkerError: err := ctx.(roadrunner.WorkerError) - s.Logger.Print(utils.Sprintf( - "<white+hb>worker.%v</reset> <red></reset>", + s.Logger.Error(utils.Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", *err.Worker.Pid, err.Caused, )) @@ -88,17 +88,15 @@ func (s *Service) listener(event int, ctx interface{}) { // rr server events switch event { case roadrunner.EventServerFailure: - s.Logger.Print(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx)) + s.Logger.Error(utils.Sprintf("<red+hb>http.rr</reset>: <red+hb>server is dead</reset>")) } // pool events switch event { case roadrunner.EventPoolConstruct: - s.Logger.Print(utils.Sprintf("<white+hb>http.rr</reset>: <green>worker pool constructed</reset>")) - case roadrunner.EventPoolDestruct: - s.Logger.Print(utils.Sprintf("<white+hb>http.rr</reset>: <yellow>worker pool destructed</reset>")) + s.Logger.Debug(utils.Sprintf("<white+hb>http.rr</reset>: <green>new worker pool</reset>")) case roadrunner.EventPoolError: - s.Logger.Print(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx)) + s.Logger.Error(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx)) } } diff --git a/static_pool.go b/static_pool.go index 0ae345e5..9f4aab23 100644 --- a/static_pool.go +++ b/static_pool.go @@ -142,21 +142,30 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { - select { - case w = <-p.free: - return w, nil - default: - // enable timeout handler - } + // this loop is required to skip issues with dead workers still being in a ring. + for i := uint64(0); i < p.cfg.NumWorkers; i++ { + select { + case w = <-p.free: + if w.state.Value() == StateReady { + return w, nil + } + default: + // enable timeout handler + } - timeout := time.NewTimer(p.cfg.AllocateTimeout) - select { - case <-timeout.C: - return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) - case w := <-p.free: - timeout.Stop() - return w, nil + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) + case w := <-p.free: + timeout.Stop() + if w.state.Value() == StateReady { + return w, nil + } + } } + + return w, nil } // release releases or replaces the worker. @@ -209,7 +218,7 @@ func (p *StaticPool) createWorker() (*Worker, error) { // attempting to replace worker if err := p.replaceWorker(w, err); err != nil { - p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err)) + p.throw(EventPoolError, fmt.Errorf("unable to replace: %s", err)) } } }(w) |