summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--_____/http/server.go87
-rw-r--r--payload.go2
-rw-r--r--pool.go4
-rw-r--r--server.go22
-rw-r--r--server_test.go4
-rw-r--r--service/http/config.go2
-rw-r--r--service/http/handler.go71
-rw-r--r--service/http/parse.go2
-rw-r--r--service/http/request.go10
-rw-r--r--service/http/server.go84
-rw-r--r--service/http/service.go31
-rw-r--r--service/http/uploads.go4
-rw-r--r--service/http/uploads_config.go (renamed from service/http/fs_config.go)6
-rw-r--r--service/http/uploads_config_test.go (renamed from service/http/fs_config_test.go)2
-rw-r--r--static_pool.go14
-rw-r--r--static_pool_test.go4
16 files changed, 146 insertions, 203 deletions
diff --git a/_____/http/server.go b/_____/http/server.go
deleted file mode 100644
index db1f22ef..00000000
--- a/_____/http/server.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package http
-
-import (
- "errors"
- "github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner"
- "net/http"
- "strconv"
-)
-
-// service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
-// parsed files and query, payload will include parsed form dataTree (if any).
-type Server struct {
- cfg *Config
- static *staticServer
- rr *roadrunner.Server
-}
-
-// NewServer returns new instance of HTTP PSR7 server.
-func NewServer(cfg *Config, server *roadrunner.Server) *Server {
- h := &Server{cfg: cfg, rr: server}
-
- if cfg.ServeStatic {
- h.static = &staticServer{root: http.Dir(h.cfg.Root)}
- }
-
- return h
-}
-
-// ServeHTTP serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- if srv.cfg.ServeStatic && srv.static.serve(w, r) {
- return
- }
-
- // validating request size
- if srv.cfg.MaxRequest != 0 {
- if length := r.Header.Get("content-length"); length != "" {
- if size, err := strconv.ParseInt(length, 10, 64); err != nil {
- srv.sendError(w, r, err)
- return
- } else if size > srv.cfg.MaxRequest {
- srv.sendError(w, r, errors.New("request body max size is exceeded"))
- return
- }
- }
- }
-
- req, err := NewRequest(r)
- if err != nil {
- srv.sendError(w, r, err)
- return
- }
-
- if err = req.Open(srv.cfg); err != nil {
- srv.sendError(w, r, err)
- return
- }
- defer req.Close()
-
- p, err := req.Payload()
- if err != nil {
- srv.sendError(w, r, err)
- return
- }
-
- rsp, err := srv.rr.Exec(p)
- if err != nil {
- srv.sendError(w, r, err)
- return
- }
-
- resp, err := NewResponse(rsp)
- if err != nil {
- srv.sendError(w, r, err)
- return
- }
-
- resp.Write(w)
-}
-
-// sendError sends error
-func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) {
- logrus.Errorf("http: %s", err)
- w.WriteHeader(500)
- w.Write([]byte(err.Error()))
-}
diff --git a/payload.go b/payload.go
index 87472fa1..4b5003e4 100644
--- a/payload.go
+++ b/payload.go
@@ -9,7 +9,7 @@ type Payload struct {
// body contains binary payload to be processed by worker.
Body []byte
- // todo: io.Reader support for streamed requests and responses.
+ // add io.Reader support for streamed requests and responses.
}
// String returns payload body as string
diff --git a/pool.go b/pool.go
index e9227889..a7721050 100644
--- a/pool.go
+++ b/pool.go
@@ -19,8 +19,8 @@ const (
// Pool managed set of inner worker processes.
type Pool interface {
- // Observe all caused events to attached watcher.
- Observe(o func(event int, ctx interface{}))
+ // Listen all caused events to attached watcher.
+ Listen(l func(event int, ctx interface{}))
// Exec one task with given payload and context, returns result or error.
Exec(rqs *Payload) (rsp *Payload, err error)
diff --git a/server.go b/server.go
index fb6927d6..84dedb52 100644
--- a/server.go
+++ b/server.go
@@ -29,7 +29,7 @@ type Server struct {
cfg *ServerConfig
// observes pool events (can be attached to multiple pools at the same time)
- observer func(event int, ctx interface{})
+ listener func(event int, ctx interface{})
// protects pool while the re-configuration
mu sync.Mutex
@@ -49,9 +49,9 @@ func NewServer(cfg *ServerConfig) *Server {
return &Server{cfg: cfg}
}
-// Observe attaches server event watcher.
-func (srv *Server) Observe(o func(event int, ctx interface{})) {
- srv.observer = o
+// Listen attaches server event watcher.
+func (srv *Server) Listen(l func(event int, ctx interface{})) {
+ srv.listener = l
}
// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory
@@ -80,7 +80,7 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error {
srv.mu.Lock()
srv.cfg.Pool, srv.pool = cfg.Pool, pool
- srv.pool.Observe(srv.poolObserver)
+ srv.pool.Listen(srv.poolListener)
srv.mu.Unlock()
srv.throw(EventPoolConstruct, pool)
@@ -113,7 +113,7 @@ func (srv *Server) Start() (err error) {
return err
}
- srv.pool.Observe(srv.poolObserver)
+ srv.pool.Listen(srv.poolListener)
srv.started = true
srv.throw(EventServerStart, srv)
@@ -169,9 +169,9 @@ func (srv *Server) Pool() Pool {
return srv.pool
}
-// Observe pool events.
-func (srv *Server) poolObserver(event int, ctx interface{}) {
- // bypassing to user specified observer
+// Listen pool events.
+func (srv *Server) poolListener(event int, ctx interface{}) {
+ // bypassing to user specified listener
srv.throw(event, ctx)
if event == EventPoolError {
@@ -192,7 +192,7 @@ func (srv *Server) poolObserver(event int, ctx interface{}) {
// throw invokes event handler if any.
func (srv *Server) throw(event int, ctx interface{}) {
- if srv.observer != nil {
- srv.observer(event, ctx)
+ if srv.listener != nil {
+ srv.listener(event, ctx)
}
}
diff --git a/server_test.go b/server_test.go
index 3a19a5c5..9e2367e4 100644
--- a/server_test.go
+++ b/server_test.go
@@ -179,7 +179,7 @@ func TestServer_ReplacePool(t *testing.T) {
assert.NoError(t, srv.Start())
constructed := make(chan interface{})
- srv.Observe(func(e int, ctx interface{}) {
+ srv.Listen(func(e int, ctx interface{}) {
if e == EventPoolConstruct {
close(constructed)
}
@@ -208,7 +208,7 @@ func TestServer_ServerFailure(t *testing.T) {
assert.NoError(t, srv.Start())
failure := make(chan interface{})
- srv.Observe(func(e int, ctx interface{}) {
+ srv.Listen(func(e int, ctx interface{}) {
if e == EventServerFailure {
close(failure)
}
diff --git a/service/http/config.go b/service/http/config.go
index b828eb08..efcaae30 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -17,7 +17,7 @@ type Config struct {
MaxRequest int64
// Uploads configures uploads configuration.
- Uploads *FsConfig
+ Uploads *UploadsConfig
// Workers configures roadrunner server and worker pool.
Workers *roadrunner.ServerConfig
diff --git a/service/http/handler.go b/service/http/handler.go
deleted file mode 100644
index 1319200c..00000000
--- a/service/http/handler.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package http
-
-import (
- "net/http"
- "strconv"
- "github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner"
- "github.com/pkg/errors"
-)
-
-// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
-// parsed files and query, payload will include parsed form dataTree (if any).
-type Handler struct {
- cfg *Config
- rr *roadrunner.Server
-}
-
-// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (h *Handler) Handle(w http.ResponseWriter, r *http.Request) {
- // validating request size
- if h.cfg.MaxRequest != 0 {
- if length := r.Header.Get("content-length"); length != "" {
- if size, err := strconv.ParseInt(length, 10, 64); err != nil {
- h.sendError(w, r, err)
- return
- } else if size > h.cfg.MaxRequest {
- h.sendError(w, r, errors.New("request body max size is exceeded"))
- return
- }
- }
- }
-
- req, err := NewRequest(r)
- if err != nil {
- h.sendError(w, r, err)
- return
- }
-
- if err = req.Open(h.cfg); err != nil {
- h.sendError(w, r, err)
- return
- }
- defer req.Close()
-
- p, err := req.Payload()
- if err != nil {
- h.sendError(w, r, err)
- return
- }
-
- rsp, err := h.rr.Exec(p)
- if err != nil {
- h.sendError(w, r, err)
- return
- }
-
- resp, err := NewResponse(rsp)
- if err != nil {
- h.sendError(w, r, err)
- return
- }
-
- resp.Write(w)
-}
-
-// sendError sends error
-func (h *Handler) sendError(w http.ResponseWriter, r *http.Request, err error) {
- logrus.Errorf("http: %s", err)
- w.WriteHeader(500)
- w.Write([]byte(err.Error()))
-}
diff --git a/service/http/parse.go b/service/http/parse.go
index 898f39a1..fe8361d6 100644
--- a/service/http/parse.go
+++ b/service/http/parse.go
@@ -69,7 +69,7 @@ func (d dataTree) mount(i []string, v []string) {
}
// parse incoming dataTree request into JSON (including multipart form dataTree)
-func parseUploads(r *http.Request, cfg *FsConfig) (*Uploads, error) {
+func parseUploads(r *http.Request, cfg *UploadsConfig) (*Uploads, error) {
u := &Uploads{
cfg: cfg,
tree: make(fileTree),
diff --git a/service/http/request.go b/service/http/request.go
index fd483744..c7304c8d 100644
--- a/service/http/request.go
+++ b/service/http/request.go
@@ -44,7 +44,7 @@ type Request struct {
}
// NewRequest creates new PSR7 compatible request using net/http request.
-func NewRequest(r *http.Request) (req *Request, err error) {
+func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
req = &Request{
Protocol: r.Proto,
Method: r.Method,
@@ -67,11 +67,11 @@ func NewRequest(r *http.Request) (req *Request, err error) {
return nil, err
}
- if req.body, err = parsePost(r); err != nil {
+ if req.body, err = parseData(r); err != nil {
return nil, err
}
- if req.Uploads, err = parseUploads(r); err != nil {
+ if req.Uploads, err = parseUploads(r, cfg); err != nil {
return nil, err
}
@@ -80,12 +80,12 @@ func NewRequest(r *http.Request) (req *Request, err error) {
}
// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open(cfg *Config) error {
+func (r *Request) Open() error {
if r.Uploads == nil {
return nil
}
- return r.Uploads.Open(cfg)
+ return r.Uploads.Open()
}
// Close clears all temp file uploads
diff --git a/service/http/server.go b/service/http/server.go
new file mode 100644
index 00000000..178980a7
--- /dev/null
+++ b/service/http/server.go
@@ -0,0 +1,84 @@
+package http
+
+import (
+ "net/http"
+ "strconv"
+ "github.com/spiral/roadrunner"
+ "github.com/pkg/errors"
+)
+
+// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Server struct {
+ cfg *Config
+ listener func(event int, ctx interface{})
+ rr *roadrunner.Server
+}
+
+// Listen attaches pool event watcher.
+func (s *Server) Listen(l func(event int, ctx interface{})) {
+ s.listener = l
+}
+
+// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ // validating request size
+ if s.cfg.MaxRequest != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ s.handleError(w, r, err)
+ return
+ } else if size > s.cfg.MaxRequest {
+ s.handleError(w, r, errors.New("request body max size is exceeded"))
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, s.cfg.Uploads)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ if err = req.Open(); err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+ defer req.Close()
+
+ p, err := req.Payload()
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ rsp, err := s.rr.Exec(p)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ resp.Write(w)
+}
+
+// handleError sends error
+func (s *Server) handleError(w http.ResponseWriter, r *http.Request, err error) {
+ s.throw(2332323, err)
+
+ w.WriteHeader(500)
+ w.Write([]byte(err.Error()))
+}
+
+// throw invokes event srv if any.
+func (s *Server) throw(event int, ctx interface{}) {
+ if s.listener != nil {
+ s.listener(event, ctx)
+ }
+}
diff --git a/service/http/service.go b/service/http/service.go
index 5a0d4c16..c31c4a47 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -17,10 +17,11 @@ type Middleware interface {
// Service manages rr, http servers.
type Service struct {
- middleware []Middleware
cfg *Config
+ listener func(event int, ctx interface{})
+ middleware []Middleware
rr *roadrunner.Server
- handler *Handler
+ srv *Server
http *http.Server
}
@@ -28,6 +29,11 @@ func (s *Service) Add(m Middleware) {
s.middleware = append(s.middleware, m)
}
+// Listen attaches server event watcher.
+func (s *Service) Listen(o func(event int, ctx interface{})) {
+ s.listener = o
+}
+
// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) {
@@ -56,11 +62,18 @@ func (s *Service) Serve() error {
}
defer s.rr.Stop()
- // todo: observer
-
s.rr = rr
- s.handler = &Handler{cfg: s.cfg, rr: s.rr}
- s.http = &http.Server{Addr: s.cfg.httpAddr(), Handler: s}
+ s.srv = &Server{cfg: s.cfg, rr: s.rr}
+ s.http = &http.Server{Addr: s.cfg.httpAddr()}
+
+ s.rr.Listen(s.listener)
+ s.srv.Listen(s.listener)
+
+ if len(s.middleware) == 0 {
+ s.http.Handler = s.srv
+ } else {
+ s.http.Handler = s
+ }
if err := s.http.ListenAndServe(); err != nil {
return err
@@ -71,6 +84,10 @@ func (s *Service) Serve() error {
// Stop stops the service.
func (s *Service) Stop() {
+ if s.http == nil {
+ return
+ }
+
s.http.Shutdown(context.Background())
}
@@ -82,5 +99,5 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
- s.handler.Handle(w, r)
+ s.srv.ServeHTTP(w, r)
}
diff --git a/service/http/uploads.go b/service/http/uploads.go
index cdd3e52c..62167a6c 100644
--- a/service/http/uploads.go
+++ b/service/http/uploads.go
@@ -29,7 +29,7 @@ const (
// tree manages uploaded files tree and temporary files.
type Uploads struct {
// associated temp directory and forbidden extensions.
- cfg *FsConfig
+ cfg *UploadsConfig
// pre processed data tree for Uploads.
tree fileTree
@@ -99,7 +99,7 @@ func NewUpload(f *multipart.FileHeader) *FileUpload {
}
}
-func (f *FileUpload) Open(cfg *FsConfig) error {
+func (f *FileUpload) Open(cfg *UploadsConfig) error {
if cfg.Forbids(f.Name) {
f.Error = UploadErrorExtension
return nil
diff --git a/service/http/fs_config.go b/service/http/uploads_config.go
index de5b1389..ac80723f 100644
--- a/service/http/fs_config.go
+++ b/service/http/uploads_config.go
@@ -5,8 +5,8 @@ import (
"path"
)
-// FsConfig describes file location and controls access to them.
-type FsConfig struct {
+// UploadsConfig describes file location and controls access to them.
+type UploadsConfig struct {
// Dir contains name of directory to control access to.
Dir string
@@ -16,7 +16,7 @@ type FsConfig struct {
}
// Forbid must return true if file extension is not allowed for the upload.
-func (cfg FsConfig) Forbids(filename string) bool {
+func (cfg UploadsConfig) Forbids(filename string) bool {
ext := strings.ToLower(path.Ext(filename))
for _, v := range cfg.Forbid {
diff --git a/service/http/fs_config_test.go b/service/http/uploads_config_test.go
index 05f568e5..e2de97f2 100644
--- a/service/http/fs_config_test.go
+++ b/service/http/uploads_config_test.go
@@ -6,7 +6,7 @@ import (
)
func TestFsConfig_Forbids(t *testing.T) {
- cfg := FsConfig{Forbid: []string{".php"}}
+ cfg := UploadsConfig{Forbid: []string{".php"}}
assert.True(t, cfg.Forbids("index.php"))
assert.True(t, cfg.Forbids("index.PHP"))
diff --git a/static_pool.go b/static_pool.go
index f4652bfc..a4391005 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -21,8 +21,8 @@ type StaticPool struct {
// worker command creator
cmd func() *exec.Cmd
- // observer is optional callback to handle worker create/destruct/error events.
- observer func(event int, ctx interface{})
+ // listener is optional callback to handle worker create/destruct/error events.
+ listener func(event int, ctx interface{})
// creates and connects to workers
factory Factory
@@ -69,9 +69,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Observe attaches pool event watcher.
-func (p *StaticPool) Observe(o func(event int, ctx interface{})) {
- p.observer = o
+// Listen attaches pool event watcher.
+func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
+ p.listener = l
}
// Config returns associated pool configuration. Immutable.
@@ -253,7 +253,7 @@ func (p *StaticPool) destroyWorker(w *Worker) {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
- if p.observer != nil {
- p.observer(event, ctx)
+ if p.listener != nil {
+ p.listener(event, ctx)
}
}
diff --git a/static_pool_test.go b/static_pool_test.go
index 97872609..d9fa75cd 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- p.Observe(func(e int, ctx interface{}) {
+ p.Listen(func(e int, ctx interface{}) {
if err, ok := ctx.(error); ok {
assert.Contains(t, err.Error(), "undefined_function()")
}
@@ -184,7 +184,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
destructed := make(chan interface{})
- p.Observe(func(e int, ctx interface{}) {
+ p.Listen(func(e int, ctx interface{}) {
if e == EventWorkerConstruct {
close(destructed)
}