summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server.go6
-rw-r--r--server_test.go3
-rw-r--r--service/container.go1
-rw-r--r--service/http/rpc.go7
-rw-r--r--service/http/service.go8
-rw-r--r--static_pool.go35
-rw-r--r--static_pool_test.go2
7 files changed, 33 insertions, 29 deletions
diff --git a/server.go b/server.go
index 75cf3691..efdeaca2 100644
--- a/server.go
+++ b/server.go
@@ -75,12 +75,12 @@ func (srv *Server) Start() (err error) {
}
// Stop underlying worker pool and close the factory.
-func (srv *Server) Stop() error {
+func (srv *Server) Stop() {
srv.mu.Lock()
defer srv.mu.Unlock()
if !srv.started {
- return nil
+ return
}
srv.throw(EventPoolDestruct, srv.pool)
@@ -91,8 +91,6 @@ func (srv *Server) Stop() error {
srv.pool = nil
srv.started = false
srv.throw(EventServerStop, srv)
-
- return nil
}
// Exec one task with given payload and context, returns result or error.
diff --git a/server_test.go b/server_test.go
index 2e92aad8..0ae06713 100644
--- a/server_test.go
+++ b/server_test.go
@@ -107,7 +107,8 @@ func TestServer_Stop_NotStarted(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- assert.NoError(t, srv.Stop())
+
+ srv.Stop()
assert.Nil(t, srv.Workers())
}
diff --git a/service/container.go b/service/container.go
index 1233b30d..56350be7 100644
--- a/service/container.go
+++ b/service/container.go
@@ -140,6 +140,7 @@ func (c *container) Serve() error {
defer e.setStatus(StatusStopped)
if err := e.svc.Serve(); err != nil {
+ c.log.Errorf("[%s]: %s", e.name, err)
done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
}
}(e)
diff --git a/service/http/rpc.go b/service/http/rpc.go
index fc6aa65b..aebc5903 100644
--- a/service/http/rpc.go
+++ b/service/http/rpc.go
@@ -34,12 +34,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error {
}
*r = "OK"
-
- for _, w := range rpc.svc.rr.Workers() {
- w.Kill()
- }
-
- return nil //rpc.svc.rr.Reset()
+ return rpc.svc.rr.Reset()
}
// Workers returns list of active workers and their stats.
diff --git a/service/http/service.go b/service/http/service.go
index a3773468..dc89c188 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -19,9 +19,9 @@ type middleware interface {
// Service manages rr, http servers.
type Service struct {
cfg *Config
- listeners []func(event int, ctx interface{})
- middleware []middleware
+ lsns []func(event int, ctx interface{})
rr *roadrunner.Server
+ middleware []middleware
srv *Handler
http *http.Server
}
@@ -32,7 +32,7 @@ func (s *Service) AddMiddleware(m middleware) {
// AddListener attaches server event watcher.
func (s *Service) AddListener(l func(event int, ctx interface{})) {
- s.listeners = append(s.listeners, l)
+ s.lsns = append(s.lsns, l)
}
// Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of
@@ -113,7 +113,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (s *Service) listener(event int, ctx interface{}) {
- for _, l := range s.listeners {
+ for _, l := range s.lsns {
l(event, ctx)
}
diff --git a/static_pool.go b/static_pool.go
index a972b04a..18bfaf60 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -150,20 +150,29 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- // this loop is required to skip issues with dead workers still being in a ring.
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
+ for i := int64(0); i <= p.cfg.NumWorkers; i++ {
+ // this loop is required to skip issues with dead workers still being in a ring.
+ select {
+ case w = <-p.free:
+ if w.State().Value() != StateReady {
+ continue
+ }
+
+ return w, nil
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w = <-p.free:
- timeout.Stop()
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w = <-p.free:
+ timeout.Stop()
+ if w.State().Value() != StateReady {
+ continue
+ }
+ }
}
return w, nil
diff --git a/static_pool_test.go b/static_pool_test.go
index ec468389..548fb126 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -186,7 +186,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
destructed := make(chan interface{})
p.Listen(func(e int, ctx interface{}) {
if e == EventWorkerConstruct {
- close(destructed)
+ destructed <- nil
}
})