diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/http/config.go | 28 | ||||
-rw-r--r-- | service/http/parse.go | 148 | ||||
-rw-r--r-- | service/http/request.go | 137 | ||||
-rw-r--r-- | service/http/response.go | 54 | ||||
-rw-r--r-- | service/http/rpc.go | 61 | ||||
-rw-r--r-- | service/http/server.go | 113 | ||||
-rw-r--r-- | service/http/service.go | 121 | ||||
-rw-r--r-- | service/http/uploads.go | 130 | ||||
-rw-r--r-- | service/http/uploads_config.go | 39 | ||||
-rw-r--r-- | service/http/uploads_config_test.go | 24 | ||||
-rw-r--r-- | service/rpc/config.go | 35 | ||||
-rw-r--r-- | service/rpc/config_test.go | 109 | ||||
-rw-r--r-- | service/rpc/service.go | 122 | ||||
-rw-r--r-- | service/rpc/service_test.go | 95 | ||||
-rw-r--r-- | service/static/config.go | 52 | ||||
-rw-r--r-- | service/static/config_test.go | 21 | ||||
-rw-r--r-- | service/static/service.go | 88 |
17 files changed, 1377 insertions, 0 deletions
diff --git a/service/http/config.go b/service/http/config.go new file mode 100644 index 00000000..d92b4c60 --- /dev/null +++ b/service/http/config.go @@ -0,0 +1,28 @@ +package http + +import ( + "github.com/spiral/roadrunner" +) + +// Configures RoadRunner HTTP server. +type Config struct { + // Enable enables http svc. + Enable bool + + // Address and port to handle as http server. + Address string + + // MaxRequest specified max size for payload body in bytes, set 0 to unlimited. + MaxRequest int64 + + // Uploads configures uploads configuration. + Uploads *UploadsConfig + + // Workers configures roadrunner server and worker pool. + Workers *roadrunner.ServerConfig +} + +// Valid validates the configuration. +func (cfg *Config) Valid() error { + return nil +} diff --git a/service/http/parse.go b/service/http/parse.go new file mode 100644 index 00000000..01030831 --- /dev/null +++ b/service/http/parse.go @@ -0,0 +1,148 @@ +package http + +import ( + "strings" + "net/http" + "os" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) (dataTree, error) { + data := make(dataTree) + for k, v := range r.PostForm { + data.push(k, v) + } + + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + + return data, nil +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + if len(v) == 0 { + // skip empty values + return + } + + indexes := make([]string, 0) + for _, index := range strings.Split(k, "[") { + indexes = append(indexes, strings.Trim(index, "]")) + } + + if len(indexes) <= MaxLevel { + d.mount(indexes, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(v) == 0 { + return + } + + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including multipart form dataTree) +func parseUploads(r *http.Request, cfg *UploadsConfig) (*Uploads, error) { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u, nil +} + +// exists if file exists. +func exists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + + if os.IsNotExist(err) { + return false + } + + return false +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + if len(v) == 0 { + // skip empty values + return + } + + indexes := make([]string, 0) + for _, index := range strings.Split(k, "[") { + indexes = append(indexes, strings.Trim(index, "]")) + } + + if len(indexes) <= MaxLevel { + d.mount(indexes, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(v) == 0 { + return + } + + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} diff --git a/service/http/request.go b/service/http/request.go new file mode 100644 index 00000000..c7304c8d --- /dev/null +++ b/service/http/request.go @@ -0,0 +1,137 @@ +package http + +import ( + "encoding/json" + "fmt" + "github.com/spiral/roadrunner" + "io/ioutil" + "net/http" + "strings" +) + +const ( + defaultMaxMemory = 32 << 20 // 32 MB +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // Uri contains full request Uri with scheme and query. + Uri string `json:"uri"` + + // Headers contains list of request headers. + Headers http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // request body can be parsedData or []byte + body interface{} +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { + req = &Request{ + Protocol: r.Proto, + Method: r.Method, + Uri: uri(r), + Headers: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + } + + for _, c := range r.Cookies() { + req.Cookies[c.Name] = c.Value + } + + if !req.parsable() { + req.body, err = ioutil.ReadAll(r.Body) + return req, err + } + + if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + if req.body, err = parseData(r); err != nil { + return nil, err + } + + if req.Uploads, err = parseUploads(r, cfg); err != nil { + return nil, err + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open() error { + if r.Uploads == nil { + return nil + } + + return r.Uploads.Open() +} + +// Close clears all temp file uploads +func (r *Request) Close() { + if r.Uploads == nil { + return + } + + r.Uploads.Clear() +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (p *roadrunner.Payload, err error) { + p = &roadrunner.Payload{} + + if p.Context, err = json.Marshal(r); err != nil { + return nil, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return nil, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// parsable returns true if request payload can be parsed (POST dataTree, file tree). +func (r *Request) parsable() bool { + if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" { + return false + } + + ct := r.Headers.Get("content-type") + return strings.Contains(ct, "multipart/form-data") || ct == "application/x-www-form-urlencoded" +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/service/http/response.go b/service/http/response.go new file mode 100644 index 00000000..dd092353 --- /dev/null +++ b/service/http/response.go @@ -0,0 +1,54 @@ +package http + +import ( + "encoding/json" + "github.com/spiral/roadrunner" + "net/http" + "io" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Headers contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated body payload. + body interface{} +} + +// NewResponse creates new response based on given roadrunner payload. +func NewResponse(p *roadrunner.Payload) (*Response, error) { + r := &Response{body: p.Body} + if err := json.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + for k, v := range r.Headers { + for _, h := range v { + w.Header().Add(k, h) + + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.body.([]byte); ok { + w.Write(data) + } + + if rc, ok := r.body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} diff --git a/service/http/rpc.go b/service/http/rpc.go new file mode 100644 index 00000000..2adb8706 --- /dev/null +++ b/service/http/rpc.go @@ -0,0 +1,61 @@ +package http + +import ( + "github.com/pkg/errors" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []Worker `json:"workers"` +} + +// Worker provides information about specific worker. +type Worker struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumJobs int64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` + + // Updated is unix nano timestamp of last worker execution. + Updated int64 `json:"updated"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + *r = "OK" + return rpc.svc.srv.rr.Reset() +} + +// Workers returns list of active workers and their stats. +func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + for _, w := range rpc.svc.rr.Workers() { + state := w.State() + r.Workers = append(r.Workers, Worker{ + Pid: *w.Pid, + Status: state.String(), + NumJobs: state.NumExecs(), + Created: w.Created.UnixNano(), + Updated: state.Updated().UnixNano(), + }) + } + + return nil +} diff --git a/service/http/server.go b/service/http/server.go new file mode 100644 index 00000000..de414b08 --- /dev/null +++ b/service/http/server.go @@ -0,0 +1,113 @@ +package http + +import ( + "net/http" + "strconv" + "github.com/spiral/roadrunner" + "github.com/pkg/errors" +) + +const ( + // EventResponse thrown after the request been processed. See Log as payload. + EventResponse = iota + 500 + + // EventError thrown on any non job error provided by road runner server. + EventError +) + +// Log represents singular http response event. +type Log struct { + // Method of the request. + Method string + + // Uri requested by the client. + Uri string + + // Status is response status. + Status int + + // Associated error, if any. + Error error +} + +// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Server struct { + cfg *Config + listener func(event int, ctx interface{}) + rr *roadrunner.Server +} + +// AddListener attaches pool event watcher. +func (s *Server) Listen(l func(event int, ctx interface{})) { + s.listener = l +} + +// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // validating request size + if s.cfg.MaxRequest != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + s.handleError(w, r, err) + return + } else if size > s.cfg.MaxRequest { + s.handleError(w, r, errors.New("request body max size is exceeded")) + return + } + } + } + + req, err := NewRequest(r, s.cfg.Uploads) + if err != nil { + s.handleError(w, r, err) + return + } + + if err = req.Open(); err != nil { + s.handleError(w, r, err) + return + } + defer req.Close() + + p, err := req.Payload() + if err != nil { + s.handleError(w, r, err) + return + } + + rsp, err := s.rr.Exec(p) + if err != nil { + s.handleError(w, r, err) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + s.handleError(w, r, err) + return + } + + s.handleResponse(req, resp) + resp.Write(w) +} + +// handleResponse triggers response event. +func (s *Server) handleResponse(req *Request, resp *Response) { + s.throw(EventResponse, &Log{Method: req.Method, Uri: req.Uri, Status: resp.Status}) +} + +// handleError sends error. +func (s *Server) handleError(w http.ResponseWriter, r *http.Request, err error) { + s.throw(EventError, &Log{Method: r.Method, Uri: uri(r), Status: 500, Error: err}) + + w.WriteHeader(500) + w.Write([]byte(err.Error())) +} + +// throw invokes event srv if any. +func (s *Server) throw(event int, ctx interface{}) { + if s.listener != nil { + s.listener(event, ctx) + } +} diff --git a/service/http/service.go b/service/http/service.go new file mode 100644 index 00000000..ba941c7c --- /dev/null +++ b/service/http/service.go @@ -0,0 +1,121 @@ +package http + +import ( + "net/http" + "github.com/spiral/roadrunner/service" + "context" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/rpc" +) + +// Name contains default svc name. +const Name = "http" + +type Middleware interface { + // Handle must return true if request/response pair is handled withing the middleware. + Handle(w http.ResponseWriter, r *http.Request) bool +} + +// Service manages rr, http servers. +type Service struct { + cfg *Config + listeners []func(event int, ctx interface{}) + middleware []Middleware + rr *roadrunner.Server + srv *Server + http *http.Server +} + +func (s *Service) AddMiddleware(m Middleware) { + s.middleware = append(s.middleware, m) +} + +// AddListener attaches server event watcher. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.listeners = append(s.listeners, l) +} + +// Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) { + config := &Config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + + // registering http RPC interface + if r, ok := c.Get(rpc.Name); ok >= service.StatusConfigured { + if h, ok := r.(*rpc.Service); ok { + h.Register(Name, &rpcServer{s}) + } + } + + return true, nil +} + +// Serve serves the svc. +func (s *Service) Serve() error { + rr := roadrunner.NewServer(s.cfg.Workers) + + s.rr = rr + s.srv = &Server{cfg: s.cfg, rr: s.rr} + s.http = &http.Server{Addr: s.cfg.Address} + + s.rr.Listen(s.listener) + s.srv.Listen(s.listener) + + if len(s.middleware) == 0 { + s.http.Handler = s.srv + } else { + s.http.Handler = s + } + + if err := rr.Start(); err != nil { + return err + } + defer s.rr.Stop() + + if err := s.http.ListenAndServe(); err != nil { + return err + } + + return nil +} + +// Stop stops the svc. +func (s *Service) Stop() { + if s.http == nil { + return + } + + s.http.Shutdown(context.Background()) +} + +// Handle handles connection using set of middleware and rr PSR-7 server. +func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + for _, m := range s.middleware { + if m.Handle(w, r) { + return + } + } + + s.srv.ServeHTTP(w, r) +} + +func (s *Service) listener(event int, ctx interface{}) { + // todo: DIE on server failure + + for _, l := range s.listeners { + l(event, ctx) + } +} diff --git a/service/http/uploads.go b/service/http/uploads.go new file mode 100644 index 00000000..1f3060c0 --- /dev/null +++ b/service/http/uploads.go @@ -0,0 +1,130 @@ +package http + +import ( + "encoding/json" + "os" + "sync" + "mime/multipart" + "io/ioutil" + "io" +) + +const ( + // There is no error, the file uploaded with success. + UploadErrorOK = 0 + + // No file was uploaded. + UploadErrorNoFile = 4 + + // Missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // Failed to write file to disk. + UploadErrorCantWrite = 6 + + // Forbid file extension. + UploadErrorExtension = 7 +) + +// tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg *UploadsConfig + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + return json.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open() error { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + f.Open(u.cfg) + }(f) + } + + wg.Wait() + return nil +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear() { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + os.Remove(f.TempFilename) + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // Name contains filename specified by the client. + Name string `json:"name"` + + // MimeType contains mime-type provided by the client. + MimeType string `json:"type"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + MimeType: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +func (f *FileUpload) Open(cfg *UploadsConfig) error { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + defer file.Close() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer tmp.Close() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} diff --git a/service/http/uploads_config.go b/service/http/uploads_config.go new file mode 100644 index 00000000..715de69a --- /dev/null +++ b/service/http/uploads_config.go @@ -0,0 +1,39 @@ +package http + +import ( + "strings" + "path" + "os" +) + +// UploadsConfig describes file location and controls access to them. +type UploadsConfig struct { + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// TmpDir returns temporary directory. +func (cfg *UploadsConfig) TmpDir() string { + if cfg.Dir != "" { + return cfg.Dir + } + + return os.TempDir() +} + +// Forbid must return true if file extension is not allowed for the upload. +func (cfg *UploadsConfig) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/service/http/uploads_config_test.go b/service/http/uploads_config_test.go new file mode 100644 index 00000000..7704a486 --- /dev/null +++ b/service/http/uploads_config_test.go @@ -0,0 +1,24 @@ +package http + +import ( + "testing" + "github.com/stretchr/testify/assert" + "os" +) + +func TestFsConfig_Forbids(t *testing.T) { + cfg := UploadsConfig{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestFsConfig_TmpFallback(t *testing.T) { + cfg := UploadsConfig{Dir: "test"} + assert.Equal(t, "test", cfg.TmpDir()) + + cfg = UploadsConfig{Dir: ""} + assert.Equal(t, os.TempDir(), cfg.TmpDir()) +} diff --git a/service/rpc/config.go b/service/rpc/config.go new file mode 100644 index 00000000..06d63d65 --- /dev/null +++ b/service/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // AddListener string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go new file mode 100644 index 00000000..a953e30e --- /dev/null +++ b/service/rpc/config_test.go @@ -0,0 +1,109 @@ +package rpc + +import ( + "github.com/stretchr/testify/assert" + "runtime" + "testing" +) + +func TestConfig_Listener(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "tcp", ln.Addr().Network()) + assert.Equal(t, "[::]:18001", ln.Addr().String()) +} + +func TestConfig_ListenerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "unix", ln.Addr().Network()) + assert.Equal(t, "rpc.sock", ln.Addr().String()) +} + +func Test_Config_Error(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_ErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func TestConfig_Dialer(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "tcp", conn.RemoteAddr().Network()) + assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) +} + +func TestConfig_DialerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "unix", conn.RemoteAddr().Network()) + assert.Equal(t, "rpc.sock", conn.RemoteAddr().String()) +} + +func Test_Config_DialerError(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_DialerErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) +} diff --git a/service/rpc/service.go b/service/rpc/service.go new file mode 100644 index 00000000..ce1e3351 --- /dev/null +++ b/service/rpc/service.go @@ -0,0 +1,122 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" + "sync" +) + +// Name contains default service name. +const Name = "rpc" + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server + + mu sync.Mutex + serving bool +} + +// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + s.cfg = config + s.rpc = rpc.NewServer() + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.mu.Lock() + s.serving = true + s.stop = make(chan interface{}) + s.mu.Unlock() + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + go func() { + for { + select { + case <-s.stop: + break + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + <-s.stop + + s.mu.Lock() + s.serving = false + s.mu.Unlock() + + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.serving { + close(s.stop) + } +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go new file mode 100644 index 00000000..a57ce1bd --- /dev/null +++ b/service/rpc/service_test.go @@ -0,0 +1,95 @@ +package rpc + +import ( + "encoding/json" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type testService struct{} + +func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) service.Config { return nil } +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_ConfigError(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":false`}, nil) + + assert.Error(t, err) + assert.False(t, ok) +} + +func Test_Disabled(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil) + + assert.NoError(t, err) + assert.False(t, ok) +} + +func Test_RegisterNotConfigured(t *testing.T) { + s := &Service{} + assert.Error(t, s.Register("test", &testService{})) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) + assert.Error(t, s.Serve()) +} + +func Test_Enabled(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) +} + +func Test_StopNonServing(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) + s.Stop() +} + +func Test_Serve_Errors(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + assert.Error(t, s.Serve()) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) +} + +func Test_Serve_Client(t *testing.T) { + s := &Service{} + ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + defer s.Stop() + + assert.NoError(t, s.Register("test", &testService{})) + + go func() { assert.NoError(t, s.Serve()) }() + + client, err := s.Client() + assert.NotNil(t, client) + assert.NoError(t, err) + defer client.Close() + + var resp string + assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) + assert.Equal(t, "hello world", resp) +} diff --git a/service/static/config.go b/service/static/config.go new file mode 100644 index 00000000..2a1f6c13 --- /dev/null +++ b/service/static/config.go @@ -0,0 +1,52 @@ +package static + +import ( + "strings" + "path" + "os" + "github.com/pkg/errors" +) + +// Config describes file location and controls access to them. +type Config struct { + // Enables StaticFile service. + Enable bool + + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// Forbid must return true if file extension is not allowed for the upload. +func (cfg *Config) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} + +// Valid validates existence of directory. +func (cfg *Config) Valid() error { + st, err := os.Stat(cfg.Dir) + if err != nil { + if os.IsNotExist(err) { + return errors.New("root directory does not exists") + } + + return err + } + + if !st.IsDir() { + return errors.New("invalid root directory") + } + + return nil +} diff --git a/service/static/config_test.go b/service/static/config_test.go new file mode 100644 index 00000000..ce31348a --- /dev/null +++ b/service/static/config_test.go @@ -0,0 +1,21 @@ +package static + +import ( + "testing" + "github.com/stretchr/testify/assert" +) + +func TestConfig_Forbids(t *testing.T) { + cfg := Config{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestConfig_Valid(t *testing.T) { + assert.NoError(t, (&Config{Dir: "./"}).Valid()) + assert.Error(t, (&Config{Dir: "./config.go"}).Valid()) + assert.Error(t, (&Config{Dir: "./dir/"}).Valid()) +} diff --git a/service/static/service.go b/service/static/service.go new file mode 100644 index 00000000..5c3defe6 --- /dev/null +++ b/service/static/service.go @@ -0,0 +1,88 @@ +package static + +import ( + "net/http" + "path" + "strings" + rrttp "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/service" +) + +// Name contains default service name. +const Name = "static" + +// Service serves static files. Potentially convert into middleware? +type Service struct { + // server configuration (location, forbidden files and etc) + cfg *Config + + // root is initiated http directory + root http.Dir +} + +// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bool, err error) { + config := &Config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + s.root = http.Dir(s.cfg.Dir) + + // registering as middleware + if h, ok := c.Get(rrttp.Name); ok >= service.StatusConfigured { + if h, ok := h.(*rrttp.Service); ok { + h.AddMiddleware(s) + } + } + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { return nil } + +// Stop stops the service. +func (s *Service) Stop() {} + +// Handle must return true if request/response pair is handled withing the middleware. +func (s *Service) Handle(w http.ResponseWriter, r *http.Request) bool { + fPath := r.URL.Path + if !strings.HasPrefix(fPath, "/") { + fPath = "/" + fPath + } + fPath = path.Clean(fPath) + + if s.cfg.Forbids(fPath) { + return false + } + + f, err := s.root.Open(fPath) + if err != nil { + return false + } + defer f.Close() + + d, err := f.Stat() + if err != nil { + return false + } + + // do not Handle directories + if d.IsDir() { + return false + } + + http.ServeContent(w, r, d.Name(), d.ModTime(), f) + return true +} |