summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--service/http/service.go30
-rw-r--r--static_pool.go8
2 files changed, 22 insertions, 16 deletions
diff --git a/service/http/service.go b/service/http/service.go
index ecce1c15..94524897 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -24,20 +24,20 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc
// Service manages rr, http servers.
type Service struct {
- cfg *Config
- env env.Environment
- lsns []func(event int, ctx interface{})
- mdws []middleware
- mu sync.Mutex
- rr *roadrunner.Server
- stopping int32
- srv *Handler
- http *http.Server
+ cfg *Config
+ env env.Environment
+ lsns []func(event int, ctx interface{})
+ middleware []middleware
+ mu sync.Mutex
+ rr *roadrunner.Server
+ stopping int32
+ srv *Handler
+ http *http.Server
}
// AddMiddleware adds new net/http middleware.
func (s *Service) AddMiddleware(m middleware) {
- s.mdws = append(s.mdws, m)
+ s.middleware = append(s.middleware, m)
}
// AddListener attaches server event watcher.
@@ -87,7 +87,7 @@ func (s *Service) Serve() error {
s.rr.Listen(s.listener)
s.srv.Listen(s.listener)
- if len(s.mdws) == 0 {
+ if len(s.middleware) == 0 {
s.http.Handler = s.srv
} else {
s.http.Handler = s
@@ -97,7 +97,7 @@ func (s *Service) Serve() error {
if err := rr.Start(); err != nil {
return err
}
- defer s.rr.Stop()
+ defer rr.Stop()
return s.http.ListenAndServe()
}
@@ -118,13 +118,13 @@ func (s *Service) Stop() {
s.http.Shutdown(context.Background())
}
-// middleware handles connection using set of mdws and rr PSR-7 server.
+// middleware handles connection using set of middleware and rr PSR-7 server.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = attributes.Init(r)
- // chaining middlewares
+ // chaining middleware
f := s.srv.ServeHTTP
- for _, m := range s.mdws {
+ for _, m := range s.middleware {
f = m(f)
}
f(w, r)
diff --git a/static_pool.go b/static_pool.go
index 95d2fe14..b6e43ddc 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,6 +42,7 @@ type StaticPool struct {
// pool is being destroyed
inDestroy int32
+ destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
mul sync.Mutex
@@ -60,6 +61,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
free: make(chan *Worker, cfg.NumWorkers),
+ destroy: make(chan interface{}),
}
// constant number of workers simplify logic
@@ -144,7 +146,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
// Destroy all underlying workers (but let them to complete the task).
func (p *StaticPool) Destroy() {
atomic.AddInt32(&p.inDestroy, 1)
-
+ close(p.destroy)
p.tasks.Wait()
var wg sync.WaitGroup
@@ -173,6 +175,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
default:
// enable timeout handler
}
@@ -189,6 +193,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
}
}