From 76ff8d1c95e087749d559ee5a4f8f0348feafffa Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Tue, 5 Jun 2018 16:23:14 +0300 Subject: Cs and refactoring --- cmd/_____/bus.go | 94 ++++++++++++++++++++ cmd/_____/factory.go | 71 ++++++++++++++++ cmd/_____/http/config.go | 85 +++++++++++++++++++ cmd/_____/http/data.go | 67 +++++++++++++++ cmd/_____/http/request.go | 137 ++++++++++++++++++++++++++++++ cmd/_____/http/response.go | 42 +++++++++ cmd/_____/http/rpc.go | 44 ++++++++++ cmd/_____/http/server.go | 87 +++++++++++++++++++ cmd/_____/http/service.go | 80 ++++++++++++++++++ cmd/_____/http/static.go | 70 +++++++++++++++ cmd/_____/http/uploads.go | 207 +++++++++++++++++++++++++++++++++++++++++++++ cmd/_____/utils/size.go | 28 ++++++ cmd/_____/utils/workers.go | 37 ++++++++ cmd/_____/verbose.go | 18 ++++ cmd/rr/.rr.yaml | 3 + cmd/rr/cmd/root.go | 28 +++--- cmd/rr/http/register.go | 23 ----- cmd/rr/http/reload.go | 28 ++++-- cmd/rr/http/workers.go | 59 +++++++------ cmd/rr/main.go | 10 ++- cmd/rr/utils/config.go | 11 ++- cmd/rr/utils/verbose.go | 18 ---- http/config.go | 85 ------------------- http/data.go | 67 --------------- http/request.go | 137 ------------------------------ http/response.go | 42 --------- http/rpc.go | 44 ---------- http/server.go | 87 ------------------- http/service.go | 78 ----------------- http/static.go | 70 --------------- http/uploads.go | 207 --------------------------------------------- rpc/config.go | 35 ++++++++ rpc/service.go | 99 ++++++++++++++++++++++ service/bus.go | 165 ------------------------------------ service/config.go | 6 -- service/factory.go | 69 --------------- service/registry.go | 162 +++++++++++++++++++++++++++++++++++ service/rpc.go | 32 ------- service/service.go | 9 -- utils/size.go | 28 ------ utils/workers.go | 37 -------- 41 files changed, 1451 insertions(+), 1255 deletions(-) create mode 100644 cmd/_____/bus.go create mode 100644 cmd/_____/factory.go create mode 100644 cmd/_____/http/config.go create mode 100644 cmd/_____/http/data.go create mode 100644 cmd/_____/http/request.go create mode 100644 cmd/_____/http/response.go create mode 100644 cmd/_____/http/rpc.go create mode 100644 cmd/_____/http/server.go create mode 100644 cmd/_____/http/service.go create mode 100644 cmd/_____/http/static.go create mode 100644 cmd/_____/http/uploads.go create mode 100644 cmd/_____/utils/size.go create mode 100644 cmd/_____/utils/workers.go create mode 100644 cmd/_____/verbose.go delete mode 100644 cmd/rr/http/register.go delete mode 100644 cmd/rr/utils/verbose.go delete mode 100644 http/config.go delete mode 100644 http/data.go delete mode 100644 http/request.go delete mode 100644 http/response.go delete mode 100644 http/rpc.go delete mode 100644 http/server.go delete mode 100644 http/service.go delete mode 100644 http/static.go delete mode 100644 http/uploads.go create mode 100644 rpc/config.go create mode 100644 rpc/service.go delete mode 100644 service/bus.go delete mode 100644 service/config.go delete mode 100644 service/factory.go create mode 100644 service/registry.go delete mode 100644 service/rpc.go delete mode 100644 service/service.go delete mode 100644 utils/size.go delete mode 100644 utils/workers.go diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go new file mode 100644 index 00000000..813a6c3b --- /dev/null +++ b/cmd/_____/bus.go @@ -0,0 +1,94 @@ +package _____ + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net/rpc" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +var ( + dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") +) + +type Bus struct { + services []Service + wg sync.WaitGroup + enabled []Service + stop chan interface{} + rpc *rpc.Server + rpcConfig *RPCConfig +} + +func (b *Bus) Register(s Service) { + b.services = append(b.services, s) +} + +func (b *Bus) Services() []Service { + return b.services +} + +func (b *Bus) Configure(cfg Config) error { + b.enabled = make([]Service, 0) + + for _, s := range b.services { + segment := cfg.Get(s.Name()) + if segment == nil { + // no config has been provided for the Service + logrus.Debugf("%s: no config has been provided", s.Name()) + continue + } + + if enable, err := s.Configure(segment); err != nil { + return err + } else if !enable { + continue + } + + b.enabled = append(b.enabled, s) + } + + return nil +} + +func (b *Bus) Serve() { + b.rpc = rpc.NewServer() + + for _, s := range b.enabled { + // some candidates might provide net/rpc api for internal communications + if api := s.RPC(); api != nil { + b.rpc.RegisterName(s.Name(), api) + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + if err := s.Serve(); err != nil { + logrus.Errorf("%s.start: %s", s.Name(), err) + } + }() + } + + b.wg.Wait() +} + +func (b *Bus) Stop() { + for _, s := range b.enabled { + if err := s.Stop(); err != nil { + logrus.Errorf("%s.stop: %s", s.Name(), err) + } + } + + b.wg.Wait() +} diff --git a/cmd/_____/factory.go b/cmd/_____/factory.go new file mode 100644 index 00000000..8ecf90ca --- /dev/null +++ b/cmd/_____/factory.go @@ -0,0 +1,71 @@ +package _____ + +import ( + "github.com/spiral/roadrunner" + "net" + "os/exec" + "strings" + "time" +) + +// todo: move out +type PoolConfig struct { + Command string + Relay string + + Number uint64 + MaxJobs uint64 + + Timeouts struct { + Construct int + Allocate int + Destroy int + } +} + +func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { + relays, terminator, err := f.relayFactory() + if err != nil { + terminator() + return nil, nil, err + } + + rr := roadrunner.NewServer(f.cmd(), relays) + if err := rr.Configure(f.rrConfig()); err != nil { + return nil, nil, err + } + + return rr, nil, nil +} + +func (f *PoolConfig) rrConfig() roadrunner.Config { + return roadrunner.Config{ + NumWorkers: f.Number, + MaxExecutions: f.MaxJobs, + AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate), + DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy), + } +} + +func (f *PoolConfig) cmd() func() *exec.Cmd { + cmd := strings.Split(f.Command, " ") + return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } +} + +func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { + if f.Relay == "pipes" || f.Relay == "pipe" { + return roadrunner.NewPipeFactory(), nil, nil + } + + dsn := strings.Split(f.Relay, "://") + if len(dsn) != 2 { + return nil, nil, dsnError + } + + ln, err := net.Listen(dsn[0], dsn[1]) + if err != nil { + return nil, nil, err + } + + return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil +} diff --git a/cmd/_____/http/config.go b/cmd/_____/http/config.go new file mode 100644 index 00000000..54e39a7d --- /dev/null +++ b/cmd/_____/http/config.go @@ -0,0 +1,85 @@ +package http + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/cmd/_____/utils" + "os" + "path" + "strings" +) + +// Configures RoadRunner HTTP server. +type Config struct { + // serve enables static file serving from desired root directory. + ServeStatic bool + + // Root directory, required when serve set to true. + Root string + + // TmpDir contains name of temporary directory to store uploaded files passed to underlying PHP process. + TmpDir string + + // MaxRequest specified max size for payload body in bytes, set 0 to unlimited. + MaxRequest int64 + + // ForbidUploads specifies list of file extensions which are forbidden for uploads. + // Example: .php, .exe, .bat, .htaccess and etc. + ForbidUploads []string +} + +// ForbidUploads must return true if file extension is not allowed for the upload. +func (cfg Config) Forbidden(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.ForbidUploads { + if ext == v { + return true + } + } + + return false +} + +type serviceConfig struct { + Enabled bool + Host string + Port string + MaxRequest string + Static struct { + Serve bool + Root string + } + + Uploads struct { + TmpDir string + Forbid []string + } + + Pool service.PoolConfig + + //todo: verbose ? +} + +func (cfg *serviceConfig) httpAddr() string { + return fmt.Sprintf("%s:%v", cfg.Host, cfg.Port) +} + +func (cfg *serviceConfig) httpConfig() *Config { + tmpDir := cfg.Uploads.TmpDir + if tmpDir == "" { + tmpDir = os.TempDir() + } + + return &Config{ + ServeStatic: cfg.Static.Serve, + Root: cfg.Static.Root, + TmpDir: tmpDir, + MaxRequest: utils.ParseSize(cfg.MaxRequest), + ForbidUploads: cfg.Uploads.Forbid, + } +} + +func (cfg *serviceConfig) Valid() error { + return nil +} diff --git a/cmd/_____/http/data.go b/cmd/_____/http/data.go new file mode 100644 index 00000000..e6b8344f --- /dev/null +++ b/cmd/_____/http/data.go @@ -0,0 +1,67 @@ +package http + +import ( + "net/http" + "strings" +) + +const maxLevel = 127 + +type dataTree map[string]interface{} + +// parsePost parses incoming request body into data tree. +func parsePost(r *http.Request) (dataTree, error) { + data := make(dataTree) + + for k, v := range r.PostForm { + data.push(k, v) + } + + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + + return data, nil +} + +func (d dataTree) push(k string, v []string) { + if len(v) == 0 { + // skip empty values + return + } + + indexes := make([]string, 0) + for _, index := range strings.Split(k, "[") { + indexes = append(indexes, strings.Trim(index, "]")) + } + + if len(indexes) <= maxLevel { + d.mount(indexes, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(v) == 0 { + return + } + + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} diff --git a/cmd/_____/http/request.go b/cmd/_____/http/request.go new file mode 100644 index 00000000..fd483744 --- /dev/null +++ b/cmd/_____/http/request.go @@ -0,0 +1,137 @@ +package http + +import ( + "encoding/json" + "fmt" + "github.com/spiral/roadrunner" + "io/ioutil" + "net/http" + "strings" +) + +const ( + defaultMaxMemory = 32 << 20 // 32 MB +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // Uri contains full request Uri with scheme and query. + Uri string `json:"uri"` + + // Headers contains list of request headers. + Headers http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // request body can be parsedData or []byte + body interface{} +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request) (req *Request, err error) { + req = &Request{ + Protocol: r.Proto, + Method: r.Method, + Uri: uri(r), + Headers: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + } + + for _, c := range r.Cookies() { + req.Cookies[c.Name] = c.Value + } + + if !req.parsable() { + req.body, err = ioutil.ReadAll(r.Body) + return req, err + } + + if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + if req.body, err = parsePost(r); err != nil { + return nil, err + } + + if req.Uploads, err = parseUploads(r); err != nil { + return nil, err + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(cfg *Config) error { + if r.Uploads == nil { + return nil + } + + return r.Uploads.Open(cfg) +} + +// Close clears all temp file uploads +func (r *Request) Close() { + if r.Uploads == nil { + return + } + + r.Uploads.Clear() +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (p *roadrunner.Payload, err error) { + p = &roadrunner.Payload{} + + if p.Context, err = json.Marshal(r); err != nil { + return nil, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return nil, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// parsable returns true if request payload can be parsed (POST dataTree, file tree). +func (r *Request) parsable() bool { + if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" { + return false + } + + ct := r.Headers.Get("content-type") + return strings.Contains(ct, "multipart/form-data") || ct == "application/x-www-form-urlencoded" +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/cmd/_____/http/response.go b/cmd/_____/http/response.go new file mode 100644 index 00000000..2736c4ab --- /dev/null +++ b/cmd/_____/http/response.go @@ -0,0 +1,42 @@ +package http + +import ( + "encoding/json" + "github.com/spiral/roadrunner" + "net/http" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Headers contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated body payload. + body []byte +} + +// NewResponse creates new response based on given roadrunner payload. +func NewResponse(p *roadrunner.Payload) (*Response, error) { + r := &Response{body: p.Body} + if err := json.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) { + for k, v := range r.Headers { + for _, h := range v { + w.Header().Add(k, h) + + } + } + + w.WriteHeader(r.Status) + w.Write(r.body) +} diff --git a/cmd/_____/http/rpc.go b/cmd/_____/http/rpc.go new file mode 100644 index 00000000..1bc8a06b --- /dev/null +++ b/cmd/_____/http/rpc.go @@ -0,0 +1,44 @@ +package http + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/cmd/_____/utils" + "github.com/pkg/errors" +) + +type rpcServer struct { + service *Service +} + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []utils.Worker `json:"workers"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.service.srv == nil { + return errors.New("no http server") + } + + logrus.Info("http: restarting worker pool") + *r = "OK" + + err := rpc.service.srv.rr.Reset() + if err != nil { + logrus.Errorf("http: %s", err) + } + + return err +} + +// Workers returns list of active workers and their stats. +func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { + if rpc.service.srv == nil { + return errors.New("no http server") + } + + r.Workers = utils.FetchWorkers(rpc.service.srv.rr) + return nil +} diff --git a/cmd/_____/http/server.go b/cmd/_____/http/server.go new file mode 100644 index 00000000..db1f22ef --- /dev/null +++ b/cmd/_____/http/server.go @@ -0,0 +1,87 @@ +package http + +import ( + "errors" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" + "net/http" + "strconv" +) + +// service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Server struct { + cfg *Config + static *staticServer + rr *roadrunner.Server +} + +// NewServer returns new instance of HTTP PSR7 server. +func NewServer(cfg *Config, server *roadrunner.Server) *Server { + h := &Server{cfg: cfg, rr: server} + + if cfg.ServeStatic { + h.static = &staticServer{root: http.Dir(h.cfg.Root)} + } + + return h +} + +// ServeHTTP serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if srv.cfg.ServeStatic && srv.static.serve(w, r) { + return + } + + // validating request size + if srv.cfg.MaxRequest != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + srv.sendError(w, r, err) + return + } else if size > srv.cfg.MaxRequest { + srv.sendError(w, r, errors.New("request body max size is exceeded")) + return + } + } + } + + req, err := NewRequest(r) + if err != nil { + srv.sendError(w, r, err) + return + } + + if err = req.Open(srv.cfg); err != nil { + srv.sendError(w, r, err) + return + } + defer req.Close() + + p, err := req.Payload() + if err != nil { + srv.sendError(w, r, err) + return + } + + rsp, err := srv.rr.Exec(p) + if err != nil { + srv.sendError(w, r, err) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + srv.sendError(w, r, err) + return + } + + resp.Write(w) +} + +// sendError sends error +func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) { + logrus.Errorf("http: %s", err) + w.WriteHeader(500) + w.Write([]byte(err.Error())) +} diff --git a/cmd/_____/http/service.go b/cmd/_____/http/service.go new file mode 100644 index 00000000..008aeab8 --- /dev/null +++ b/cmd/_____/http/service.go @@ -0,0 +1,80 @@ +package http + +import ( + "context" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "net/http" + "github.com/spiral/roadrunner" +) + +const ServiceName = "http" + +type Service struct { + cfg *serviceConfig + http *http.Server + srv *Server +} + +func (s *Service) Name() string { + return ServiceName +} + +func (s *Service) Configure(cfg service.Config) (bool, error) { + config := &serviceConfig{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enabled { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + return true, nil +} + +func (s *Service) RPC() interface{} { + return &rpcServer{s} +} + +func (s *Service) Serve() error { + logrus.Debugf("http: started") + defer logrus.Debugf("http: stopped") + + rr, term, err := s.cfg.Pool.NewServer() + if err != nil { + return err + } + defer term() + + //todo: remove + rr.Observe(func(event int, ctx interface{}) { + switch event { + case roadrunner.EventPoolError: + logrus.Error(ctx) + case roadrunner.EventWorkerError: + logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) + } + }) + + s.srv = NewServer(s.cfg.httpConfig(), rr) + s.http = &http.Server{ + Addr: s.cfg.httpAddr(), + Handler: s.srv, + } + + if err := s.http.ListenAndServe(); err != nil { + return err + } + + return nil +} + +func (s *Service) Stop() error { + return s.http.Shutdown(context.Background()) +} diff --git a/cmd/_____/http/static.go b/cmd/_____/http/static.go new file mode 100644 index 00000000..d7030c3f --- /dev/null +++ b/cmd/_____/http/static.go @@ -0,0 +1,70 @@ +package http + +import ( + "github.com/sirupsen/logrus" + "net/http" + "os" + "path" + "path/filepath" + "strings" +) + +var ( + forbiddenFiles = []string{".php", ".htaccess"} +) + +// staticServer serves static files +type staticServer struct { + root http.Dir +} + +// serve attempts to serve static file and returns true in case of success, will return false in case if file not +// found, not allowed or on read error. +func (svr *staticServer) serve(w http.ResponseWriter, r *http.Request) bool { + fpath := r.URL.Path + if !strings.HasPrefix(fpath, "/") { + fpath = "/" + fpath + } + fpath = path.Clean(fpath) + + if svr.forbidden(fpath) { + logrus.Warningf("attempt to access forbidden file %s", fpath) // todo: better logs + return false + } + + f, err := svr.root.Open(fpath) + if err != nil { + if !os.IsNotExist(err) { + logrus.Error(err) //todo: rr or access error + } + + return false + } + defer f.Close() + + d, err := f.Stat() + if err != nil { + logrus.Error(err) //todo: rr or access error + return false + } + + if d.IsDir() { + // do not serve directories + return false + } + + http.ServeContent(w, r, d.Name(), d.ModTime(), f) + return true +} + +// forbidden returns true if file has forbidden extension. +func (svr *staticServer) forbidden(path string) bool { + ext := strings.ToLower(filepath.Ext(path)) + for _, exl := range forbiddenFiles { + if ext == exl { + return true + } + } + + return false +} diff --git a/cmd/_____/http/uploads.go b/cmd/_____/http/uploads.go new file mode 100644 index 00000000..468e8a19 --- /dev/null +++ b/cmd/_____/http/uploads.go @@ -0,0 +1,207 @@ +package http + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "os" + "strings" + "sync" +) + +const ( + // There is no error, the file uploaded with success. + UploadErrorOK = 0 + + // No file was uploaded. + UploadErrorNoFile = 4 + + // Missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // Failed to write file to disk. + UploadErrorCantWrite = 6 + + // ForbidUploads file extension. + UploadErrorExtension = 7 +) + +// FileUpload represents singular file wrapUpload. +type FileUpload struct { + // Name contains filename specified by the client. + Name string `json:"name"` + + // MimeType contains mime-type provided by the client. + MimeType string `json:"type"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +func (f *FileUpload) Open(cfg *Config) error { + if cfg.Forbidden(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + defer file.Close() + + tmp, err := ioutil.TempFile(cfg.TmpDir, "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer tmp.Close() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +func wrapUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + MimeType: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +type fileTree map[string]interface{} + +func (d fileTree) push(k string, v []*FileUpload) { + if len(v) == 0 { + // skip empty values + return + } + + indexes := make([]string, 0) + for _, index := range strings.Split(k, "[") { + indexes = append(indexes, strings.Trim(index, "]")) + } + + if len(indexes) <= maxLevel { + d.mount(indexes, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(v) == 0 { + return + } + + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// tree manages uploaded files tree and temporary files. +type Uploads struct { + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + return json.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. @todo: do we need it? +func (u *Uploads) Open(cfg *Config) error { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + f.Open(cfg) + }(f) + } + + wg.Wait() + return nil +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear() { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + os.Remove(f.TempFilename) + } + } +} + +// parse incoming dataTree request into JSON (including multipart form dataTree) +func parseUploads(r *http.Request) (*Uploads, error) { + u := &Uploads{ + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, wrapUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u, nil +} + +// exists if file exists. by osutils; todo: better? +func exists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + + if os.IsNotExist(err) { + return false + } + + panic(fmt.Errorf("unable to stat path %q; %v", path, err)) +} diff --git a/cmd/_____/utils/size.go b/cmd/_____/utils/size.go new file mode 100644 index 00000000..176cc9e1 --- /dev/null +++ b/cmd/_____/utils/size.go @@ -0,0 +1,28 @@ +package utils + +import ( + "strconv" + "strings" +) + +func ParseSize(size string) int64 { + if len(size) == 0 { + return 0 + } + + s, err := strconv.Atoi(size[:len(size)-1]) + if err != nil { + return 0 + } + + switch strings.ToLower(size[len(size)-1:]) { + case "k", "kb": + return int64(s * 1024) + case "m", "mb": + return int64(s * 1024 * 1024) + case "g", "gb": + return int64(s * 1024 * 1024 * 1024) + } + + return 0 +} diff --git a/cmd/_____/utils/workers.go b/cmd/_____/utils/workers.go new file mode 100644 index 00000000..1024b4c6 --- /dev/null +++ b/cmd/_____/utils/workers.go @@ -0,0 +1,37 @@ +package utils + +import "github.com/spiral/roadrunner" + +// Worker provides information about specific worker. +type Worker struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumExecs uint64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` + + // Updated is unix nano timestamp of last worker execution. + Updated int64 `json:"updated"` +} + +// FetchWorkers fetches list of workers from RR Server. +func FetchWorkers(srv *roadrunner.Server) (result []Worker) { + for _, w := range srv.Workers() { + state := w.State() + result = append(result, Worker{ + Pid: *w.Pid, + Status: state.String(), + NumExecs: state.NumExecs(), + Created: w.Created.UnixNano(), + Updated: state.Updated().UnixNano(), + }) + } + + return +} \ No newline at end of file diff --git a/cmd/_____/verbose.go b/cmd/_____/verbose.go new file mode 100644 index 00000000..d0088b69 --- /dev/null +++ b/cmd/_____/verbose.go @@ -0,0 +1,18 @@ +package _____ + +//if f.Verbose { +// rr.Observe(func(event int, ctx interface{}) { +// switch event { +// case roadrunner.EventPoolError: +// logrus.Error(ctx) +// case roadrunner.EventWorkerCreate: +// logrus.Infof("%s - created", ctx) +// case roadrunner.EventWorkerError: +// logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) +// case roadrunner.EventWorkerDestruct: +// logrus.Warnf("%s - destructed", ctx) +// case roadrunner.EventWorkerKill: +// logrus.Warnf("%s - killed", ctx) +// } +// }) +//} diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index 2717e187..6e3b689b 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -1,5 +1,8 @@ # rpc bus allows php application and external clients to talk to rr services. rpc: + # enable rpc server + enable: true + # rpc connection DSN. Supported TCP and Unix sockets. listen: tcp://127.0.0.1:6001 diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index d0cac5ef..d54437f0 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -31,24 +31,29 @@ import ( // Service bus for all the commands. var ( - // Shared service bus. - Services = service.NewBus() + cfgFile string + verbose bool + + // Logger - shared logger. + Logger = logrus.New() + + // Services - shared service bus. + Services = service.NewRegistry(Logger) // CLI is application endpoint. CLI = &cobra.Command{ - Use: "rr", - Short: "RoadRunner, PHP application server", + Use: "rr", + SilenceErrors: true, + SilenceUsage: true, + Short: utils.Sprintf("RoadRunner, PHP Application Server."), } - - cfgFile string - verbose bool ) // Execute adds all child commands to the CLI command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the CLI. func Execute() { if err := CLI.Execute(); err != nil { - logrus.Error(err) + utils.Printf("Error: %s\n", err) os.Exit(1) } } @@ -59,7 +64,7 @@ func init() { cobra.OnInitialize(func() { if verbose { - logrus.SetLevel(logrus.DebugLevel) + Logger.SetLevel(logrus.DebugLevel) } if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { @@ -81,6 +86,7 @@ func initConfig(cfgFile string, path []string, name string) service.Config { for _, p := range path { cfg.AddConfigPath(p) } + cfg.SetConfigName(name) } @@ -89,9 +95,9 @@ func initConfig(cfgFile string, path []string, name string) service.Config { // If a cfg file is found, read it in. if err := cfg.ReadInConfig(); err != nil { - logrus.Warnf("config: %s", err) + Logger.Warnf("config: %s", err) return nil } - return &utils.ConfigWrapper{cfg} + return &utils.ViperWrapper{Viper: cfg} } diff --git a/cmd/rr/http/register.go b/cmd/rr/http/register.go deleted file mode 100644 index fb828578..00000000 --- a/cmd/rr/http/register.go +++ /dev/null @@ -1,23 +0,0 @@ -package http - -import ( - "github.com/spf13/cobra" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/http" -) - -func init() { - rr.Services.Register(&http.Service{}) - - rr.CLI.AddCommand(&cobra.Command{ - Use: "http:reload", - Short: "Reload RoadRunner worker pools for the HTTP service", - Run: reloadHandler, - }) - - rr.CLI.AddCommand(&cobra.Command{ - Use: "http:workers", - Short: "List workers associated with RoadRunner HTTP service", - Run: workersHandler, - }) -} diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go index 6cdba576..0fd3d7e9 100644 --- a/cmd/rr/http/reload.go +++ b/cmd/rr/http/reload.go @@ -23,22 +23,34 @@ package http import ( "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/sirupsen/logrus" + "github.com/go-errors/errors" + "github.com/spiral/roadrunner/rpc" ) -func reloadHandler(cmd *cobra.Command, args []string) { - client, err := rr.Services.RCPClient() +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:reload", + Short: "Reload RoadRunner worker pools for the HTTP service", + RunE: reloadHandler, + }) +} + +func reloadHandler(cmd *cobra.Command, args []string) error { + if !rr.Services.Has("rpc") { + return errors.New("RPC service is not configured") + } + + client, err := rr.Services.Get("rpc").(*rpc.Service).Client() if err != nil { - logrus.Error(err) - return + return err } defer client.Close() var r string if err := client.Call("http.Reset", true, &r); err != nil { - logrus.Error(err) - return + return err } - logrus.Info("restarting http worker pool") + rr.Logger.Info("http.service: restarting worker pool") + return nil } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index 13e8d21c..63ef0cce 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -21,38 +21,47 @@ package http import ( - "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/http" - "os" - "strconv" - "github.com/sirupsen/logrus" + "errors" + "github.com/spiral/roadrunner/rpc" ) -func workersHandler(cmd *cobra.Command, args []string) { - client, err := rr.Services.RCPClient() - if err != nil { - logrus.Error(err) - return - } - defer client.Close() +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:workers", + Short: "List workers associated with RoadRunner HTTP service", + RunE: workersHandler, + }) +} - var r http.WorkerList - if err := client.Call("http.Workers", true, &r); err != nil { - panic(err) +func workersHandler(cmd *cobra.Command, args []string) error { + if !rr.Services.Has("rpc") { + return errors.New("RPC service is not configured") } - tw := tablewriter.NewWriter(os.Stdout) - tw.SetHeader([]string{"PID", "Status", "Num Execs"}) - - for _, w := range r.Workers { - tw.Append([]string{ - strconv.Itoa(w.Pid), - w.Status, - strconv.Itoa(int(w.NumExecs)), - }) + client, err := rr.Services.Get("rpc").(*rpc.Service).Client() + if err != nil { + return err } + defer client.Close() - tw.Render() + //var r http.WorkerList + //if err := client.Call("http.Workers", true, &r); err != nil { + // panic(err) + //} + // + //tw := tablewriter.NewWriter(os.Stdout) + //tw.SetHeader([]string{"PID", "Status", "Num Execs"}) + // + //for _, w := range r.Workers { + // tw.Append([]string{ + // strconv.Itoa(w.Pid), + // w.Status, + // strconv.Itoa(int(w.NumExecs)), + // }) + //} + // + //tw.Render() + return nil } diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 9d6f685c..26f70fdd 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -23,13 +23,17 @@ package main import ( - "github.com/spiral/roadrunner/cmd/rr/cmd" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/rpc" - // service plugins + // cli plugins _ "github.com/spiral/roadrunner/cmd/rr/http" ) func main() { + // provides ability to make local connection to services + rr.Services.Register("rpc", new(rpc.Service)) + // you can register additional commands using cmd.CLI - cmd.Execute() + rr.Execute() } diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go index e7e22b3a..452dd195 100644 --- a/cmd/rr/utils/config.go +++ b/cmd/rr/utils/config.go @@ -5,19 +5,22 @@ import ( "github.com/spiral/roadrunner/service" ) -type ConfigWrapper struct { +// ViperWrapper provides interface bridge between Viper configs and service.Config. +type ViperWrapper struct { Viper *viper.Viper } -func (w *ConfigWrapper) Get(key string) service.Config { +// Get nested config section (sub-map), returns nil if section not found. +func (w *ViperWrapper) Get(key string) service.Config { sub := w.Viper.Sub(key) if sub == nil { return nil } - return &ConfigWrapper{sub} + return &ViperWrapper{sub} } -func (w *ConfigWrapper) Unmarshal(out interface{}) error { +// Unmarshal unmarshal config data into given struct. +func (w *ViperWrapper) Unmarshal(out interface{}) error { return w.Viper.Unmarshal(out) } diff --git a/cmd/rr/utils/verbose.go b/cmd/rr/utils/verbose.go deleted file mode 100644 index 43770f34..00000000 --- a/cmd/rr/utils/verbose.go +++ /dev/null @@ -1,18 +0,0 @@ -package utils - -//if f.Verbose { -// rr.Observe(func(event int, ctx interface{}) { -// switch event { -// case roadrunner.EventPoolError: -// logrus.Error(ctx) -// case roadrunner.EventWorkerCreate: -// logrus.Infof("%s - created", ctx) -// case roadrunner.EventWorkerError: -// logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) -// case roadrunner.EventWorkerDestruct: -// logrus.Warnf("%s - destructed", ctx) -// case roadrunner.EventWorkerKill: -// logrus.Warnf("%s - killed", ctx) -// } -// }) -//} diff --git a/http/config.go b/http/config.go deleted file mode 100644 index 2a64dbab..00000000 --- a/http/config.go +++ /dev/null @@ -1,85 +0,0 @@ -package http - -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/utils" - "os" - "path" - "strings" -) - -// Configures RoadRunner HTTP server. -type Config struct { - // serve enables static file serving from desired root directory. - ServeStatic bool - - // Root directory, required when serve set to true. - Root string - - // TmpDir contains name of temporary directory to store uploaded files passed to underlying PHP process. - TmpDir string - - // MaxRequest specified max size for payload body in bytes, set 0 to unlimited. - MaxRequest int64 - - // ForbidUploads specifies list of file extensions which are forbidden for uploads. - // Example: .php, .exe, .bat, .htaccess and etc. - ForbidUploads []string -} - -// ForbidUploads must return true if file extension is not allowed for the upload. -func (cfg Config) Forbidden(filename string) bool { - ext := strings.ToLower(path.Ext(filename)) - - for _, v := range cfg.ForbidUploads { - if ext == v { - return true - } - } - - return false -} - -type serviceConfig struct { - Enabled bool - Host string - Port string - MaxRequest string - Static struct { - Serve bool - Root string - } - - Uploads struct { - TmpDir string - Forbid []string - } - - Pool service.PoolConfig - - //todo: verbose ? -} - -func (cfg *serviceConfig) httpAddr() string { - return fmt.Sprintf("%s:%v", cfg.Host, cfg.Port) -} - -func (cfg *serviceConfig) httpConfig() *Config { - tmpDir := cfg.Uploads.TmpDir - if tmpDir == "" { - tmpDir = os.TempDir() - } - - return &Config{ - ServeStatic: cfg.Static.Serve, - Root: cfg.Static.Root, - TmpDir: tmpDir, - MaxRequest: utils.ParseSize(cfg.MaxRequest), - ForbidUploads: cfg.Uploads.Forbid, - } -} - -func (cfg *serviceConfig) Valid() error { - return nil -} diff --git a/http/data.go b/http/data.go deleted file mode 100644 index e6b8344f..00000000 --- a/http/data.go +++ /dev/null @@ -1,67 +0,0 @@ -package http - -import ( - "net/http" - "strings" -) - -const maxLevel = 127 - -type dataTree map[string]interface{} - -// parsePost parses incoming request body into data tree. -func parsePost(r *http.Request) (dataTree, error) { - data := make(dataTree) - - for k, v := range r.PostForm { - data.push(k, v) - } - - for k, v := range r.MultipartForm.Value { - data.push(k, v) - } - - return data, nil -} - -func (d dataTree) push(k string, v []string) { - if len(v) == 0 { - // skip empty values - return - } - - indexes := make([]string, 0) - for _, index := range strings.Split(k, "[") { - indexes = append(indexes, strings.Trim(index, "]")) - } - - if len(indexes) <= maxLevel { - d.mount(indexes, v) - } -} - -// mount mounts data tree recursively. -func (d dataTree) mount(i []string, v []string) { - if len(v) == 0 { - return - } - - if len(i) == 1 { - // single value context - d[i[0]] = v[0] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(dataTree).mount(i[1:], v) - } - - d[i[0]] = make(dataTree) - d[i[0]].(dataTree).mount(i[1:], v) -} diff --git a/http/request.go b/http/request.go deleted file mode 100644 index fd483744..00000000 --- a/http/request.go +++ /dev/null @@ -1,137 +0,0 @@ -package http - -import ( - "encoding/json" - "fmt" - "github.com/spiral/roadrunner" - "io/ioutil" - "net/http" - "strings" -) - -const ( - defaultMaxMemory = 32 << 20 // 32 MB -) - -// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. -type Request struct { - // Protocol includes HTTP protocol version. - Protocol string `json:"protocol"` - - // Method contains name of HTTP method used for the request. - Method string `json:"method"` - - // Uri contains full request Uri with scheme and query. - Uri string `json:"uri"` - - // Headers contains list of request headers. - Headers http.Header `json:"headers"` - - // Cookies contains list of request cookies. - Cookies map[string]string `json:"cookies"` - - // RawQuery contains non parsed query string (to be parsed on php end). - RawQuery string `json:"rawQuery"` - - // Parsed indicates that request body has been parsed on RR end. - Parsed bool `json:"parsed"` - - // Uploads contains list of uploaded files, their names, sized and associations with temporary files. - Uploads *Uploads `json:"uploads"` - - // request body can be parsedData or []byte - body interface{} -} - -// NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request) (req *Request, err error) { - req = &Request{ - Protocol: r.Proto, - Method: r.Method, - Uri: uri(r), - Headers: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - } - - for _, c := range r.Cookies() { - req.Cookies[c.Name] = c.Value - } - - if !req.parsable() { - req.body, err = ioutil.ReadAll(r.Body) - return req, err - } - - if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { - return nil, err - } - - if req.body, err = parsePost(r); err != nil { - return nil, err - } - - if req.Uploads, err = parseUploads(r); err != nil { - return nil, err - } - - req.Parsed = true - return req, nil -} - -// Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open(cfg *Config) error { - if r.Uploads == nil { - return nil - } - - return r.Uploads.Open(cfg) -} - -// Close clears all temp file uploads -func (r *Request) Close() { - if r.Uploads == nil { - return - } - - r.Uploads.Clear() -} - -// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open -// files prior to calling this method. -func (r *Request) Payload() (p *roadrunner.Payload, err error) { - p = &roadrunner.Payload{} - - if p.Context, err = json.Marshal(r); err != nil { - return nil, err - } - - if r.Parsed { - if p.Body, err = json.Marshal(r.body); err != nil { - return nil, err - } - } else if r.body != nil { - p.Body = r.body.([]byte) - } - - return p, nil -} - -// parsable returns true if request payload can be parsed (POST dataTree, file tree). -func (r *Request) parsable() bool { - if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" { - return false - } - - ct := r.Headers.Get("content-type") - return strings.Contains(ct, "multipart/form-data") || ct == "application/x-www-form-urlencoded" -} - -// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func uri(r *http.Request) string { - if r.TLS != nil { - return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) - } - - return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} diff --git a/http/response.go b/http/response.go deleted file mode 100644 index 2736c4ab..00000000 --- a/http/response.go +++ /dev/null @@ -1,42 +0,0 @@ -package http - -import ( - "encoding/json" - "github.com/spiral/roadrunner" - "net/http" -) - -// Response handles PSR7 response logic. -type Response struct { - // Status contains response status. - Status int `json:"status"` - - // Headers contains list of response headers. - Headers map[string][]string `json:"headers"` - - // associated body payload. - body []byte -} - -// NewResponse creates new response based on given roadrunner payload. -func NewResponse(p *roadrunner.Payload) (*Response, error) { - r := &Response{body: p.Body} - if err := json.Unmarshal(p.Context, r); err != nil { - return nil, err - } - - return r, nil -} - -// Write writes response headers, status and body into ResponseWriter. -func (r *Response) Write(w http.ResponseWriter) { - for k, v := range r.Headers { - for _, h := range v { - w.Header().Add(k, h) - - } - } - - w.WriteHeader(r.Status) - w.Write(r.body) -} diff --git a/http/rpc.go b/http/rpc.go deleted file mode 100644 index 38db9a61..00000000 --- a/http/rpc.go +++ /dev/null @@ -1,44 +0,0 @@ -package http - -import ( - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/utils" - "github.com/pkg/errors" -) - -type rpcServer struct { - service *Service -} - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []utils.Worker `json:"workers"` -} - -// Reset resets underlying RR worker pool and restarts all of it's workers. -func (rpc *rpcServer) Reset(reset bool, r *string) error { - if rpc.service.srv == nil { - return errors.New("no http server") - } - - logrus.Info("http: restarting worker pool") - *r = "OK" - - err := rpc.service.srv.rr.Reset() - if err != nil { - logrus.Errorf("http: %s", err) - } - - return err -} - -// Workers returns list of active workers and their stats. -func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { - if rpc.service.srv == nil { - return errors.New("no http server") - } - - r.Workers = utils.FetchWorkers(rpc.service.srv.rr) - return nil -} diff --git a/http/server.go b/http/server.go deleted file mode 100644 index db1f22ef..00000000 --- a/http/server.go +++ /dev/null @@ -1,87 +0,0 @@ -package http - -import ( - "errors" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner" - "net/http" - "strconv" -) - -// service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type Server struct { - cfg *Config - static *staticServer - rr *roadrunner.Server -} - -// NewServer returns new instance of HTTP PSR7 server. -func NewServer(cfg *Config, server *roadrunner.Server) *Server { - h := &Server{cfg: cfg, rr: server} - - if cfg.ServeStatic { - h.static = &staticServer{root: http.Dir(h.cfg.Root)} - } - - return h -} - -// ServeHTTP serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if srv.cfg.ServeStatic && srv.static.serve(w, r) { - return - } - - // validating request size - if srv.cfg.MaxRequest != 0 { - if length := r.Header.Get("content-length"); length != "" { - if size, err := strconv.ParseInt(length, 10, 64); err != nil { - srv.sendError(w, r, err) - return - } else if size > srv.cfg.MaxRequest { - srv.sendError(w, r, errors.New("request body max size is exceeded")) - return - } - } - } - - req, err := NewRequest(r) - if err != nil { - srv.sendError(w, r, err) - return - } - - if err = req.Open(srv.cfg); err != nil { - srv.sendError(w, r, err) - return - } - defer req.Close() - - p, err := req.Payload() - if err != nil { - srv.sendError(w, r, err) - return - } - - rsp, err := srv.rr.Exec(p) - if err != nil { - srv.sendError(w, r, err) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - srv.sendError(w, r, err) - return - } - - resp.Write(w) -} - -// sendError sends error -func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) { - logrus.Errorf("http: %s", err) - w.WriteHeader(500) - w.Write([]byte(err.Error())) -} diff --git a/http/service.go b/http/service.go deleted file mode 100644 index 5d45240b..00000000 --- a/http/service.go +++ /dev/null @@ -1,78 +0,0 @@ -package http - -import ( - "context" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/service" - "net/http" - "github.com/spiral/roadrunner" -) - -type Service struct { - cfg *serviceConfig - http *http.Server - srv *Server -} - -func (s *Service) Name() string { - return "http" -} - -func (s *Service) Configure(cfg service.Config) (bool, error) { - config := &serviceConfig{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enabled { - return false, nil - } - - if err := config.Valid(); err != nil { - return false, err - } - - s.cfg = config - return true, nil -} - -func (s *Service) RPC() interface{} { - return &rpcServer{s} -} - -func (s *Service) Serve() error { - logrus.Debugf("http: started") - defer logrus.Debugf("http: stopped") - - rr, term, err := s.cfg.Pool.NewServer() - if err != nil { - return err - } - defer term() - - //todo: remove - rr.Observe(func(event int, ctx interface{}) { - switch event { - case roadrunner.EventPoolError: - logrus.Error(ctx) - case roadrunner.EventWorkerError: - logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) - } - }) - - s.srv = NewServer(s.cfg.httpConfig(), rr) - s.http = &http.Server{ - Addr: s.cfg.httpAddr(), - Handler: s.srv, - } - - if err := s.http.ListenAndServe(); err != nil { - return err - } - - return nil -} - -func (s *Service) Stop() error { - return s.http.Shutdown(context.Background()) -} diff --git a/http/static.go b/http/static.go deleted file mode 100644 index d7030c3f..00000000 --- a/http/static.go +++ /dev/null @@ -1,70 +0,0 @@ -package http - -import ( - "github.com/sirupsen/logrus" - "net/http" - "os" - "path" - "path/filepath" - "strings" -) - -var ( - forbiddenFiles = []string{".php", ".htaccess"} -) - -// staticServer serves static files -type staticServer struct { - root http.Dir -} - -// serve attempts to serve static file and returns true in case of success, will return false in case if file not -// found, not allowed or on read error. -func (svr *staticServer) serve(w http.ResponseWriter, r *http.Request) bool { - fpath := r.URL.Path - if !strings.HasPrefix(fpath, "/") { - fpath = "/" + fpath - } - fpath = path.Clean(fpath) - - if svr.forbidden(fpath) { - logrus.Warningf("attempt to access forbidden file %s", fpath) // todo: better logs - return false - } - - f, err := svr.root.Open(fpath) - if err != nil { - if !os.IsNotExist(err) { - logrus.Error(err) //todo: rr or access error - } - - return false - } - defer f.Close() - - d, err := f.Stat() - if err != nil { - logrus.Error(err) //todo: rr or access error - return false - } - - if d.IsDir() { - // do not serve directories - return false - } - - http.ServeContent(w, r, d.Name(), d.ModTime(), f) - return true -} - -// forbidden returns true if file has forbidden extension. -func (svr *staticServer) forbidden(path string) bool { - ext := strings.ToLower(filepath.Ext(path)) - for _, exl := range forbiddenFiles { - if ext == exl { - return true - } - } - - return false -} diff --git a/http/uploads.go b/http/uploads.go deleted file mode 100644 index 468e8a19..00000000 --- a/http/uploads.go +++ /dev/null @@ -1,207 +0,0 @@ -package http - -import ( - "encoding/json" - "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "os" - "strings" - "sync" -) - -const ( - // There is no error, the file uploaded with success. - UploadErrorOK = 0 - - // No file was uploaded. - UploadErrorNoFile = 4 - - // Missing a temporary folder. - UploadErrorNoTmpDir = 5 - - // Failed to write file to disk. - UploadErrorCantWrite = 6 - - // ForbidUploads file extension. - UploadErrorExtension = 7 -) - -// FileUpload represents singular file wrapUpload. -type FileUpload struct { - // Name contains filename specified by the client. - Name string `json:"name"` - - // MimeType contains mime-type provided by the client. - MimeType string `json:"type"` - - // Size of the uploaded file. - Size int64 `json:"size"` - - // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php - Error int `json:"error"` - - // TempFilename points to temporary file location. - TempFilename string `json:"tmpName"` - - // associated file header - header *multipart.FileHeader -} - -func (f *FileUpload) Open(cfg *Config) error { - if cfg.Forbidden(f.Name) { - f.Error = UploadErrorExtension - return nil - } - - file, err := f.header.Open() - if err != nil { - f.Error = UploadErrorNoFile - return err - } - defer file.Close() - - tmp, err := ioutil.TempFile(cfg.TmpDir, "upload") - if err != nil { - // most likely cause of this issue is missing tmp dir - f.Error = UploadErrorNoTmpDir - return err - } - - f.TempFilename = tmp.Name() - defer tmp.Close() - - if f.Size, err = io.Copy(tmp, file); err != nil { - f.Error = UploadErrorCantWrite - } - - return err -} - -func wrapUpload(f *multipart.FileHeader) *FileUpload { - return &FileUpload{ - Name: f.Filename, - MimeType: f.Header.Get("Content-Type"), - Error: UploadErrorOK, - header: f, - } -} - -type fileTree map[string]interface{} - -func (d fileTree) push(k string, v []*FileUpload) { - if len(v) == 0 { - // skip empty values - return - } - - indexes := make([]string, 0) - for _, index := range strings.Split(k, "[") { - indexes = append(indexes, strings.Trim(index, "]")) - } - - if len(indexes) <= maxLevel { - d.mount(indexes, v) - } -} - -// mount mounts data tree recursively. -func (d fileTree) mount(i []string, v []*FileUpload) { - if len(v) == 0 { - return - } - - if len(i) == 1 { - // single value context - d[i[0]] = v[0] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(fileTree).mount(i[1:], v) - } - - d[i[0]] = make(fileTree) - d[i[0]].(fileTree).mount(i[1:], v) -} - -// tree manages uploaded files tree and temporary files. -type Uploads struct { - // pre processed data tree for Uploads. - tree fileTree - - // flat list of all file Uploads. - list []*FileUpload -} - -// MarshalJSON marshal tree tree into JSON. -func (u *Uploads) MarshalJSON() ([]byte, error) { - return json.Marshal(u.tree) -} - -// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors -// will be handled individually. @todo: do we need it? -func (u *Uploads) Open(cfg *Config) error { - var wg sync.WaitGroup - for _, f := range u.list { - wg.Add(1) - go func(f *FileUpload) { - defer wg.Done() - f.Open(cfg) - }(f) - } - - wg.Wait() - return nil -} - -// Clear deletes all temporary files. -func (u *Uploads) Clear() { - for _, f := range u.list { - if f.TempFilename != "" && exists(f.TempFilename) { - os.Remove(f.TempFilename) - } - } -} - -// parse incoming dataTree request into JSON (including multipart form dataTree) -func parseUploads(r *http.Request) (*Uploads, error) { - u := &Uploads{ - tree: make(fileTree), - list: make([]*FileUpload, 0), - } - - for k, v := range r.MultipartForm.File { - files := make([]*FileUpload, 0, len(v)) - for _, f := range v { - files = append(files, wrapUpload(f)) - } - - u.list = append(u.list, files...) - u.tree.push(k, files) - } - - return u, nil -} - -// exists if file exists. by osutils; todo: better? -func exists(path string) bool { - _, err := os.Stat(path) - if err == nil { - return true - } - - if os.IsNotExist(err) { - return false - } - - panic(fmt.Errorf("unable to stat path %q; %v", path, err)) -} diff --git a/rpc/config.go b/rpc/config.go new file mode 100644 index 00000000..67dc1094 --- /dev/null +++ b/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // Listen string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/rpc/service.go b/rpc/service.go new file mode 100644 index 00000000..61a9a1a3 --- /dev/null +++ b/rpc/service.go @@ -0,0 +1,99 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" +) + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server +} + +// WithConfig must return Service instance configured with the given environment. Must return error in case of +// misconfiguration, might return nil as Service if Service is not enabled. +func (s *Service) WithConfig(cfg service.Config, reg service.Registry) (service.Service, error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return nil, err + } + + if !config.Enable { + return nil, nil + } + + return &Service{cfg: config, rpc: rpc.NewServer()}, nil +} + +// Serve serves Service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.stop = make(chan interface{}) + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + for { + select { + case <-s.stop: + return nil + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + s.rpc.Accept(ln) + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + + return nil +} + +// Stop stop Service Service. +func (s *Service) Stop() error { + close(s.stop) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/service/bus.go b/service/bus.go deleted file mode 100644 index 8bfb914c..00000000 --- a/service/bus.go +++ /dev/null @@ -1,165 +0,0 @@ -package service - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spiral/goridge" - "net/rpc" - "sync" -) - -const ( - rpcConfig = "rpc" -) - -var ( - dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") -) - -type Bus struct { - wg sync.WaitGroup - services []Service - enabled []Service - stop chan interface{} - rpc *rpc.Server - rpcConfig *RPCConfig -} - -func (b *Bus) Register(s Service) { - b.services = append(b.services, s) -} - -func (b *Bus) Services() []Service { - return b.services -} - -func (b *Bus) Configure(cfg Config) error { - if segment := cfg.Get(rpcConfig); segment == nil { - logrus.Warn("rpc: no config has been provided") - } else { - b.rpcConfig = &RPCConfig{} - if err := segment.Unmarshal(b.rpcConfig); err != nil { - return err - } - } - - b.enabled = make([]Service, 0) - - for _, s := range b.services { - segment := cfg.Get(s.Name()) - if segment == nil { - // no config has been provided for the service - logrus.Debugf("%s: no config has been provided", s.Name()) - continue - } - - if enable, err := s.Configure(segment); err != nil { - return err - } else if !enable { - continue - } - - b.enabled = append(b.enabled, s) - } - - return nil -} - -func (b *Bus) RCPClient() (*rpc.Client, error) { - if b.rpcConfig == nil { - return nil, errors.New("rpc is not configured") - } - - conn, err := b.rpcConfig.CreateDialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} - -func (b *Bus) Serve() { - b.rpc = rpc.NewServer() - - for _, s := range b.enabled { - // some services might provide net/rpc api for internal communications - if api := s.RPC(); api != nil { - b.rpc.RegisterName(s.Name(), api) - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: %s", s.Name(), err) - } - }() - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - logrus.Debug("rpc: started") - if err := b.serveRPC(); err != nil { - logrus.Errorf("rpc: %s", err) - } - }() - - b.wg.Wait() -} - -func (b *Bus) Stop() { - if err := b.stopRPC(); err != nil { - logrus.Errorf("rpc: ", err) - } - - for _, s := range b.enabled { - if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: %s", s.Name(), err) - } - } - - b.wg.Wait() -} - -func (b *Bus) serveRPC() error { - if b.rpcConfig == nil { - return nil - } - - b.stop = make(chan interface{}) - - ln, err := b.rpcConfig.CreateListener() - if err != nil { - return err - } - defer ln.Close() - - for { - select { - case <-b.stop: - b.rpc = nil - return nil - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go b.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - - return nil -} - -func (b *Bus) stopRPC() error { - if b.rpcConfig == nil { - return nil - } - - close(b.stop) - return nil -} diff --git a/service/config.go b/service/config.go deleted file mode 100644 index d5381376..00000000 --- a/service/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package service - -type Config interface { - Get(key string) Config - Unmarshal(out interface{}) error -} diff --git a/service/factory.go b/service/factory.go deleted file mode 100644 index e4a599e6..00000000 --- a/service/factory.go +++ /dev/null @@ -1,69 +0,0 @@ -package service - -import ( - "github.com/spiral/roadrunner" - "net" - "os/exec" - "strings" - "time" -) - -type PoolConfig struct { - Command string - Relay string - - Number uint64 - MaxJobs uint64 - - Timeouts struct { - Allocate int - Destroy int - } -} - -func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { - relays, terminator, err := f.relayFactory() - if err != nil { - terminator() - return nil, nil, err - } - - rr := roadrunner.NewServer(f.cmd(), relays) - if err := rr.Configure(f.rrConfig()); err != nil { - return nil, nil, err - } - - return rr, nil, nil -} - -func (f *PoolConfig) rrConfig() roadrunner.Config { - return roadrunner.Config{ - NumWorkers: f.Number, - MaxExecutions: f.MaxJobs, - AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate), - DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy), - } -} - -func (f *PoolConfig) cmd() func() *exec.Cmd { - cmd := strings.Split(f.Command, " ") - return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } -} - -func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { - if f.Relay == "pipes" || f.Relay == "pipe" { - return roadrunner.NewPipeFactory(), nil, nil - } - - dsn := strings.Split(f.Relay, "://") - if len(dsn) != 2 { - return nil, nil, dsnError - } - - ln, err := net.Listen(dsn[0], dsn[1]) - if err != nil { - return nil, nil, err - } - - return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil -} diff --git a/service/registry.go b/service/registry.go new file mode 100644 index 00000000..d4e2ff12 --- /dev/null +++ b/service/registry.go @@ -0,0 +1,162 @@ +package service + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// Registry controls all internal RR services and provides plugin based system. +type Registry interface { + // Register add new service to the registry under given name. + Register(name string, service Service) + + // Configure configures all underlying services with given configuration. + Configure(cfg Config) error + + // Check is Service has been registered and configured. + Has(service string) bool + + // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance. + Get(service string) Service + + // Serve all configured services. Non blocking. + Serve() error + + // Stop all active services. + Stop() error +} + +// Service provides high level functionality for road runner Service. +type Service interface { + // WithConfig must return Service instance configured with the given environment. Must return error in case of + // misconfiguration, might return nil as Service if Service is not enabled. + WithConfig(cfg Config, reg Registry) (Service, error) + + // Serve serves Service. + Serve() error + + // Stop stop Service Service. + Stop() error +} + +type registry struct { + log logrus.FieldLogger + mu sync.Mutex + candidates []*entry + configured []*entry +} + +// entry creates association between service instance and given name. +type entry struct { + // Associated service name + Name string + + // Associated service instance + Service Service + + // Serving indicates that service is currently serving + Serving bool +} + +// NewRegistry creates new registry. +func NewRegistry(log logrus.FieldLogger) Registry { + return ®istry{ + log: log, + candidates: make([]*entry, 0), + } +} + +// Register add new service to the registry under given name. +func (r *registry) Register(name string, service Service) { + r.mu.Lock() + defer r.mu.Unlock() + + r.candidates = append(r.candidates, &entry{ + Name: name, + Service: service, + Serving: false, + }) + + r.log.Debugf("%s.service: registered", name) +} + +// Configure configures all underlying services with given configuration. +func (r *registry) Configure(cfg Config) error { + if r.configured != nil { + return fmt.Errorf("service bus has been already configured") + } + + r.configured = make([]*entry, 0) + for _, e := range r.candidates { + segment := cfg.Get(e.Name) + if segment == nil { + r.log.Debugf("%s.service: no config has been provided", e.Name) + continue + } + + s, err := e.Service.WithConfig(segment, r) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name)) + } + + if s != nil { + r.configured = append(r.configured, &entry{ + Name: e.Name, + Service: s, + Serving: false, + }) + } + } + + return nil +} + +// Check is Service has been registered. +func (r *registry) Has(service string) bool { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return true + } + } + + return false +} + +// Get returns Service instance by it's Name or nil if Service not found. +func (r *registry) Get(service string) Service { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return e.Service + } + } + + return nil +} + +// Serve all configured services. Non blocking. +func (r *registry) Serve() error { + return nil +} + +// Stop all active services. +func (r *registry) Stop() error { + return nil +} diff --git a/service/rpc.go b/service/rpc.go deleted file mode 100644 index eb128768..00000000 --- a/service/rpc.go +++ /dev/null @@ -1,32 +0,0 @@ -package service - -import ( - "net" - "strings" -) - -type RPCConfig struct { - Listen string -} - -func (cfg *RPCConfig) CreateListener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Listen(dsn[0], dsn[1]) -} - -func (cfg *RPCConfig) CreateDialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Dial(dsn[0], dsn[1]) -} - -func NewBus() *Bus { - return &Bus{services: make([]Service, 0)} -} diff --git a/service/service.go b/service/service.go deleted file mode 100644 index 2f704657..00000000 --- a/service/service.go +++ /dev/null @@ -1,9 +0,0 @@ -package service - -type Service interface { - Name() string - Configure(cfg Config) (bool, error) - RPC() interface{} - Serve() error - Stop() error -} diff --git a/utils/size.go b/utils/size.go deleted file mode 100644 index 176cc9e1..00000000 --- a/utils/size.go +++ /dev/null @@ -1,28 +0,0 @@ -package utils - -import ( - "strconv" - "strings" -) - -func ParseSize(size string) int64 { - if len(size) == 0 { - return 0 - } - - s, err := strconv.Atoi(size[:len(size)-1]) - if err != nil { - return 0 - } - - switch strings.ToLower(size[len(size)-1:]) { - case "k", "kb": - return int64(s * 1024) - case "m", "mb": - return int64(s * 1024 * 1024) - case "g", "gb": - return int64(s * 1024 * 1024 * 1024) - } - - return 0 -} diff --git a/utils/workers.go b/utils/workers.go deleted file mode 100644 index 0c4f778f..00000000 --- a/utils/workers.go +++ /dev/null @@ -1,37 +0,0 @@ -package utils - -import "github.com/spiral/roadrunner" - -// Worker provides information about specific worker. -type Worker struct { - // Pid contains process id. - Pid int `json:"pid"` - - // Status of the worker. - Status string `json:"status"` - - // Number of worker executions. - NumExecs uint64 `json:"numExecs"` - - // Created is unix nano timestamp of worker creation time. - Created int64 `json:"created"` - - // Updated is unix nano timestamp of last worker execution. - Updated int64 `json:"updated"` -} - -// FetchWorkers fetches list of workers from RR Server. -func FetchWorkers(srv *roadrunner.Server) (result []Worker) { - for _, w := range srv.Workers() { - state := w.State() - result = append(result, Worker{ - Pid: *w.Pid, - Status: state.String(), - NumExecs: state.NumExecs(), - Created: w.Created.UnixNano(), - Updated: state.Updated().UnixNano(), - }) - } - - return -} -- cgit v1.2.3