diff options
-rw-r--r-- | server.go | 6 | ||||
-rw-r--r-- | server_test.go | 3 | ||||
-rw-r--r-- | service/container.go | 1 | ||||
-rw-r--r-- | service/http/rpc.go | 7 | ||||
-rw-r--r-- | service/http/service.go | 8 | ||||
-rw-r--r-- | static_pool.go | 35 | ||||
-rw-r--r-- | static_pool_test.go | 2 |
7 files changed, 33 insertions, 29 deletions
@@ -75,12 +75,12 @@ func (srv *Server) Start() (err error) { } // Stop underlying worker pool and close the factory. -func (srv *Server) Stop() error { +func (srv *Server) Stop() { srv.mu.Lock() defer srv.mu.Unlock() if !srv.started { - return nil + return } srv.throw(EventPoolDestruct, srv.pool) @@ -91,8 +91,6 @@ func (srv *Server) Stop() error { srv.pool = nil srv.started = false srv.throw(EventServerStop, srv) - - return nil } // Exec one task with given payload and context, returns result or error. diff --git a/server_test.go b/server_test.go index 2e92aad8..0ae06713 100644 --- a/server_test.go +++ b/server_test.go @@ -107,7 +107,8 @@ func TestServer_Stop_NotStarted(t *testing.T) { DestroyTimeout: time.Second, }, }) - assert.NoError(t, srv.Stop()) + + srv.Stop() assert.Nil(t, srv.Workers()) } diff --git a/service/container.go b/service/container.go index 1233b30d..56350be7 100644 --- a/service/container.go +++ b/service/container.go @@ -140,6 +140,7 @@ func (c *container) Serve() error { defer e.setStatus(StatusStopped) if err := e.svc.Serve(); err != nil { + c.log.Errorf("[%s]: %s", e.name, err) done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) } }(e) diff --git a/service/http/rpc.go b/service/http/rpc.go index fc6aa65b..aebc5903 100644 --- a/service/http/rpc.go +++ b/service/http/rpc.go @@ -34,12 +34,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error { } *r = "OK" - - for _, w := range rpc.svc.rr.Workers() { - w.Kill() - } - - return nil //rpc.svc.rr.Reset() + return rpc.svc.rr.Reset() } // Workers returns list of active workers and their stats. diff --git a/service/http/service.go b/service/http/service.go index a3773468..dc89c188 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -19,9 +19,9 @@ type middleware interface { // Service manages rr, http servers. type Service struct { cfg *Config - listeners []func(event int, ctx interface{}) - middleware []middleware + lsns []func(event int, ctx interface{}) rr *roadrunner.Server + middleware []middleware srv *Handler http *http.Server } @@ -32,7 +32,7 @@ func (s *Service) AddMiddleware(m middleware) { // AddListener attaches server event watcher. func (s *Service) AddListener(l func(event int, ctx interface{})) { - s.listeners = append(s.listeners, l) + s.lsns = append(s.lsns, l) } // Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of @@ -113,7 +113,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (s *Service) listener(event int, ctx interface{}) { - for _, l := range s.listeners { + for _, l := range s.lsns { l(event, ctx) } diff --git a/static_pool.go b/static_pool.go index a972b04a..18bfaf60 100644 --- a/static_pool.go +++ b/static_pool.go @@ -150,20 +150,29 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { - // this loop is required to skip issues with dead workers still being in a ring. - select { - case w = <-p.free: - return w, nil - default: - // enable timeout handler - } + for i := int64(0); i <= p.cfg.NumWorkers; i++ { + // this loop is required to skip issues with dead workers still being in a ring. + select { + case w = <-p.free: + if w.State().Value() != StateReady { + continue + } + + return w, nil + default: + // enable timeout handler + } - timeout := time.NewTimer(p.cfg.AllocateTimeout) - select { - case <-timeout.C: - return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) - case w = <-p.free: - timeout.Stop() + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) + case w = <-p.free: + timeout.Stop() + if w.State().Value() != StateReady { + continue + } + } } return w, nil diff --git a/static_pool_test.go b/static_pool_test.go index ec468389..548fb126 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -186,7 +186,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { destructed := make(chan interface{}) p.Listen(func(e int, ctx interface{}) { if e == EventWorkerConstruct { - close(destructed) + destructed <- nil } }) |