summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-05 16:23:14 +0300
committerWolfy-J <[email protected]>2018-06-05 16:23:14 +0300
commit76ff8d1c95e087749d559ee5a4f8f0348feafffa (patch)
tree112630d2d2cfe41d809065034c13b1066b8e05c2 /cmd
parent3c86132f90ef6473b4073a8b1500d01b6114fc30 (diff)
Cs and refactoring
Diffstat (limited to 'cmd')
-rw-r--r--cmd/_____/bus.go94
-rw-r--r--cmd/_____/factory.go71
-rw-r--r--cmd/_____/http/config.go85
-rw-r--r--cmd/_____/http/data.go67
-rw-r--r--cmd/_____/http/request.go137
-rw-r--r--cmd/_____/http/response.go42
-rw-r--r--cmd/_____/http/rpc.go44
-rw-r--r--cmd/_____/http/server.go87
-rw-r--r--cmd/_____/http/service.go80
-rw-r--r--cmd/_____/http/static.go70
-rw-r--r--cmd/_____/http/uploads.go207
-rw-r--r--cmd/_____/utils/size.go28
-rw-r--r--cmd/_____/utils/workers.go37
-rw-r--r--cmd/_____/verbose.go (renamed from cmd/rr/utils/verbose.go)2
-rw-r--r--cmd/rr/.rr.yaml3
-rw-r--r--cmd/rr/cmd/root.go28
-rw-r--r--cmd/rr/http/register.go23
-rw-r--r--cmd/rr/http/reload.go28
-rw-r--r--cmd/rr/http/workers.go59
-rw-r--r--cmd/rr/main.go10
-rw-r--r--cmd/rr/utils/config.go11
21 files changed, 1138 insertions, 75 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go
new file mode 100644
index 00000000..813a6c3b
--- /dev/null
+++ b/cmd/_____/bus.go
@@ -0,0 +1,94 @@
+package _____
+
+import (
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ services []Service
+ wg sync.WaitGroup
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the Service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some candidates might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: %s", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: %s", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}
diff --git a/cmd/_____/factory.go b/cmd/_____/factory.go
new file mode 100644
index 00000000..8ecf90ca
--- /dev/null
+++ b/cmd/_____/factory.go
@@ -0,0 +1,71 @@
+package _____
+
+import (
+ "github.com/spiral/roadrunner"
+ "net"
+ "os/exec"
+ "strings"
+ "time"
+)
+
+// todo: move out
+type PoolConfig struct {
+ Command string
+ Relay string
+
+ Number uint64
+ MaxJobs uint64
+
+ Timeouts struct {
+ Construct int
+ Allocate int
+ Destroy int
+ }
+}
+
+func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) {
+ relays, terminator, err := f.relayFactory()
+ if err != nil {
+ terminator()
+ return nil, nil, err
+ }
+
+ rr := roadrunner.NewServer(f.cmd(), relays)
+ if err := rr.Configure(f.rrConfig()); err != nil {
+ return nil, nil, err
+ }
+
+ return rr, nil, nil
+}
+
+func (f *PoolConfig) rrConfig() roadrunner.Config {
+ return roadrunner.Config{
+ NumWorkers: f.Number,
+ MaxExecutions: f.MaxJobs,
+ AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate),
+ DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy),
+ }
+}
+
+func (f *PoolConfig) cmd() func() *exec.Cmd {
+ cmd := strings.Split(f.Command, " ")
+ return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) }
+}
+
+func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) {
+ if f.Relay == "pipes" || f.Relay == "pipe" {
+ return roadrunner.NewPipeFactory(), nil, nil
+ }
+
+ dsn := strings.Split(f.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, nil, dsnError
+ }
+
+ ln, err := net.Listen(dsn[0], dsn[1])
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil
+}
diff --git a/cmd/_____/http/config.go b/cmd/_____/http/config.go
new file mode 100644
index 00000000..54e39a7d
--- /dev/null
+++ b/cmd/_____/http/config.go
@@ -0,0 +1,85 @@
+package http
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/_____/utils"
+ "os"
+ "path"
+ "strings"
+)
+
+// Configures RoadRunner HTTP server.
+type Config struct {
+ // serve enables static file serving from desired root directory.
+ ServeStatic bool
+
+ // Root directory, required when serve set to true.
+ Root string
+
+ // TmpDir contains name of temporary directory to store uploaded files passed to underlying PHP process.
+ TmpDir string
+
+ // MaxRequest specified max size for payload body in bytes, set 0 to unlimited.
+ MaxRequest int64
+
+ // ForbidUploads specifies list of file extensions which are forbidden for uploads.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ ForbidUploads []string
+}
+
+// ForbidUploads must return true if file extension is not allowed for the upload.
+func (cfg Config) Forbidden(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.ForbidUploads {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
+
+type serviceConfig struct {
+ Enabled bool
+ Host string
+ Port string
+ MaxRequest string
+ Static struct {
+ Serve bool
+ Root string
+ }
+
+ Uploads struct {
+ TmpDir string
+ Forbid []string
+ }
+
+ Pool service.PoolConfig
+
+ //todo: verbose ?
+}
+
+func (cfg *serviceConfig) httpAddr() string {
+ return fmt.Sprintf("%s:%v", cfg.Host, cfg.Port)
+}
+
+func (cfg *serviceConfig) httpConfig() *Config {
+ tmpDir := cfg.Uploads.TmpDir
+ if tmpDir == "" {
+ tmpDir = os.TempDir()
+ }
+
+ return &Config{
+ ServeStatic: cfg.Static.Serve,
+ Root: cfg.Static.Root,
+ TmpDir: tmpDir,
+ MaxRequest: utils.ParseSize(cfg.MaxRequest),
+ ForbidUploads: cfg.Uploads.Forbid,
+ }
+}
+
+func (cfg *serviceConfig) Valid() error {
+ return nil
+}
diff --git a/cmd/_____/http/data.go b/cmd/_____/http/data.go
new file mode 100644
index 00000000..e6b8344f
--- /dev/null
+++ b/cmd/_____/http/data.go
@@ -0,0 +1,67 @@
+package http
+
+import (
+ "net/http"
+ "strings"
+)
+
+const maxLevel = 127
+
+type dataTree map[string]interface{}
+
+// parsePost parses incoming request body into data tree.
+func parsePost(r *http.Request) (dataTree, error) {
+ data := make(dataTree)
+
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+
+ return data, nil
+}
+
+func (d dataTree) push(k string, v []string) {
+ if len(v) == 0 {
+ // skip empty values
+ return
+ }
+
+ indexes := make([]string, 0)
+ for _, index := range strings.Split(k, "[") {
+ indexes = append(indexes, strings.Trim(index, "]"))
+ }
+
+ if len(indexes) <= maxLevel {
+ d.mount(indexes, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(v) == 0 {
+ return
+ }
+
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
diff --git a/cmd/_____/http/request.go b/cmd/_____/http/request.go
new file mode 100644
index 00000000..fd483744
--- /dev/null
+++ b/cmd/_____/http/request.go
@@ -0,0 +1,137 @@
+package http
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "io/ioutil"
+ "net/http"
+ "strings"
+)
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // Uri contains full request Uri with scheme and query.
+ Uri string `json:"uri"`
+
+ // Headers contains list of request headers.
+ Headers http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request) (req *Request, err error) {
+ req = &Request{
+ Protocol: r.Proto,
+ Method: r.Method,
+ Uri: uri(r),
+ Headers: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ }
+
+ for _, c := range r.Cookies() {
+ req.Cookies[c.Name] = c.Value
+ }
+
+ if !req.parsable() {
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+ }
+
+ if err = r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ if req.body, err = parsePost(r); err != nil {
+ return nil, err
+ }
+
+ if req.Uploads, err = parseUploads(r); err != nil {
+ return nil, err
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(cfg *Config) error {
+ if r.Uploads == nil {
+ return nil
+ }
+
+ return r.Uploads.Open(cfg)
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close() {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear()
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (p *roadrunner.Payload, err error) {
+ p = &roadrunner.Payload{}
+
+ if p.Context, err = json.Marshal(r); err != nil {
+ return nil, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return nil, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// parsable returns true if request payload can be parsed (POST dataTree, file tree).
+func (r *Request) parsable() bool {
+ if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" {
+ return false
+ }
+
+ ct := r.Headers.Get("content-type")
+ return strings.Contains(ct, "multipart/form-data") || ct == "application/x-www-form-urlencoded"
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/cmd/_____/http/response.go b/cmd/_____/http/response.go
new file mode 100644
index 00000000..2736c4ab
--- /dev/null
+++ b/cmd/_____/http/response.go
@@ -0,0 +1,42 @@
+package http
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner"
+ "net/http"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Headers contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated body payload.
+ body []byte
+}
+
+// NewResponse creates new response based on given roadrunner payload.
+func NewResponse(p *roadrunner.Payload) (*Response, error) {
+ r := &Response{body: p.Body}
+ if err := json.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) {
+ for k, v := range r.Headers {
+ for _, h := range v {
+ w.Header().Add(k, h)
+
+ }
+ }
+
+ w.WriteHeader(r.Status)
+ w.Write(r.body)
+}
diff --git a/cmd/_____/http/rpc.go b/cmd/_____/http/rpc.go
new file mode 100644
index 00000000..1bc8a06b
--- /dev/null
+++ b/cmd/_____/http/rpc.go
@@ -0,0 +1,44 @@
+package http
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/cmd/_____/utils"
+ "github.com/pkg/errors"
+)
+
+type rpcServer struct {
+ service *Service
+}
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []utils.Worker `json:"workers"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, r *string) error {
+ if rpc.service.srv == nil {
+ return errors.New("no http server")
+ }
+
+ logrus.Info("http: restarting worker pool")
+ *r = "OK"
+
+ err := rpc.service.srv.rr.Reset()
+ if err != nil {
+ logrus.Errorf("http: %s", err)
+ }
+
+ return err
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
+ if rpc.service.srv == nil {
+ return errors.New("no http server")
+ }
+
+ r.Workers = utils.FetchWorkers(rpc.service.srv.rr)
+ return nil
+}
diff --git a/cmd/_____/http/server.go b/cmd/_____/http/server.go
new file mode 100644
index 00000000..db1f22ef
--- /dev/null
+++ b/cmd/_____/http/server.go
@@ -0,0 +1,87 @@
+package http
+
+import (
+ "errors"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "net/http"
+ "strconv"
+)
+
+// service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Server struct {
+ cfg *Config
+ static *staticServer
+ rr *roadrunner.Server
+}
+
+// NewServer returns new instance of HTTP PSR7 server.
+func NewServer(cfg *Config, server *roadrunner.Server) *Server {
+ h := &Server{cfg: cfg, rr: server}
+
+ if cfg.ServeStatic {
+ h.static = &staticServer{root: http.Dir(h.cfg.Root)}
+ }
+
+ return h
+}
+
+// ServeHTTP serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if srv.cfg.ServeStatic && srv.static.serve(w, r) {
+ return
+ }
+
+ // validating request size
+ if srv.cfg.MaxRequest != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ srv.sendError(w, r, err)
+ return
+ } else if size > srv.cfg.MaxRequest {
+ srv.sendError(w, r, errors.New("request body max size is exceeded"))
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r)
+ if err != nil {
+ srv.sendError(w, r, err)
+ return
+ }
+
+ if err = req.Open(srv.cfg); err != nil {
+ srv.sendError(w, r, err)
+ return
+ }
+ defer req.Close()
+
+ p, err := req.Payload()
+ if err != nil {
+ srv.sendError(w, r, err)
+ return
+ }
+
+ rsp, err := srv.rr.Exec(p)
+ if err != nil {
+ srv.sendError(w, r, err)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ srv.sendError(w, r, err)
+ return
+ }
+
+ resp.Write(w)
+}
+
+// sendError sends error
+func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) {
+ logrus.Errorf("http: %s", err)
+ w.WriteHeader(500)
+ w.Write([]byte(err.Error()))
+}
diff --git a/cmd/_____/http/service.go b/cmd/_____/http/service.go
new file mode 100644
index 00000000..008aeab8
--- /dev/null
+++ b/cmd/_____/http/service.go
@@ -0,0 +1,80 @@
+package http
+
+import (
+ "context"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service"
+ "net/http"
+ "github.com/spiral/roadrunner"
+)
+
+const ServiceName = "http"
+
+type Service struct {
+ cfg *serviceConfig
+ http *http.Server
+ srv *Server
+}
+
+func (s *Service) Name() string {
+ return ServiceName
+}
+
+func (s *Service) Configure(cfg service.Config) (bool, error) {
+ config := &serviceConfig{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enabled {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+ return true, nil
+}
+
+func (s *Service) RPC() interface{} {
+ return &rpcServer{s}
+}
+
+func (s *Service) Serve() error {
+ logrus.Debugf("http: started")
+ defer logrus.Debugf("http: stopped")
+
+ rr, term, err := s.cfg.Pool.NewServer()
+ if err != nil {
+ return err
+ }
+ defer term()
+
+ //todo: remove
+ rr.Observe(func(event int, ctx interface{}) {
+ switch event {
+ case roadrunner.EventPoolError:
+ logrus.Error(ctx)
+ case roadrunner.EventWorkerError:
+ logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error())
+ }
+ })
+
+ s.srv = NewServer(s.cfg.httpConfig(), rr)
+ s.http = &http.Server{
+ Addr: s.cfg.httpAddr(),
+ Handler: s.srv,
+ }
+
+ if err := s.http.ListenAndServe(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *Service) Stop() error {
+ return s.http.Shutdown(context.Background())
+}
diff --git a/cmd/_____/http/static.go b/cmd/_____/http/static.go
new file mode 100644
index 00000000..d7030c3f
--- /dev/null
+++ b/cmd/_____/http/static.go
@@ -0,0 +1,70 @@
+package http
+
+import (
+ "github.com/sirupsen/logrus"
+ "net/http"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+)
+
+var (
+ forbiddenFiles = []string{".php", ".htaccess"}
+)
+
+// staticServer serves static files
+type staticServer struct {
+ root http.Dir
+}
+
+// serve attempts to serve static file and returns true in case of success, will return false in case if file not
+// found, not allowed or on read error.
+func (svr *staticServer) serve(w http.ResponseWriter, r *http.Request) bool {
+ fpath := r.URL.Path
+ if !strings.HasPrefix(fpath, "/") {
+ fpath = "/" + fpath
+ }
+ fpath = path.Clean(fpath)
+
+ if svr.forbidden(fpath) {
+ logrus.Warningf("attempt to access forbidden file %s", fpath) // todo: better logs
+ return false
+ }
+
+ f, err := svr.root.Open(fpath)
+ if err != nil {
+ if !os.IsNotExist(err) {
+ logrus.Error(err) //todo: rr or access error
+ }
+
+ return false
+ }
+ defer f.Close()
+
+ d, err := f.Stat()
+ if err != nil {
+ logrus.Error(err) //todo: rr or access error
+ return false
+ }
+
+ if d.IsDir() {
+ // do not serve directories
+ return false
+ }
+
+ http.ServeContent(w, r, d.Name(), d.ModTime(), f)
+ return true
+}
+
+// forbidden returns true if file has forbidden extension.
+func (svr *staticServer) forbidden(path string) bool {
+ ext := strings.ToLower(filepath.Ext(path))
+ for _, exl := range forbiddenFiles {
+ if ext == exl {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/cmd/_____/http/uploads.go b/cmd/_____/http/uploads.go
new file mode 100644
index 00000000..468e8a19
--- /dev/null
+++ b/cmd/_____/http/uploads.go
@@ -0,0 +1,207 @@
+package http
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+)
+
+const (
+ // There is no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // No file was uploaded.
+ UploadErrorNoFile = 4
+
+ // Missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // Failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // ForbidUploads file extension.
+ UploadErrorExtension = 7
+)
+
+// FileUpload represents singular file wrapUpload.
+type FileUpload struct {
+ // Name contains filename specified by the client.
+ Name string `json:"name"`
+
+ // MimeType contains mime-type provided by the client.
+ MimeType string `json:"type"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+func (f *FileUpload) Open(cfg *Config) error {
+ if cfg.Forbidden(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+ defer file.Close()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir, "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer tmp.Close()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+func wrapUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ MimeType: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+type fileTree map[string]interface{}
+
+func (d fileTree) push(k string, v []*FileUpload) {
+ if len(v) == 0 {
+ // skip empty values
+ return
+ }
+
+ indexes := make([]string, 0)
+ for _, index := range strings.Split(k, "[") {
+ indexes = append(indexes, strings.Trim(index, "]"))
+ }
+
+ if len(indexes) <= maxLevel {
+ d.mount(indexes, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(v) == 0 {
+ return
+ }
+
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ return json.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually. @todo: do we need it?
+func (u *Uploads) Open(cfg *Config) error {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ f.Open(cfg)
+ }(f)
+ }
+
+ wg.Wait()
+ return nil
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear() {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ os.Remove(f.TempFilename)
+ }
+ }
+}
+
+// parse incoming dataTree request into JSON (including multipart form dataTree)
+func parseUploads(r *http.Request) (*Uploads, error) {
+ u := &Uploads{
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, wrapUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u, nil
+}
+
+// exists if file exists. by osutils; todo: better?
+func exists(path string) bool {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true
+ }
+
+ if os.IsNotExist(err) {
+ return false
+ }
+
+ panic(fmt.Errorf("unable to stat path %q; %v", path, err))
+}
diff --git a/cmd/_____/utils/size.go b/cmd/_____/utils/size.go
new file mode 100644
index 00000000..176cc9e1
--- /dev/null
+++ b/cmd/_____/utils/size.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+ "strconv"
+ "strings"
+)
+
+func ParseSize(size string) int64 {
+ if len(size) == 0 {
+ return 0
+ }
+
+ s, err := strconv.Atoi(size[:len(size)-1])
+ if err != nil {
+ return 0
+ }
+
+ switch strings.ToLower(size[len(size)-1:]) {
+ case "k", "kb":
+ return int64(s * 1024)
+ case "m", "mb":
+ return int64(s * 1024 * 1024)
+ case "g", "gb":
+ return int64(s * 1024 * 1024 * 1024)
+ }
+
+ return 0
+}
diff --git a/cmd/_____/utils/workers.go b/cmd/_____/utils/workers.go
new file mode 100644
index 00000000..1024b4c6
--- /dev/null
+++ b/cmd/_____/utils/workers.go
@@ -0,0 +1,37 @@
+package utils
+
+import "github.com/spiral/roadrunner"
+
+// Worker provides information about specific worker.
+type Worker struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumExecs uint64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // Updated is unix nano timestamp of last worker execution.
+ Updated int64 `json:"updated"`
+}
+
+// FetchWorkers fetches list of workers from RR Server.
+func FetchWorkers(srv *roadrunner.Server) (result []Worker) {
+ for _, w := range srv.Workers() {
+ state := w.State()
+ result = append(result, Worker{
+ Pid: *w.Pid,
+ Status: state.String(),
+ NumExecs: state.NumExecs(),
+ Created: w.Created.UnixNano(),
+ Updated: state.Updated().UnixNano(),
+ })
+ }
+
+ return
+} \ No newline at end of file
diff --git a/cmd/rr/utils/verbose.go b/cmd/_____/verbose.go
index 43770f34..d0088b69 100644
--- a/cmd/rr/utils/verbose.go
+++ b/cmd/_____/verbose.go
@@ -1,4 +1,4 @@
-package utils
+package _____
//if f.Verbose {
// rr.Observe(func(event int, ctx interface{}) {
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index 2717e187..6e3b689b 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -1,5 +1,8 @@
# rpc bus allows php application and external clients to talk to rr services.
rpc:
+ # enable rpc server
+ enable: true
+
# rpc connection DSN. Supported TCP and Unix sockets.
listen: tcp://127.0.0.1:6001
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index d0cac5ef..d54437f0 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -31,24 +31,29 @@ import (
// Service bus for all the commands.
var (
- // Shared service bus.
- Services = service.NewBus()
+ cfgFile string
+ verbose bool
+
+ // Logger - shared logger.
+ Logger = logrus.New()
+
+ // Services - shared service bus.
+ Services = service.NewRegistry(Logger)
// CLI is application endpoint.
CLI = &cobra.Command{
- Use: "rr",
- Short: "RoadRunner, PHP application server",
+ Use: "rr",
+ SilenceErrors: true,
+ SilenceUsage: true,
+ Short: utils.Sprintf("<green>RoadRunner, PHP Application Server.</reset>"),
}
-
- cfgFile string
- verbose bool
)
// Execute adds all child commands to the CLI command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the CLI.
func Execute() {
if err := CLI.Execute(); err != nil {
- logrus.Error(err)
+ utils.Printf("Error: <red>%s</reset>\n", err)
os.Exit(1)
}
}
@@ -59,7 +64,7 @@ func init() {
cobra.OnInitialize(func() {
if verbose {
- logrus.SetLevel(logrus.DebugLevel)
+ Logger.SetLevel(logrus.DebugLevel)
}
if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
@@ -81,6 +86,7 @@ func initConfig(cfgFile string, path []string, name string) service.Config {
for _, p := range path {
cfg.AddConfigPath(p)
}
+
cfg.SetConfigName(name)
}
@@ -89,9 +95,9 @@ func initConfig(cfgFile string, path []string, name string) service.Config {
// If a cfg file is found, read it in.
if err := cfg.ReadInConfig(); err != nil {
- logrus.Warnf("config: %s", err)
+ Logger.Warnf("config: %s", err)
return nil
}
- return &utils.ConfigWrapper{cfg}
+ return &utils.ViperWrapper{Viper: cfg}
}
diff --git a/cmd/rr/http/register.go b/cmd/rr/http/register.go
deleted file mode 100644
index fb828578..00000000
--- a/cmd/rr/http/register.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package http
-
-import (
- "github.com/spf13/cobra"
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/http"
-)
-
-func init() {
- rr.Services.Register(&http.Service{})
-
- rr.CLI.AddCommand(&cobra.Command{
- Use: "http:reload",
- Short: "Reload RoadRunner worker pools for the HTTP service",
- Run: reloadHandler,
- })
-
- rr.CLI.AddCommand(&cobra.Command{
- Use: "http:workers",
- Short: "List workers associated with RoadRunner HTTP service",
- Run: workersHandler,
- })
-}
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go
index 6cdba576..0fd3d7e9 100644
--- a/cmd/rr/http/reload.go
+++ b/cmd/rr/http/reload.go
@@ -23,22 +23,34 @@ package http
import (
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/sirupsen/logrus"
+ "github.com/go-errors/errors"
+ "github.com/spiral/roadrunner/rpc"
)
-func reloadHandler(cmd *cobra.Command, args []string) {
- client, err := rr.Services.RCPClient()
+func init() {
+ rr.CLI.AddCommand(&cobra.Command{
+ Use: "http:reload",
+ Short: "Reload RoadRunner worker pools for the HTTP service",
+ RunE: reloadHandler,
+ })
+}
+
+func reloadHandler(cmd *cobra.Command, args []string) error {
+ if !rr.Services.Has("rpc") {
+ return errors.New("RPC service is not configured")
+ }
+
+ client, err := rr.Services.Get("rpc").(*rpc.Service).Client()
if err != nil {
- logrus.Error(err)
- return
+ return err
}
defer client.Close()
var r string
if err := client.Call("http.Reset", true, &r); err != nil {
- logrus.Error(err)
- return
+ return err
}
- logrus.Info("restarting http worker pool")
+ rr.Logger.Info("http.service: restarting worker pool")
+ return nil
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index 13e8d21c..63ef0cce 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -21,38 +21,47 @@
package http
import (
- "github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/http"
- "os"
- "strconv"
- "github.com/sirupsen/logrus"
+ "errors"
+ "github.com/spiral/roadrunner/rpc"
)
-func workersHandler(cmd *cobra.Command, args []string) {
- client, err := rr.Services.RCPClient()
- if err != nil {
- logrus.Error(err)
- return
- }
- defer client.Close()
+func init() {
+ rr.CLI.AddCommand(&cobra.Command{
+ Use: "http:workers",
+ Short: "List workers associated with RoadRunner HTTP service",
+ RunE: workersHandler,
+ })
+}
- var r http.WorkerList
- if err := client.Call("http.Workers", true, &r); err != nil {
- panic(err)
+func workersHandler(cmd *cobra.Command, args []string) error {
+ if !rr.Services.Has("rpc") {
+ return errors.New("RPC service is not configured")
}
- tw := tablewriter.NewWriter(os.Stdout)
- tw.SetHeader([]string{"PID", "Status", "Num Execs"})
-
- for _, w := range r.Workers {
- tw.Append([]string{
- strconv.Itoa(w.Pid),
- w.Status,
- strconv.Itoa(int(w.NumExecs)),
- })
+ client, err := rr.Services.Get("rpc").(*rpc.Service).Client()
+ if err != nil {
+ return err
}
+ defer client.Close()
- tw.Render()
+ //var r http.WorkerList
+ //if err := client.Call("http.Workers", true, &r); err != nil {
+ // panic(err)
+ //}
+ //
+ //tw := tablewriter.NewWriter(os.Stdout)
+ //tw.SetHeader([]string{"PID", "Status", "Num Execs"})
+ //
+ //for _, w := range r.Workers {
+ // tw.Append([]string{
+ // strconv.Itoa(w.Pid),
+ // w.Status,
+ // strconv.Itoa(int(w.NumExecs)),
+ // })
+ //}
+ //
+ //tw.Render()
+ return nil
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 9d6f685c..26f70fdd 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -23,13 +23,17 @@
package main
import (
- "github.com/spiral/roadrunner/cmd/rr/cmd"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/rpc"
- // service plugins
+ // cli plugins
_ "github.com/spiral/roadrunner/cmd/rr/http"
)
func main() {
+ // provides ability to make local connection to services
+ rr.Services.Register("rpc", new(rpc.Service))
+
// you can register additional commands using cmd.CLI
- cmd.Execute()
+ rr.Execute()
}
diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go
index e7e22b3a..452dd195 100644
--- a/cmd/rr/utils/config.go
+++ b/cmd/rr/utils/config.go
@@ -5,19 +5,22 @@ import (
"github.com/spiral/roadrunner/service"
)
-type ConfigWrapper struct {
+// ViperWrapper provides interface bridge between Viper configs and service.Config.
+type ViperWrapper struct {
Viper *viper.Viper
}
-func (w *ConfigWrapper) Get(key string) service.Config {
+// Get nested config section (sub-map), returns nil if section not found.
+func (w *ViperWrapper) Get(key string) service.Config {
sub := w.Viper.Sub(key)
if sub == nil {
return nil
}
- return &ConfigWrapper{sub}
+ return &ViperWrapper{sub}
}
-func (w *ConfigWrapper) Unmarshal(out interface{}) error {
+// Unmarshal unmarshal config data into given struct.
+func (w *ViperWrapper) Unmarshal(out interface{}) error {
return w.Viper.Unmarshal(out)
}