summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
Diffstat (limited to 'service')
-rw-r--r--service/http/config.go28
-rw-r--r--service/http/parse.go148
-rw-r--r--service/http/request.go137
-rw-r--r--service/http/response.go54
-rw-r--r--service/http/rpc.go61
-rw-r--r--service/http/server.go113
-rw-r--r--service/http/service.go121
-rw-r--r--service/http/uploads.go130
-rw-r--r--service/http/uploads_config.go39
-rw-r--r--service/http/uploads_config_test.go24
-rw-r--r--service/rpc/config.go35
-rw-r--r--service/rpc/config_test.go109
-rw-r--r--service/rpc/service.go122
-rw-r--r--service/rpc/service_test.go95
-rw-r--r--service/static/config.go52
-rw-r--r--service/static/config_test.go21
-rw-r--r--service/static/service.go88
17 files changed, 1377 insertions, 0 deletions
diff --git a/service/http/config.go b/service/http/config.go
new file mode 100644
index 00000000..d92b4c60
--- /dev/null
+++ b/service/http/config.go
@@ -0,0 +1,28 @@
+package http
+
+import (
+ "github.com/spiral/roadrunner"
+)
+
+// Configures RoadRunner HTTP server.
+type Config struct {
+ // Enable enables http svc.
+ Enable bool
+
+ // Address and port to handle as http server.
+ Address string
+
+ // MaxRequest specified max size for payload body in bytes, set 0 to unlimited.
+ MaxRequest int64
+
+ // Uploads configures uploads configuration.
+ Uploads *UploadsConfig
+
+ // Workers configures roadrunner server and worker pool.
+ Workers *roadrunner.ServerConfig
+}
+
+// Valid validates the configuration.
+func (cfg *Config) Valid() error {
+ return nil
+}
diff --git a/service/http/parse.go b/service/http/parse.go
new file mode 100644
index 00000000..01030831
--- /dev/null
+++ b/service/http/parse.go
@@ -0,0 +1,148 @@
+package http
+
+import (
+ "strings"
+ "net/http"
+ "os"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) (dataTree, error) {
+ data := make(dataTree)
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+
+ return data, nil
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ if len(v) == 0 {
+ // skip empty values
+ return
+ }
+
+ indexes := make([]string, 0)
+ for _, index := range strings.Split(k, "[") {
+ indexes = append(indexes, strings.Trim(index, "]"))
+ }
+
+ if len(indexes) <= MaxLevel {
+ d.mount(indexes, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(v) == 0 {
+ return
+ }
+
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including multipart form dataTree)
+func parseUploads(r *http.Request, cfg *UploadsConfig) (*Uploads, error) {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u, nil
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true
+ }
+
+ if os.IsNotExist(err) {
+ return false
+ }
+
+ return false
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ if len(v) == 0 {
+ // skip empty values
+ return
+ }
+
+ indexes := make([]string, 0)
+ for _, index := range strings.Split(k, "[") {
+ indexes = append(indexes, strings.Trim(index, "]"))
+ }
+
+ if len(indexes) <= MaxLevel {
+ d.mount(indexes, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(v) == 0 {
+ return
+ }
+
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
diff --git a/service/http/request.go b/service/http/request.go
new file mode 100644
index 00000000..c7304c8d
--- /dev/null
+++ b/service/http/request.go
@@ -0,0 +1,137 @@
+package http
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "io/ioutil"
+ "net/http"
+ "strings"
+)
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // Uri contains full request Uri with scheme and query.
+ Uri string `json:"uri"`
+
+ // Headers contains list of request headers.
+ Headers http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
+ req = &Request{
+ Protocol: r.Proto,
+ Method: r.Method,
+ Uri: uri(r),
+ Headers: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ }
+
+ for _, c := range r.Cookies() {
+ req.Cookies[c.Name] = c.Value
+ }
+
+ if !req.parsable() {
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+ }
+
+ if err = r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ if req.body, err = parseData(r); err != nil {
+ return nil, err
+ }
+
+ if req.Uploads, err = parseUploads(r, cfg); err != nil {
+ return nil, err
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open() error {
+ if r.Uploads == nil {
+ return nil
+ }
+
+ return r.Uploads.Open()
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close() {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear()
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (p *roadrunner.Payload, err error) {
+ p = &roadrunner.Payload{}
+
+ if p.Context, err = json.Marshal(r); err != nil {
+ return nil, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return nil, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// parsable returns true if request payload can be parsed (POST dataTree, file tree).
+func (r *Request) parsable() bool {
+ if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" {
+ return false
+ }
+
+ ct := r.Headers.Get("content-type")
+ return strings.Contains(ct, "multipart/form-data") || ct == "application/x-www-form-urlencoded"
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/service/http/response.go b/service/http/response.go
new file mode 100644
index 00000000..dd092353
--- /dev/null
+++ b/service/http/response.go
@@ -0,0 +1,54 @@
+package http
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner"
+ "net/http"
+ "io"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Headers contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated body payload.
+ body interface{}
+}
+
+// NewResponse creates new response based on given roadrunner payload.
+func NewResponse(p *roadrunner.Payload) (*Response, error) {
+ r := &Response{body: p.Body}
+ if err := json.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ for k, v := range r.Headers {
+ for _, h := range v {
+ w.Header().Add(k, h)
+
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.body.([]byte); ok {
+ w.Write(data)
+ }
+
+ if rc, ok := r.body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/service/http/rpc.go b/service/http/rpc.go
new file mode 100644
index 00000000..2adb8706
--- /dev/null
+++ b/service/http/rpc.go
@@ -0,0 +1,61 @@
+package http
+
+import (
+ "github.com/pkg/errors"
+)
+
+type rpcServer struct{ svc *Service }
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []Worker `json:"workers"`
+}
+
+// Worker provides information about specific worker.
+type Worker struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumJobs int64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // Updated is unix nano timestamp of last worker execution.
+ Updated int64 `json:"updated"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, r *string) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ *r = "OK"
+ return rpc.svc.srv.rr.Reset()
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ for _, w := range rpc.svc.rr.Workers() {
+ state := w.State()
+ r.Workers = append(r.Workers, Worker{
+ Pid: *w.Pid,
+ Status: state.String(),
+ NumJobs: state.NumExecs(),
+ Created: w.Created.UnixNano(),
+ Updated: state.Updated().UnixNano(),
+ })
+ }
+
+ return nil
+}
diff --git a/service/http/server.go b/service/http/server.go
new file mode 100644
index 00000000..de414b08
--- /dev/null
+++ b/service/http/server.go
@@ -0,0 +1,113 @@
+package http
+
+import (
+ "net/http"
+ "strconv"
+ "github.com/spiral/roadrunner"
+ "github.com/pkg/errors"
+)
+
+const (
+ // EventResponse thrown after the request been processed. See Log as payload.
+ EventResponse = iota + 500
+
+ // EventError thrown on any non job error provided by road runner server.
+ EventError
+)
+
+// Log represents singular http response event.
+type Log struct {
+ // Method of the request.
+ Method string
+
+ // Uri requested by the client.
+ Uri string
+
+ // Status is response status.
+ Status int
+
+ // Associated error, if any.
+ Error error
+}
+
+// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Server struct {
+ cfg *Config
+ listener func(event int, ctx interface{})
+ rr *roadrunner.Server
+}
+
+// AddListener attaches pool event watcher.
+func (s *Server) Listen(l func(event int, ctx interface{})) {
+ s.listener = l
+}
+
+// Handle serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ // validating request size
+ if s.cfg.MaxRequest != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ s.handleError(w, r, err)
+ return
+ } else if size > s.cfg.MaxRequest {
+ s.handleError(w, r, errors.New("request body max size is exceeded"))
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, s.cfg.Uploads)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ if err = req.Open(); err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+ defer req.Close()
+
+ p, err := req.Payload()
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ rsp, err := s.rr.Exec(p)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ s.handleError(w, r, err)
+ return
+ }
+
+ s.handleResponse(req, resp)
+ resp.Write(w)
+}
+
+// handleResponse triggers response event.
+func (s *Server) handleResponse(req *Request, resp *Response) {
+ s.throw(EventResponse, &Log{Method: req.Method, Uri: req.Uri, Status: resp.Status})
+}
+
+// handleError sends error.
+func (s *Server) handleError(w http.ResponseWriter, r *http.Request, err error) {
+ s.throw(EventError, &Log{Method: r.Method, Uri: uri(r), Status: 500, Error: err})
+
+ w.WriteHeader(500)
+ w.Write([]byte(err.Error()))
+}
+
+// throw invokes event srv if any.
+func (s *Server) throw(event int, ctx interface{}) {
+ if s.listener != nil {
+ s.listener(event, ctx)
+ }
+}
diff --git a/service/http/service.go b/service/http/service.go
new file mode 100644
index 00000000..ba941c7c
--- /dev/null
+++ b/service/http/service.go
@@ -0,0 +1,121 @@
+package http
+
+import (
+ "net/http"
+ "github.com/spiral/roadrunner/service"
+ "context"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/rpc"
+)
+
+// Name contains default svc name.
+const Name = "http"
+
+type Middleware interface {
+ // Handle must return true if request/response pair is handled withing the middleware.
+ Handle(w http.ResponseWriter, r *http.Request) bool
+}
+
+// Service manages rr, http servers.
+type Service struct {
+ cfg *Config
+ listeners []func(event int, ctx interface{})
+ middleware []Middleware
+ rr *roadrunner.Server
+ srv *Server
+ http *http.Server
+}
+
+func (s *Service) AddMiddleware(m Middleware) {
+ s.middleware = append(s.middleware, m)
+}
+
+// AddListener attaches server event watcher.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.listeners = append(s.listeners, l)
+}
+
+// Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) {
+ config := &Config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+
+ // registering http RPC interface
+ if r, ok := c.Get(rpc.Name); ok >= service.StatusConfigured {
+ if h, ok := r.(*rpc.Service); ok {
+ h.Register(Name, &rpcServer{s})
+ }
+ }
+
+ return true, nil
+}
+
+// Serve serves the svc.
+func (s *Service) Serve() error {
+ rr := roadrunner.NewServer(s.cfg.Workers)
+
+ s.rr = rr
+ s.srv = &Server{cfg: s.cfg, rr: s.rr}
+ s.http = &http.Server{Addr: s.cfg.Address}
+
+ s.rr.Listen(s.listener)
+ s.srv.Listen(s.listener)
+
+ if len(s.middleware) == 0 {
+ s.http.Handler = s.srv
+ } else {
+ s.http.Handler = s
+ }
+
+ if err := rr.Start(); err != nil {
+ return err
+ }
+ defer s.rr.Stop()
+
+ if err := s.http.ListenAndServe(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Stop stops the svc.
+func (s *Service) Stop() {
+ if s.http == nil {
+ return
+ }
+
+ s.http.Shutdown(context.Background())
+}
+
+// Handle handles connection using set of middleware and rr PSR-7 server.
+func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ for _, m := range s.middleware {
+ if m.Handle(w, r) {
+ return
+ }
+ }
+
+ s.srv.ServeHTTP(w, r)
+}
+
+func (s *Service) listener(event int, ctx interface{}) {
+ // todo: DIE on server failure
+
+ for _, l := range s.listeners {
+ l(event, ctx)
+ }
+}
diff --git a/service/http/uploads.go b/service/http/uploads.go
new file mode 100644
index 00000000..1f3060c0
--- /dev/null
+++ b/service/http/uploads.go
@@ -0,0 +1,130 @@
+package http
+
+import (
+ "encoding/json"
+ "os"
+ "sync"
+ "mime/multipart"
+ "io/ioutil"
+ "io"
+)
+
+const (
+ // There is no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // No file was uploaded.
+ UploadErrorNoFile = 4
+
+ // Missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // Failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // Forbid file extension.
+ UploadErrorExtension = 7
+)
+
+// tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg *UploadsConfig
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ return json.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open() error {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ f.Open(u.cfg)
+ }(f)
+ }
+
+ wg.Wait()
+ return nil
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear() {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ os.Remove(f.TempFilename)
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // Name contains filename specified by the client.
+ Name string `json:"name"`
+
+ // MimeType contains mime-type provided by the client.
+ MimeType string `json:"type"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ MimeType: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+func (f *FileUpload) Open(cfg *UploadsConfig) error {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+ defer file.Close()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer tmp.Close()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
diff --git a/service/http/uploads_config.go b/service/http/uploads_config.go
new file mode 100644
index 00000000..715de69a
--- /dev/null
+++ b/service/http/uploads_config.go
@@ -0,0 +1,39 @@
+package http
+
+import (
+ "strings"
+ "path"
+ "os"
+)
+
+// UploadsConfig describes file location and controls access to them.
+type UploadsConfig struct {
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// TmpDir returns temporary directory.
+func (cfg *UploadsConfig) TmpDir() string {
+ if cfg.Dir != "" {
+ return cfg.Dir
+ }
+
+ return os.TempDir()
+}
+
+// Forbid must return true if file extension is not allowed for the upload.
+func (cfg *UploadsConfig) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/service/http/uploads_config_test.go b/service/http/uploads_config_test.go
new file mode 100644
index 00000000..7704a486
--- /dev/null
+++ b/service/http/uploads_config_test.go
@@ -0,0 +1,24 @@
+package http
+
+import (
+ "testing"
+ "github.com/stretchr/testify/assert"
+ "os"
+)
+
+func TestFsConfig_Forbids(t *testing.T) {
+ cfg := UploadsConfig{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestFsConfig_TmpFallback(t *testing.T) {
+ cfg := UploadsConfig{Dir: "test"}
+ assert.Equal(t, "test", cfg.TmpDir())
+
+ cfg = UploadsConfig{Dir: ""}
+ assert.Equal(t, os.TempDir(), cfg.TmpDir())
+}
diff --git a/service/rpc/config.go b/service/rpc/config.go
new file mode 100644
index 00000000..06d63d65
--- /dev/null
+++ b/service/rpc/config.go
@@ -0,0 +1,35 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+)
+
+type config struct {
+ // Indicates if RPC connection is enabled.
+ Enable bool
+
+ // AddListener string
+ Listen string
+}
+
+// listener creates new rpc socket listener.
+func (cfg *config) listener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+// dialer creates rpc socket dialer.
+func (cfg *config) dialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
new file mode 100644
index 00000000..a953e30e
--- /dev/null
+++ b/service/rpc/config_test.go
@@ -0,0 +1,109 @@
+package rpc
+
+import (
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+)
+
+func TestConfig_Listener(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "tcp", ln.Addr().Network())
+ assert.Equal(t, "[::]:18001", ln.Addr().String())
+}
+
+func TestConfig_ListenerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "unix", ln.Addr().Network())
+ assert.Equal(t, "rpc.sock", ln.Addr().String())
+}
+
+func Test_Config_Error(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_ErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func TestConfig_Dialer(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "tcp", conn.RemoteAddr().Network())
+ assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
+}
+
+func TestConfig_DialerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "unix", conn.RemoteAddr().Network())
+ assert.Equal(t, "rpc.sock", conn.RemoteAddr().String())
+}
+
+func Test_Config_DialerError(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_DialerErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
diff --git a/service/rpc/service.go b/service/rpc/service.go
new file mode 100644
index 00000000..ce1e3351
--- /dev/null
+++ b/service/rpc/service.go
@@ -0,0 +1,122 @@
+package rpc
+
+import (
+ "errors"
+ "github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
+ "net/rpc"
+ "sync"
+)
+
+// Name contains default service name.
+const Name = "rpc"
+
+// Service is RPC service.
+type Service struct {
+ cfg *config
+ stop chan interface{}
+ rpc *rpc.Server
+
+ mu sync.Mutex
+ serving bool
+}
+
+// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) {
+ config := &config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ s.cfg = config
+ s.rpc = rpc.NewServer()
+
+ return true, nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ s.mu.Lock()
+ s.serving = true
+ s.stop = make(chan interface{})
+ s.mu.Unlock()
+
+ ln, err := s.cfg.listener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ go func() {
+ for {
+ select {
+ case <-s.stop:
+ break
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ <-s.stop
+
+ s.mu.Lock()
+ s.serving = false
+ s.mu.Unlock()
+
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.serving {
+ close(s.stop)
+ }
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, rcvr interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, rcvr)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.cfg == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.cfg.dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
new file mode 100644
index 00000000..a57ce1bd
--- /dev/null
+++ b/service/rpc/service_test.go
@@ -0,0 +1,95 @@
+package rpc
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type testService struct{}
+
+func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) service.Config { return nil }
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_ConfigError(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":false`}, nil)
+
+ assert.Error(t, err)
+ assert.False(t, ok)
+}
+
+func Test_Disabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":false}`}, nil)
+
+ assert.NoError(t, err)
+ assert.False(t, ok)
+}
+
+func Test_RegisterNotConfigured(t *testing.T) {
+ s := &Service{}
+ assert.Error(t, s.Register("test", &testService{}))
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+ assert.Error(t, s.Serve())
+}
+
+func Test_Enabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+}
+
+func Test_StopNonServing(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+ s.Stop()
+}
+
+func Test_Serve_Errors(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ assert.Error(t, s.Serve())
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+}
+
+func Test_Serve_Client(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ defer s.Stop()
+
+ assert.NoError(t, s.Register("test", &testService{}))
+
+ go func() { assert.NoError(t, s.Serve()) }()
+
+ client, err := s.Client()
+ assert.NotNil(t, client)
+ assert.NoError(t, err)
+ defer client.Close()
+
+ var resp string
+ assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
+ assert.Equal(t, "hello world", resp)
+}
diff --git a/service/static/config.go b/service/static/config.go
new file mode 100644
index 00000000..2a1f6c13
--- /dev/null
+++ b/service/static/config.go
@@ -0,0 +1,52 @@
+package static
+
+import (
+ "strings"
+ "path"
+ "os"
+ "github.com/pkg/errors"
+)
+
+// Config describes file location and controls access to them.
+type Config struct {
+ // Enables StaticFile service.
+ Enable bool
+
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// Forbid must return true if file extension is not allowed for the upload.
+func (cfg *Config) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Valid validates existence of directory.
+func (cfg *Config) Valid() error {
+ st, err := os.Stat(cfg.Dir)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return errors.New("root directory does not exists")
+ }
+
+ return err
+ }
+
+ if !st.IsDir() {
+ return errors.New("invalid root directory")
+ }
+
+ return nil
+}
diff --git a/service/static/config_test.go b/service/static/config_test.go
new file mode 100644
index 00000000..ce31348a
--- /dev/null
+++ b/service/static/config_test.go
@@ -0,0 +1,21 @@
+package static
+
+import (
+ "testing"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestConfig_Forbids(t *testing.T) {
+ cfg := Config{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestConfig_Valid(t *testing.T) {
+ assert.NoError(t, (&Config{Dir: "./"}).Valid())
+ assert.Error(t, (&Config{Dir: "./config.go"}).Valid())
+ assert.Error(t, (&Config{Dir: "./dir/"}).Valid())
+}
diff --git a/service/static/service.go b/service/static/service.go
new file mode 100644
index 00000000..5c3defe6
--- /dev/null
+++ b/service/static/service.go
@@ -0,0 +1,88 @@
+package static
+
+import (
+ "net/http"
+ "path"
+ "strings"
+ rrttp "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner/service"
+)
+
+// Name contains default service name.
+const Name = "static"
+
+// Service serves static files. Potentially convert into middleware?
+type Service struct {
+ // server configuration (location, forbidden files and etc)
+ cfg *Config
+
+ // root is initiated http directory
+ root http.Dir
+}
+
+// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bool, err error) {
+ config := &Config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+ s.root = http.Dir(s.cfg.Dir)
+
+ // registering as middleware
+ if h, ok := c.Get(rrttp.Name); ok >= service.StatusConfigured {
+ if h, ok := h.(*rrttp.Service); ok {
+ h.AddMiddleware(s)
+ }
+ }
+
+ return true, nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() error { return nil }
+
+// Stop stops the service.
+func (s *Service) Stop() {}
+
+// Handle must return true if request/response pair is handled withing the middleware.
+func (s *Service) Handle(w http.ResponseWriter, r *http.Request) bool {
+ fPath := r.URL.Path
+ if !strings.HasPrefix(fPath, "/") {
+ fPath = "/" + fPath
+ }
+ fPath = path.Clean(fPath)
+
+ if s.cfg.Forbids(fPath) {
+ return false
+ }
+
+ f, err := s.root.Open(fPath)
+ if err != nil {
+ return false
+ }
+ defer f.Close()
+
+ d, err := f.Stat()
+ if err != nil {
+ return false
+ }
+
+ // do not Handle directories
+ if d.IsDir() {
+ return false
+ }
+
+ http.ServeContent(w, r, d.Name(), d.ModTime(), f)
+ return true
+}