diff options
-rw-r--r-- | service/http/service.go | 30 | ||||
-rw-r--r-- | static_pool.go | 8 |
2 files changed, 22 insertions, 16 deletions
diff --git a/service/http/service.go b/service/http/service.go index ecce1c15..94524897 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -24,20 +24,20 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc // Service manages rr, http servers. type Service struct { - cfg *Config - env env.Environment - lsns []func(event int, ctx interface{}) - mdws []middleware - mu sync.Mutex - rr *roadrunner.Server - stopping int32 - srv *Handler - http *http.Server + cfg *Config + env env.Environment + lsns []func(event int, ctx interface{}) + middleware []middleware + mu sync.Mutex + rr *roadrunner.Server + stopping int32 + srv *Handler + http *http.Server } // AddMiddleware adds new net/http middleware. func (s *Service) AddMiddleware(m middleware) { - s.mdws = append(s.mdws, m) + s.middleware = append(s.middleware, m) } // AddListener attaches server event watcher. @@ -87,7 +87,7 @@ func (s *Service) Serve() error { s.rr.Listen(s.listener) s.srv.Listen(s.listener) - if len(s.mdws) == 0 { + if len(s.middleware) == 0 { s.http.Handler = s.srv } else { s.http.Handler = s @@ -97,7 +97,7 @@ func (s *Service) Serve() error { if err := rr.Start(); err != nil { return err } - defer s.rr.Stop() + defer rr.Stop() return s.http.ListenAndServe() } @@ -118,13 +118,13 @@ func (s *Service) Stop() { s.http.Shutdown(context.Background()) } -// middleware handles connection using set of mdws and rr PSR-7 server. +// middleware handles connection using set of middleware and rr PSR-7 server. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = attributes.Init(r) - // chaining middlewares + // chaining middleware f := s.srv.ServeHTTP - for _, m := range s.mdws { + for _, m := range s.middleware { f = m(f) } f(w, r) diff --git a/static_pool.go b/static_pool.go index 95d2fe14..b6e43ddc 100644 --- a/static_pool.go +++ b/static_pool.go @@ -42,6 +42,7 @@ type StaticPool struct { // pool is being destroyed inDestroy int32 + destroy chan interface{} // lsn is optional callback to handle worker create/destruct/error events. mul sync.Mutex @@ -60,6 +61,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er factory: factory, workers: make([]*Worker, 0, cfg.NumWorkers), free: make(chan *Worker, cfg.NumWorkers), + destroy: make(chan interface{}), } // constant number of workers simplify logic @@ -144,7 +146,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { // Destroy all underlying workers (but let them to complete the task). func (p *StaticPool) Destroy() { atomic.AddInt32(&p.inDestroy, 1) - + close(p.destroy) p.tasks.Wait() var wg sync.WaitGroup @@ -173,6 +175,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } return w, nil + case <-p.destroy: + return nil, fmt.Errorf("pool has been stopped") default: // enable timeout handler } @@ -189,6 +193,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } return w, nil + case <-p.destroy: + return nil, fmt.Errorf("pool has been stopped") } } |