diff options
-rw-r--r-- | _____/http/server.go | 87 | ||||
-rw-r--r-- | payload.go | 2 | ||||
-rw-r--r-- | pool.go | 4 | ||||
-rw-r--r-- | server.go | 22 | ||||
-rw-r--r-- | server_test.go | 4 | ||||
-rw-r--r-- | service/http/config.go | 2 | ||||
-rw-r--r-- | service/http/handler.go | 71 | ||||
-rw-r--r-- | service/http/parse.go | 2 | ||||
-rw-r--r-- | service/http/request.go | 10 | ||||
-rw-r--r-- | service/http/server.go | 84 | ||||
-rw-r--r-- | service/http/service.go | 31 | ||||
-rw-r--r-- | service/http/uploads.go | 4 | ||||
-rw-r--r-- | service/http/uploads_config.go (renamed from service/http/fs_config.go) | 6 | ||||
-rw-r--r-- | service/http/uploads_config_test.go (renamed from service/http/fs_config_test.go) | 2 | ||||
-rw-r--r-- | static_pool.go | 14 | ||||
-rw-r--r-- | static_pool_test.go | 4 |
16 files changed, 146 insertions, 203 deletions
diff --git a/_____/http/server.go b/_____/http/server.go deleted file mode 100644 index db1f22ef..00000000 --- a/_____/http/server.go +++ /dev/null @@ -1,87 +0,0 @@ -package http - -import ( - "errors" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner" - "net/http" - "strconv" -) - -// service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type Server struct { - cfg *Config - static *staticServer - rr *roadrunner.Server -} - -// NewServer returns new instance of HTTP PSR7 server. -func NewServer(cfg *Config, server *roadrunner.Server) *Server { - h := &Server{cfg: cfg, rr: server} - - if cfg.ServeStatic { - h.static = &staticServer{root: http.Dir(h.cfg.Root)} - } - - return h -} - -// ServeHTTP serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if srv.cfg.ServeStatic && srv.static.serve(w, r) { - return - } - - // validating request size - if srv.cfg.MaxRequest != 0 { - if length := r.Header.Get("content-length"); length != "" { - if size, err := strconv.ParseInt(length, 10, 64); err != nil { - srv.sendError(w, r, err) - return - } else if size > srv.cfg.MaxRequest { - srv.sendError(w, r, errors.New("request body max size is exceeded")) - return - } - } - } - - req, err := NewRequest(r) - if err != nil { - srv.sendError(w, r, err) - return - } - - if err = req.Open(srv.cfg); err != nil { - srv.sendError(w, r, err) - return - } - defer req.Close() - - p, err := req.Payload() - if err != nil { - srv.sendError(w, r, err) - return - } - - rsp, err := srv.rr.Exec(p) - if err != nil { - srv.sendError(w, r, err) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - srv.sendError(w, r, err) - return - } - - resp.Write(w) -} - -// sendError sends error -func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) { - logrus.Errorf("http: %s", err) - w.WriteHeader(500) - w.Write([]byte(err.Error())) -} @@ -9,7 +9,7 @@ type Payload struct { // body contains binary payload to be processed by worker. Body []byte - // todo: io.Reader support for streamed requests and responses. + // add io.Reader support for streamed requests and responses. } // String returns payload body as string @@ -19,8 +19,8 @@ const ( // Pool managed set of inner worker processes. type Pool interface { - // Observe all caused events to attached watcher. - Observe(o func(event int, ctx interface{})) + // Listen all caused events to attached watcher. + Listen(l func(event int, ctx interface{})) // Exec one task with given payload and context, returns result or error. Exec(rqs *Payload) (rsp *Payload, err error) @@ -29,7 +29,7 @@ type Server struct { cfg *ServerConfig // observes pool events (can be attached to multiple pools at the same time) - observer func(event int, ctx interface{}) + listener func(event int, ctx interface{}) // protects pool while the re-configuration mu sync.Mutex @@ -49,9 +49,9 @@ func NewServer(cfg *ServerConfig) *Server { return &Server{cfg: cfg} } -// Observe attaches server event watcher. -func (srv *Server) Observe(o func(event int, ctx interface{})) { - srv.observer = o +// Listen attaches server event watcher. +func (srv *Server) Listen(l func(event int, ctx interface{})) { + srv.listener = l } // Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory @@ -80,7 +80,7 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error { srv.mu.Lock() srv.cfg.Pool, srv.pool = cfg.Pool, pool - srv.pool.Observe(srv.poolObserver) + srv.pool.Listen(srv.poolListener) srv.mu.Unlock() srv.throw(EventPoolConstruct, pool) @@ -113,7 +113,7 @@ func (srv *Server) Start() (err error) { return err } - srv.pool.Observe(srv.poolObserver) + srv.pool.Listen(srv.poolListener) srv.started = true srv.throw(EventServerStart, srv) @@ -169,9 +169,9 @@ func (srv *Server) Pool() Pool { return srv.pool } -// Observe pool events. -func (srv *Server) poolObserver(event int, ctx interface{}) { - // bypassing to user specified observer +// Listen pool events. +func (srv *Server) poolListener(event int, ctx interface{}) { + // bypassing to user specified listener srv.throw(event, ctx) if event == EventPoolError { @@ -192,7 +192,7 @@ func (srv *Server) poolObserver(event int, ctx interface{}) { // throw invokes event handler if any. func (srv *Server) throw(event int, ctx interface{}) { - if srv.observer != nil { - srv.observer(event, ctx) + if srv.listener != nil { + srv.listener(event, ctx) } } diff --git a/server_test.go b/server_test.go index 3a19a5c5..9e2367e4 100644 --- a/server_test.go +++ b/server_test.go @@ -179,7 +179,7 @@ func TestServer_ReplacePool(t *testing.T) { assert.NoError(t, srv.Start()) constructed := make(chan interface{}) - srv.Observe(func(e int, ctx interface{}) { + srv.Listen(func(e int, ctx interface{}) { if e == EventPoolConstruct { close(constructed) } @@ -208,7 +208,7 @@ func TestServer_ServerFailure(t *testing.T) { assert.NoError(t, srv.Start()) failure := make(chan interface{}) - srv.Observe(func(e int, ctx interface{}) { + srv.Listen(func(e int, ctx interface{}) { if e == EventServerFailure { close(failure) } diff --git a/service/http/config.go b/service/http/config.go index b828eb08..efcaae30 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -17,7 +17,7 @@ type Config struct { MaxRequest int64 // Uploads configures uploads configuration. - Uploads *FsConfig + Uploads *UploadsConfig // Workers configures roadrunner server and worker pool. Workers *roadrunner.ServerConfig diff --git a/service/http/handler.go b/service/http/handler.go deleted file mode 100644 index 1319200c..00000000 --- a/service/http/handler.go +++ /dev/null @@ -1,71 +0,0 @@ -package http - -import ( - "net/http" - "strconv" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner" - "github.com/pkg/errors" -) - -// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type Handler struct { - cfg *Config - rr *roadrunner.Server -} - -// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (h *Handler) Handle(w http.ResponseWriter, r *http.Request) { - // validating request size - if h.cfg.MaxRequest != 0 { - if length := r.Header.Get("content-length"); length != "" { - if size, err := strconv.ParseInt(length, 10, 64); err != nil { - h.sendError(w, r, err) - return - } else if size > h.cfg.MaxRequest { - h.sendError(w, r, errors.New("request body max size is exceeded")) - return - } - } - } - - req, err := NewRequest(r) - if err != nil { - h.sendError(w, r, err) - return - } - - if err = req.Open(h.cfg); err != nil { - h.sendError(w, r, err) - return - } - defer req.Close() - - p, err := req.Payload() - if err != nil { - h.sendError(w, r, err) - return - } - - rsp, err := h.rr.Exec(p) - if err != nil { - h.sendError(w, r, err) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - h.sendError(w, r, err) - return - } - - resp.Write(w) -} - -// sendError sends error -func (h *Handler) sendError(w http.ResponseWriter, r *http.Request, err error) { - logrus.Errorf("http: %s", err) - w.WriteHeader(500) - w.Write([]byte(err.Error())) -} diff --git a/service/http/parse.go b/service/http/parse.go index 898f39a1..fe8361d6 100644 --- a/service/http/parse.go +++ b/service/http/parse.go @@ -69,7 +69,7 @@ func (d dataTree) mount(i []string, v []string) { } // parse incoming dataTree request into JSON (including multipart form dataTree) -func parseUploads(r *http.Request, cfg *FsConfig) (*Uploads, error) { +func parseUploads(r *http.Request, cfg *UploadsConfig) (*Uploads, error) { u := &Uploads{ cfg: cfg, tree: make(fileTree), diff --git a/service/http/request.go b/service/http/request.go index fd483744..c7304c8d 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -44,7 +44,7 @@ type Request struct { } // NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request) (req *Request, err error) { +func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { req = &Request{ Protocol: r.Proto, Method: r.Method, @@ -67,11 +67,11 @@ func NewRequest(r *http.Request) (req *Request, err error) { return nil, err } - if req.body, err = parsePost(r); err != nil { + if req.body, err = parseData(r); err != nil { return nil, err } - if req.Uploads, err = parseUploads(r); err != nil { + if req.Uploads, err = parseUploads(r, cfg); err != nil { return nil, err } @@ -80,12 +80,12 @@ func NewRequest(r *http.Request) (req *Request, err error) { } // Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open(cfg *Config) error { +func (r *Request) Open() error { if r.Uploads == nil { return nil } - return r.Uploads.Open(cfg) + return r.Uploads.Open() } // Close clears all temp file uploads diff --git a/service/http/server.go b/service/http/server.go new file mode 100644 index 00000000..178980a7 --- /dev/null +++ b/service/http/server.go @@ -0,0 +1,84 @@ +package http + +import ( + "net/http" + "strconv" + "github.com/spiral/roadrunner" + "github.com/pkg/errors" +) + +// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Server struct { + cfg *Config + listener func(event int, ctx interface{}) + rr *roadrunner.Server +} + +// Listen attaches pool event watcher. +func (s *Server) Listen(l func(event int, ctx interface{})) { + s.listener = l +} + +// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // validating request size + if s.cfg.MaxRequest != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + s.handleError(w, r, err) + return + } else if size > s.cfg.MaxRequest { + s.handleError(w, r, errors.New("request body max size is exceeded")) + return + } + } + } + + req, err := NewRequest(r, s.cfg.Uploads) + if err != nil { + s.handleError(w, r, err) + return + } + + if err = req.Open(); err != nil { + s.handleError(w, r, err) + return + } + defer req.Close() + + p, err := req.Payload() + if err != nil { + s.handleError(w, r, err) + return + } + + rsp, err := s.rr.Exec(p) + if err != nil { + s.handleError(w, r, err) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + s.handleError(w, r, err) + return + } + + resp.Write(w) +} + +// handleError sends error +func (s *Server) handleError(w http.ResponseWriter, r *http.Request, err error) { + s.throw(2332323, err) + + w.WriteHeader(500) + w.Write([]byte(err.Error())) +} + +// throw invokes event srv if any. +func (s *Server) throw(event int, ctx interface{}) { + if s.listener != nil { + s.listener(event, ctx) + } +} diff --git a/service/http/service.go b/service/http/service.go index 5a0d4c16..c31c4a47 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -17,10 +17,11 @@ type Middleware interface { // Service manages rr, http servers. type Service struct { - middleware []Middleware cfg *Config + listener func(event int, ctx interface{}) + middleware []Middleware rr *roadrunner.Server - handler *Handler + srv *Server http *http.Server } @@ -28,6 +29,11 @@ func (s *Service) Add(m Middleware) { s.middleware = append(s.middleware, m) } +// Listen attaches server event watcher. +func (s *Service) Listen(o func(event int, ctx interface{})) { + s.listener = o +} + // Configure must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) { @@ -56,11 +62,18 @@ func (s *Service) Serve() error { } defer s.rr.Stop() - // todo: observer - s.rr = rr - s.handler = &Handler{cfg: s.cfg, rr: s.rr} - s.http = &http.Server{Addr: s.cfg.httpAddr(), Handler: s} + s.srv = &Server{cfg: s.cfg, rr: s.rr} + s.http = &http.Server{Addr: s.cfg.httpAddr()} + + s.rr.Listen(s.listener) + s.srv.Listen(s.listener) + + if len(s.middleware) == 0 { + s.http.Handler = s.srv + } else { + s.http.Handler = s + } if err := s.http.ListenAndServe(); err != nil { return err @@ -71,6 +84,10 @@ func (s *Service) Serve() error { // Stop stops the service. func (s *Service) Stop() { + if s.http == nil { + return + } + s.http.Shutdown(context.Background()) } @@ -82,5 +99,5 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - s.handler.Handle(w, r) + s.srv.ServeHTTP(w, r) } diff --git a/service/http/uploads.go b/service/http/uploads.go index cdd3e52c..62167a6c 100644 --- a/service/http/uploads.go +++ b/service/http/uploads.go @@ -29,7 +29,7 @@ const ( // tree manages uploaded files tree and temporary files. type Uploads struct { // associated temp directory and forbidden extensions. - cfg *FsConfig + cfg *UploadsConfig // pre processed data tree for Uploads. tree fileTree @@ -99,7 +99,7 @@ func NewUpload(f *multipart.FileHeader) *FileUpload { } } -func (f *FileUpload) Open(cfg *FsConfig) error { +func (f *FileUpload) Open(cfg *UploadsConfig) error { if cfg.Forbids(f.Name) { f.Error = UploadErrorExtension return nil diff --git a/service/http/fs_config.go b/service/http/uploads_config.go index de5b1389..ac80723f 100644 --- a/service/http/fs_config.go +++ b/service/http/uploads_config.go @@ -5,8 +5,8 @@ import ( "path" ) -// FsConfig describes file location and controls access to them. -type FsConfig struct { +// UploadsConfig describes file location and controls access to them. +type UploadsConfig struct { // Dir contains name of directory to control access to. Dir string @@ -16,7 +16,7 @@ type FsConfig struct { } // Forbid must return true if file extension is not allowed for the upload. -func (cfg FsConfig) Forbids(filename string) bool { +func (cfg UploadsConfig) Forbids(filename string) bool { ext := strings.ToLower(path.Ext(filename)) for _, v := range cfg.Forbid { diff --git a/service/http/fs_config_test.go b/service/http/uploads_config_test.go index 05f568e5..e2de97f2 100644 --- a/service/http/fs_config_test.go +++ b/service/http/uploads_config_test.go @@ -6,7 +6,7 @@ import ( ) func TestFsConfig_Forbids(t *testing.T) { - cfg := FsConfig{Forbid: []string{".php"}} + cfg := UploadsConfig{Forbid: []string{".php"}} assert.True(t, cfg.Forbids("index.php")) assert.True(t, cfg.Forbids("index.PHP")) diff --git a/static_pool.go b/static_pool.go index f4652bfc..a4391005 100644 --- a/static_pool.go +++ b/static_pool.go @@ -21,8 +21,8 @@ type StaticPool struct { // worker command creator cmd func() *exec.Cmd - // observer is optional callback to handle worker create/destruct/error events. - observer func(event int, ctx interface{}) + // listener is optional callback to handle worker create/destruct/error events. + listener func(event int, ctx interface{}) // creates and connects to workers factory Factory @@ -69,9 +69,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Observe attaches pool event watcher. -func (p *StaticPool) Observe(o func(event int, ctx interface{})) { - p.observer = o +// Listen attaches pool event watcher. +func (p *StaticPool) Listen(l func(event int, ctx interface{})) { + p.listener = l } // Config returns associated pool configuration. Immutable. @@ -253,7 +253,7 @@ func (p *StaticPool) destroyWorker(w *Worker) { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { - if p.observer != nil { - p.observer(event, ctx) + if p.listener != nil { + p.listener(event, ctx) } } diff --git a/static_pool_test.go b/static_pool_test.go index 97872609..d9fa75cd 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Observe(func(e int, ctx interface{}) { + p.Listen(func(e int, ctx interface{}) { if err, ok := ctx.(error); ok { assert.Contains(t, err.Error(), "undefined_function()") } @@ -184,7 +184,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, runtime.NumCPU(), len(p.Workers())) destructed := make(chan interface{}) - p.Observe(func(e int, ctx interface{}) { + p.Listen(func(e int, ctx interface{}) { if e == EventWorkerConstruct { close(destructed) } |