summaryrefslogtreecommitdiff
path: root/plugins/http/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r--plugins/http/plugin.go165
1 files changed, 96 insertions, 69 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index e58f9359..78179242 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -33,36 +33,37 @@ const (
EventInitSSL = 750
)
-// http middleware type.
-type middleware func(f http.HandlerFunc) http.HandlerFunc
+// Middleware interface
+type Middleware interface {
+ Middleware(f http.Handler) http.HandlerFunc
+}
// Service manages pool, http servers.
type Plugin struct {
sync.Mutex
- sync.WaitGroup
- cfg *Config
configurer config.Configurer
+ server factory.Server
log log.Logger
- mdwr []middleware
+ cfg *Config
+ // middlewares to chain
+ mdwr []Middleware
+ // Event listener to stdout
listener util.EventListener
- pool roadrunner.Pool
- server factory.Server
+ // Pool which attached to all servers
+ pool roadrunner.Pool
- handler Handler
+ // servers RR handler
+ handler Handle
+ // servers
http *http.Server
https *http.Server
fcgi *http.Server
}
-// AddMiddleware adds new net/http mdwr.
-func (s *Plugin) AddMiddleware(m middleware) {
- s.mdwr = append(s.mdwr, m)
-}
-
// AddListener attaches server event controller.
func (s *Plugin) AddListener(listener util.EventListener) {
// save listeners for Reset
@@ -120,6 +121,8 @@ func (s *Plugin) logCallback(event interface{}) {
s.log.Info("response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
case ErrorEvent:
s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error)
+ case roadrunner.WorkerEvent:
+ s.log.Info("worker event received", "event", ev.Event, "worker state", ev.Worker.State())
default:
fmt.Println(event)
}
@@ -213,6 +216,10 @@ func (s *Plugin) Serve() chan error {
}()
}
+ if len(s.mdwr) > 0 {
+ s.addMiddlewares()
+ }
+
return errCh
}
@@ -270,17 +277,73 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = attributes.Init(r)
// protect the case, when user send Reset and we are replacing handler with pool
s.Lock()
- f := s.handler.ServeHTTP
+ s.handler.ServeHTTP(w, r)
s.Unlock()
+}
- // chaining middleware
- if len(s.mdwr) > 0 {
- for i := 0; i < len(s.mdwr); i++ {
- f = s.mdwr[i](f)
- }
+// Server returns associated pool workers
+func (s *Plugin) Workers() []roadrunner.WorkerBase {
+ return s.pool.Workers()
+}
+
+func (s *Plugin) Name() string {
+ return ServiceName
+}
+
+func (s *Plugin) Reset() error {
+ s.Lock()
+ defer s.Unlock()
+ const op = errors.Op("http reset")
+ s.log.Info("Resetting http plugin")
+ s.pool.Destroy(context.Background())
+
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+ var err error
+
+ // re-read the config
+ err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
+ Supervisor: s.cfg.Pool.Supervisor,
+ }, env)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.handler, err = NewHandler(
+ s.cfg.MaxRequestSize,
+ *s.cfg.Uploads,
+ s.cfg.cidrs,
+ s.pool,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // restore original listeners
+ s.pool.AddListener(s.listener)
+
+ return nil
+}
+
+func (s *Plugin) Collects() []interface{} {
+ return []interface{}{
+ s.AddMiddleware,
}
+}
- f(w, r)
+func (s *Plugin) AddMiddleware(m Middleware) {
+ s.mdwr = append(s.mdwr, m)
}
func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) bool {
@@ -442,57 +505,21 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string {
return host
}
-// Server returns associated pool workers
-func (s *Plugin) Workers() []roadrunner.WorkerBase {
- return s.pool.Workers()
-}
-
-func (s *Plugin) Name() string {
- return ServiceName
-}
-
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
- const op = errors.Op("http reset")
- s.log.Info("Resetting http plugin")
- s.pool.Destroy(context.Background())
-
- // Set needed env vars
- env := make(map[string]string)
- env["RR_HTTP"] = "true"
- var err error
-
- // re-read the config
- err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
- if err != nil {
- return errors.E(op, err)
+func (s *Plugin) addMiddlewares() {
+ if s.http != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.http.Handler = s.mdwr[i].Middleware(s.http.Handler)
+ }
}
-
- s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, env)
- if err != nil {
- return errors.E(op, err)
+ if s.https != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.https.Handler = s.mdwr[i].Middleware(s.https.Handler)
+ }
}
- s.handler, err = NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.cidrs,
- s.pool,
- )
- if err != nil {
- return errors.E(op, err)
+ if s.fcgi != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.fcgi.Handler = s.mdwr[i].Middleware(s.fcgi.Handler)
+ }
}
-
- // restore original listeners
- s.pool.AddListener(s.listener)
-
- return nil
}