summaryrefslogtreecommitdiff
path: root/plugins/http
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/http')
-rw-r--r--plugins/http/handler.go4
-rw-r--r--plugins/http/plugin.go165
-rw-r--r--plugins/http/response.go2
-rw-r--r--plugins/http/tests/configs/.rr-echoErr.yaml28
-rw-r--r--plugins/http/tests/configs/.rr-http.yaml2
-rw-r--r--plugins/http/tests/http_test.go192
-rw-r--r--plugins/http/tests/plugin_middleware.go61
-rw-r--r--plugins/http/tests/plugin_test_old.go122
8 files changed, 381 insertions, 195 deletions
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index f770a401..74b038ff 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -23,7 +23,7 @@ const (
EventError
)
-type Handler interface {
+type Handle interface {
AddListener(l util.EventListener)
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
@@ -76,7 +76,7 @@ type handler struct {
lsn util.EventListener
}
-func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handler, error) {
+func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handle, error) {
if pool == nil {
return nil, errors.E(errors.Str("pool should be initialized"))
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index e58f9359..78179242 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -33,36 +33,37 @@ const (
EventInitSSL = 750
)
-// http middleware type.
-type middleware func(f http.HandlerFunc) http.HandlerFunc
+// Middleware interface
+type Middleware interface {
+ Middleware(f http.Handler) http.HandlerFunc
+}
// Service manages pool, http servers.
type Plugin struct {
sync.Mutex
- sync.WaitGroup
- cfg *Config
configurer config.Configurer
+ server factory.Server
log log.Logger
- mdwr []middleware
+ cfg *Config
+ // middlewares to chain
+ mdwr []Middleware
+ // Event listener to stdout
listener util.EventListener
- pool roadrunner.Pool
- server factory.Server
+ // Pool which attached to all servers
+ pool roadrunner.Pool
- handler Handler
+ // servers RR handler
+ handler Handle
+ // servers
http *http.Server
https *http.Server
fcgi *http.Server
}
-// AddMiddleware adds new net/http mdwr.
-func (s *Plugin) AddMiddleware(m middleware) {
- s.mdwr = append(s.mdwr, m)
-}
-
// AddListener attaches server event controller.
func (s *Plugin) AddListener(listener util.EventListener) {
// save listeners for Reset
@@ -120,6 +121,8 @@ func (s *Plugin) logCallback(event interface{}) {
s.log.Info("response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
case ErrorEvent:
s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error)
+ case roadrunner.WorkerEvent:
+ s.log.Info("worker event received", "event", ev.Event, "worker state", ev.Worker.State())
default:
fmt.Println(event)
}
@@ -213,6 +216,10 @@ func (s *Plugin) Serve() chan error {
}()
}
+ if len(s.mdwr) > 0 {
+ s.addMiddlewares()
+ }
+
return errCh
}
@@ -270,17 +277,73 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = attributes.Init(r)
// protect the case, when user send Reset and we are replacing handler with pool
s.Lock()
- f := s.handler.ServeHTTP
+ s.handler.ServeHTTP(w, r)
s.Unlock()
+}
- // chaining middleware
- if len(s.mdwr) > 0 {
- for i := 0; i < len(s.mdwr); i++ {
- f = s.mdwr[i](f)
- }
+// Server returns associated pool workers
+func (s *Plugin) Workers() []roadrunner.WorkerBase {
+ return s.pool.Workers()
+}
+
+func (s *Plugin) Name() string {
+ return ServiceName
+}
+
+func (s *Plugin) Reset() error {
+ s.Lock()
+ defer s.Unlock()
+ const op = errors.Op("http reset")
+ s.log.Info("Resetting http plugin")
+ s.pool.Destroy(context.Background())
+
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+ var err error
+
+ // re-read the config
+ err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
+ Supervisor: s.cfg.Pool.Supervisor,
+ }, env)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.handler, err = NewHandler(
+ s.cfg.MaxRequestSize,
+ *s.cfg.Uploads,
+ s.cfg.cidrs,
+ s.pool,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // restore original listeners
+ s.pool.AddListener(s.listener)
+
+ return nil
+}
+
+func (s *Plugin) Collects() []interface{} {
+ return []interface{}{
+ s.AddMiddleware,
}
+}
- f(w, r)
+func (s *Plugin) AddMiddleware(m Middleware) {
+ s.mdwr = append(s.mdwr, m)
}
func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) bool {
@@ -442,57 +505,21 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string {
return host
}
-// Server returns associated pool workers
-func (s *Plugin) Workers() []roadrunner.WorkerBase {
- return s.pool.Workers()
-}
-
-func (s *Plugin) Name() string {
- return ServiceName
-}
-
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
- const op = errors.Op("http reset")
- s.log.Info("Resetting http plugin")
- s.pool.Destroy(context.Background())
-
- // Set needed env vars
- env := make(map[string]string)
- env["RR_HTTP"] = "true"
- var err error
-
- // re-read the config
- err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
- if err != nil {
- return errors.E(op, err)
+func (s *Plugin) addMiddlewares() {
+ if s.http != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.http.Handler = s.mdwr[i].Middleware(s.http.Handler)
+ }
}
-
- s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, env)
- if err != nil {
- return errors.E(op, err)
+ if s.https != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.https.Handler = s.mdwr[i].Middleware(s.https.Handler)
+ }
}
- s.handler, err = NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.cidrs,
- s.pool,
- )
- if err != nil {
- return errors.E(op, err)
+ if s.fcgi != nil {
+ for i := 0; i < len(s.mdwr); i++ {
+ s.fcgi.Handler = s.mdwr[i].Middleware(s.fcgi.Handler)
+ }
}
-
- // restore original listeners
- s.pool.AddListener(s.listener)
-
- return nil
}
diff --git a/plugins/http/response.go b/plugins/http/response.go
index b0023b59..e3ac2756 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -4,6 +4,7 @@ import (
"io"
"net/http"
"strings"
+ "sync"
"github.com/spiral/roadrunner/v2"
)
@@ -18,6 +19,7 @@ type Response struct {
// associated Body payload.
Body interface{}
+ sync.Mutex
}
// NewResponse creates new response based on given pool payload.
diff --git a/plugins/http/tests/configs/.rr-echoErr.yaml b/plugins/http/tests/configs/.rr-echoErr.yaml
new file mode 100644
index 00000000..5a97723d
--- /dev/null
+++ b/plugins/http/tests/configs/.rr-echoErr.yaml
@@ -0,0 +1,28 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+server:
+ command: "php ../../../tests/http/client.php echoerr pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+http:
+ debug: true
+ address: 127.0.0.1:8080
+ maxRequestSize: 1024
+ middleware: [ "" ]
+ uploads:
+ forbid: [ "" ]
+ trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ numWorkers: 1
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+
diff --git a/plugins/http/tests/configs/.rr-http.yaml b/plugins/http/tests/configs/.rr-http.yaml
index c989df24..efb3d4fb 100644
--- a/plugins/http/tests/configs/.rr-http.yaml
+++ b/plugins/http/tests/configs/.rr-http.yaml
@@ -3,7 +3,7 @@ rpc:
disabled: false
server:
- command: "php psr-worker.php"
+ command: "php ../../../tests/http/client.php echo pipes"
user: ""
group: ""
env:
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 9ea474a3..bd329758 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -15,9 +15,11 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
"github.com/spiral/endure"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/informer"
@@ -178,8 +180,8 @@ func echoHTTP(t *testing.T) {
assert.NoError(t, err)
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
- assert.Equal(t, "hello world", string(b))
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
err = r.Body.Close()
assert.NoError(t, err)
@@ -583,6 +585,7 @@ func TestFastCGI_RequestUri(t *testing.T) {
}
func fcgiReqURI(t *testing.T) {
+ time.Sleep(time.Second * 2)
fcgiConnFactory := gofast.SimpleConnFactory("tcp", "127.0.0.1:6921")
fcgiHandler := gofast.NewHandler(
@@ -773,6 +776,191 @@ func h2c(t *testing.T) {
}
}
+func TestHttpMiddleware(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, ""))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-http.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ &PluginMiddleware{},
+ &PluginMiddleware2{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ go func() {
+ tt := time.NewTimer(time.Second * 10)
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ t.Run("MiddlewareTest", middleware)
+ wg.Wait()
+}
+
+func middleware(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://localhost:8084?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err = r.Body.Close()
+ assert.NoError(t, err)
+
+ req, err = http.NewRequest("GET", "http://localhost:8084/halt", nil)
+ assert.NoError(t, err)
+
+ r, err = http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ b, err = ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.Equal(t, 500, r.StatusCode)
+ assert.Equal(t, "halted", string(b))
+
+ err = r.Body.Close()
+ assert.NoError(t, err)
+}
+
+func TestHttpEchoErr(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, ""))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-echoErr.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Info("response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1")
+ mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any())
+ mockLogger.EXPECT().Info("worker event received", "event", roadrunner.EventWorkerLog, "worker state", gomock.Any())
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ &PluginMiddleware{},
+ &PluginMiddleware2{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ go func() {
+ tt := time.NewTimer(time.Second * 5)
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ t.Run("HttpEchoError", echoError)
+ wg.Wait()
+}
+
+func echoError(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://localhost:8080?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+ err = r.Body.Close()
+ assert.NoError(t, err)
+}
+
func get(url string) (string, *http.Response, error) {
r, err := http.Get(url)
if err != nil {
diff --git a/plugins/http/tests/plugin_middleware.go b/plugins/http/tests/plugin_middleware.go
new file mode 100644
index 00000000..de829d34
--- /dev/null
+++ b/plugins/http/tests/plugin_middleware.go
@@ -0,0 +1,61 @@
+package tests
+
+import (
+ "net/http"
+
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type PluginMiddleware struct {
+ config config.Configurer
+}
+
+func (p *PluginMiddleware) Init(cfg config.Configurer) error {
+ p.config = cfg
+ return nil
+}
+
+func (p *PluginMiddleware) Middleware(next http.Handler) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path == "/halt" {
+ w.WriteHeader(500)
+ _, err := w.Write([]byte("halted"))
+ if err != nil {
+ panic("error writing the data to the http reply")
+ }
+ } else {
+ next.ServeHTTP(w, r)
+ }
+ }
+}
+
+func (p *PluginMiddleware) Name() string {
+ return "pluginMiddleware"
+}
+
+type PluginMiddleware2 struct {
+ config config.Configurer
+}
+
+func (p *PluginMiddleware2) Init(cfg config.Configurer) error {
+ p.config = cfg
+ return nil
+}
+
+func (p *PluginMiddleware2) Middleware(next http.Handler) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path == "/boom" {
+ w.WriteHeader(555)
+ _, err := w.Write([]byte("boom"))
+ if err != nil {
+ panic("error writing the data to the http reply")
+ }
+ } else {
+ next.ServeHTTP(w, r)
+ }
+ }
+}
+
+func (p *PluginMiddleware2) Name() string {
+ return "pluginMiddleware2"
+}
diff --git a/plugins/http/tests/plugin_test_old.go b/plugins/http/tests/plugin_test_old.go
index 1ef7002e..dae18bb4 100644
--- a/plugins/http/tests/plugin_test_old.go
+++ b/plugins/http/tests/plugin_test_old.go
@@ -348,127 +348,7 @@ package tests
// }
//}
//
-//func Test_Service_Middleware(t *testing.T) {
-// bkoff := backoff.NewExponentialBackOff()
-// bkoff.MaxElapsedTime = time.Second * 15
-//
-// err := backoff.Retry(func() error {
-// logger, _ := test.NewNullLogger()
-// logger.SetLevel(logrus.DebugLevel)
-//
-// c := service.NewContainer(logger)
-// c.Register(ID, &Service{})
-//
-// err := c.Init(&testCfg{httpCfg: `{
-// "enable": true,
-// "address": ":6032",
-// "maxRequestSize": 1024,
-// "uploads": {
-// "dir": ` + tmpDir() + `,
-// "forbid": []
-// },
-// "workers":{
-// "command": "php ../../tests/http/client.php echo pipes",
-// "relay": "pipes",
-// "pool": {
-// "numWorkers": 1,
-// "allocateTimeout": 10000000,
-// "destroyTimeout": 10000000
-// }
-// }
-// }`})
-// if err != nil {
-// return err
-// }
-//
-// s, st := c.Get(ID)
-// assert.NotNil(t, s)
-// assert.Equal(t, service.StatusOK, st)
-//
-// s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
-// return func(w http.ResponseWriter, r *http.Request) {
-// if r.URL.Path == "/halt" {
-// w.WriteHeader(500)
-// _, err := w.Write([]byte("halted"))
-// if err != nil {
-// t.Errorf("error writing the data to the http reply: error %v", err)
-// }
-// } else {
-// f(w, r)
-// }
-// }
-// })
-//
-// go func() {
-// err := c.Serve()
-// if err != nil {
-// t.Errorf("serve error: %v", err)
-// }
-// }()
-// time.Sleep(time.Millisecond * 500)
-//
-// req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// r, err := http.DefaultClient.Do(req)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// b, err := ioutil.ReadAll(r.Body)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// assert.Equal(t, 201, r.StatusCode)
-// assert.Equal(t, "WORLD", string(b))
-//
-// err = r.Body.Close()
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// r, err = http.DefaultClient.Do(req)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-// b, err = ioutil.ReadAll(r.Body)
-// if err != nil {
-// c.Stop()
-// return err
-// }
-//
-// assert.Equal(t, 500, r.StatusCode)
-// assert.Equal(t, "halted", string(b))
-//
-// err = r.Body.Close()
-// if err != nil {
-// c.Stop()
-// return err
-// }
-// c.Stop()
-//
-// return nil
-// }, bkoff)
-//
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-//}
+
//
//func Test_Service_Listener(t *testing.T) {
// bkoff := backoff.NewExponentialBackOff()