summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/.rr.yaml4
-rw-r--r--debug/service.go22
-rw-r--r--static_pool.go37
3 files changed, 35 insertions, 28 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index 9ee75856..33bfd14a 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -39,10 +39,10 @@ http:
maxJobs: 0
# for how long worker is allowed to be bootstrapped.
- allocateTimeout: 6000000
+ allocateTimeout: 60000000
# amount of time given to worker to gracefully destruct itself.
- destroyTimeout: 6000000
+ destroyTimeout: 60000000
# static file serving.
static:
diff --git a/debug/service.go b/debug/service.go
index 5838b75d..3664e91a 100644
--- a/debug/service.go
+++ b/debug/service.go
@@ -51,14 +51,14 @@ func (s *Service) listener(event int, ctx interface{}) {
switch event {
case http.EventResponse:
log := ctx.(*http.Log)
- s.Logger.Print(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
case http.EventError:
log := ctx.(*http.Log)
if _, ok := log.Error.(roadrunner.JobError); ok {
- s.Logger.Print(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
} else {
- s.Logger.Print(utils.Sprintf(
+ s.Logger.Info(utils.Sprintf(
"%s <white+hb>%s</reset> %s <red>%s</reset>",
statusColor(log.Status),
log.Method,
@@ -71,15 +71,15 @@ func (s *Service) listener(event int, ctx interface{}) {
switch event {
case roadrunner.EventWorkerKill:
w := ctx.(*roadrunner.Worker)
- s.Logger.Print(utils.Sprintf(
- "<white+hb>worker.%v</reset> <red>killed</red>",
+ s.Logger.Warning(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>killed</red>",
*w.Pid,
))
case roadrunner.EventWorkerError:
err := ctx.(roadrunner.WorkerError)
- s.Logger.Print(utils.Sprintf(
- "<white+hb>worker.%v</reset> <red></reset>",
+ s.Logger.Error(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
*err.Worker.Pid,
err.Caused,
))
@@ -88,17 +88,15 @@ func (s *Service) listener(event int, ctx interface{}) {
// rr server events
switch event {
case roadrunner.EventServerFailure:
- s.Logger.Print(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx))
+ s.Logger.Error(utils.Sprintf("<red+hb>http.rr</reset>: <red+hb>server is dead</reset>"))
}
// pool events
switch event {
case roadrunner.EventPoolConstruct:
- s.Logger.Print(utils.Sprintf("<white+hb>http.rr</reset>: <green>worker pool constructed</reset>"))
- case roadrunner.EventPoolDestruct:
- s.Logger.Print(utils.Sprintf("<white+hb>http.rr</reset>: <yellow>worker pool destructed</reset>"))
+ s.Logger.Debug(utils.Sprintf("<white+hb>http.rr</reset>: <green>new worker pool</reset>"))
case roadrunner.EventPoolError:
- s.Logger.Print(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx))
+ s.Logger.Error(utils.Sprintf("<red+hb>http.rr</reset>: <red>%s</reset>", ctx))
}
}
diff --git a/static_pool.go b/static_pool.go
index 0ae345e5..9f4aab23 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -142,21 +142,30 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
+ // this loop is required to skip issues with dead workers still being in a ring.
+ for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ select {
+ case w = <-p.free:
+ if w.state.Value() == StateReady {
+ return w, nil
+ }
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w := <-p.free:
+ timeout.Stop()
+ if w.state.Value() == StateReady {
+ return w, nil
+ }
+ }
}
+
+ return w, nil
}
// release releases or replaces the worker.
@@ -209,7 +218,7 @@ func (p *StaticPool) createWorker() (*Worker, error) {
// attempting to replace worker
if err := p.replaceWorker(w, err); err != nil {
- p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err))
+ p.throw(EventPoolError, fmt.Errorf("unable to replace: %s", err))
}
}
}(w)