diff options
author | Wolfy-J <[email protected]> | 2018-06-13 23:46:13 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2018-06-13 23:46:13 +0300 |
commit | c3ccb29fe412baa8c4b02a1630f3a4a040ab722c (patch) | |
tree | 7a61db95d8e4d02ac5740d593ed708358f34949a | |
parent | 50f820833eeef8518b3b978b33c6f20391225162 (diff) | |
parent | 8ab8c64413ded038e3c8816647209c3b961b3a35 (diff) |
Merge pull request #9 from spiral/develop
HTTP
91 files changed, 6644 insertions, 272 deletions
diff --git a/.travis.yml b/.travis.yml index 5ea8b20d..4f7741f6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,26 +1,37 @@ -language: php +language: go sudo: required -php: - - 7.0 - - 7.1 - - 7.2 - go: - - 1.8 + - "1.10.x" before_install: + - go version + - sudo add-apt-repository -y ppa:ondrej/php + - sudo apt-get update + - sudo apt-get install -y php7.0-cli + - sudo cp `which php7.0` `which php` + - php -v - composer self-update install: - - composer install --no-interaction --prefer-source + - go get "github.com/spiral/roadrunner" - go get -u "github.com/spiral/goridge" + - go get -u "github.com/sirupsen/logrus" - go get -u "github.com/pkg/errors" - go get -u "github.com/stretchr/testify/assert" + - composer install --no-interaction --prefer-source --ignore-platform-reqs script: - - go test -race -v -coverprofile=coverage.txt -covermode=atomic + - go test -race -v -coverprofile=lib.txt -covermode=atomic + - go test ./service -race -v -coverprofile=service.txt -covermode=atomic + - go test ./service/rpc -race -v -coverprofile=rpc.txt -covermode=atomic + - go test ./service/http -race -v -coverprofile=http.txt -covermode=atomic + - go test ./service/static -race -v -coverprofile=static.txt -covermode=atomic after_success: - - bash <(curl -s https://codecov.io/bash) + - bash <(curl -s https://codecov.io/bash) -f lib.txt + - bash <(curl -s https://codecov.io/bash) -f service.txt + - bash <(curl -s https://codecov.io/bash) -f rpc.txt + - bash <(curl -s https://codecov.io/bash) -f http.txt + - bash <(curl -s https://codecov.io/bash) -f static.txt
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..e5e66db7 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +CHANGELOG +========= + +v1.0.0 (???? date) +------ +- worker.State.Updated() has been removed in order to improve overall performance +- staticPool can automatically replace workers killed from outside +- server would not attempt to rebuild static pool in case of reoccurring failure +- MORE STUFF
\ No newline at end of file @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 SpiralScout +Copyright (c) 2018 SpiralScout Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..9726ed69 --- /dev/null +++ b/Makefile @@ -0,0 +1,6 @@ +test: + go test -v -race -cover + go test -v -race -cover ./service + go test -v -race -cover ./service/rpc + go test -v -race -cover ./service/http + go test -v -race -cover ./service/static
\ No newline at end of file @@ -69,7 +69,7 @@ while ($body = $rr->receive($context)) { } } ``` -> Check how to init relay [here](./tests/client.php). More examples can be found in tests. +> Check how to init relay [here](./php-src/tests/client.php). More examples can be found in tests. License: -------- diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml new file mode 100644 index 00000000..ab0f3e7f --- /dev/null +++ b/cmd/rr/.rr.yaml @@ -0,0 +1,56 @@ +# rpc bus allows php application and external clients to talk to rr services. +rpc: + # enable rpc server + enable: true + + # rpc connection DSN. Supported TCP and Unix sockets. + listen: tcp://127.0.0.1:6001 + +# http service configuration. +http: + # set to false to disable http server. + enable: true + + # http host to listen. + address: 0.0.0.0:8080 + + # max POST request size, including file uploads in MB. + maxRequest: 200 + + # file upload configuration. + uploads: + # list of file extensions which are forbidden for uploading. + forbid: [".php", ".exe", ".bat"] + + # http worker pool configuration. + workers: + # php worker command. + command: "php psr-worker.php pipes" + + # connection method (pipes, tcp://:9000, unix://socket.unix). + relay: "pipes" + + # worker pool configuration. + pool: + # number of workers to be serving. + numWorkers: 4 + + # maximum jobs per worker, 0 - unlimited. + maxJobs: 0 + + # for how long worker is allowed to be bootstrapped. In nanoseconds :( + allocateTimeout: 600000000 + + # amount of time given to worker to gracefully destruct itself. In nanoseconds :( + destroyTimeout: 600000000 + +# static file serving. +static: + # serve http static files + enable: false + + # root directory for static file (http would not serve .php and .htaccess files). + dir: "public" + + # list of extensions for forbid for serving. + forbid: [".php", ".htaccess"]
\ No newline at end of file diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE new file mode 100644 index 00000000..efb98c87 --- /dev/null +++ b/cmd/rr/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 SpiralScout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go new file mode 100644 index 00000000..bea42747 --- /dev/null +++ b/cmd/rr/cmd/root.go @@ -0,0 +1,124 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/cmd/rr/utils" + "os" +) + +// Service bus for all the commands. +var ( + cfgFile string + verbose bool + + // Logger - shared logger. + Logger = logrus.New() + + // Container - shared service bus. + Container = service.NewContainer(Logger) + + // CLI is application endpoint. + CLI = &cobra.Command{ + Use: "rr", + SilenceErrors: true, + SilenceUsage: true, + Short: utils.Sprintf("<green>RoadRunner, PHP Application Server.</reset>"), + } +) + +// ViperWrapper provides interface bridge between v configs and service.Config. +type ViperWrapper struct { + v *viper.Viper +} + +// Get nested config section (sub-map), returns nil if section not found. +func (w *ViperWrapper) Get(key string) service.Config { + sub := w.v.Sub(key) + if sub == nil { + return nil + } + + return &ViperWrapper{sub} +} + +// Unmarshal unmarshal config data into given struct. +func (w *ViperWrapper) Unmarshal(out interface{}) error { + return w.v.Unmarshal(out) +} + +// Execute adds all child commands to the CLI command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the CLI. +func Execute() { + if err := CLI.Execute(); err != nil { + utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) + os.Exit(1) + } +} + +func init() { + CLI.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output") + CLI.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)") + + cobra.OnInitialize(func() { + if verbose { + Logger.SetLevel(logrus.DebugLevel) + } + + if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { + if err := Container.Init(cfg); err != nil { + utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) + os.Exit(1) + } + } + }) +} + +func initConfig(cfgFile string, path []string, name string) service.Config { + cfg := viper.New() + + if cfgFile != "" { + // Use cfg file from the flag. + cfg.SetConfigFile(cfgFile) + } else { + // automatic location + for _, p := range path { + cfg.AddConfigPath(p) + } + + cfg.SetConfigName(name) + } + + // read in environment variables that match + cfg.AutomaticEnv() + + // If a cfg file is found, read it in. + if err := cfg.ReadInConfig(); err != nil { + Logger.Warnf("config: %s", err) + return nil + } + + return &ViperWrapper{cfg} +} diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go new file mode 100644 index 00000000..c53f7ce9 --- /dev/null +++ b/cmd/rr/cmd/serve.go @@ -0,0 +1,49 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" +) + +var stopSignal = make(chan os.Signal, 1) + +func init() { + CLI.AddCommand(&cobra.Command{ + Use: "serve", + Short: "Serve RoadRunner service(s)", + RunE: serveHandler, + }) + + signal.Notify(stopSignal, syscall.SIGTERM) + signal.Notify(stopSignal, syscall.SIGINT) +} + +func serveHandler(cmd *cobra.Command, args []string) error { + go Container.Serve() + <-stopSignal + Container.Stop() + + return nil +} diff --git a/cmd/rr/debug/listener.go b/cmd/rr/debug/listener.go new file mode 100644 index 00000000..1b6fcf83 --- /dev/null +++ b/cmd/rr/debug/listener.go @@ -0,0 +1,93 @@ +package debug + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/cmd/rr/utils" + "github.com/spiral/roadrunner" +) + +// Listener provide debug callback for system events. With colors! +type listener struct{ logger *logrus.Logger } + +// NewListener creates new debug listener. +func NewListener(logger *logrus.Logger) *listener { + return &listener{logger} +} + +// Listener listens to http events and generates nice looking output. +func (s *listener) Listener(event int, ctx interface{}) { + // http events + switch event { + case http.EventResponse: + log := ctx.(*http.Event) + s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + case http.EventError: + log := ctx.(*http.Event) + + if _, ok := log.Error.(roadrunner.JobError); ok { + s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + } else { + s.logger.Info(utils.Sprintf( + "%s <white+hb>%s</reset> %s <red>%s</reset>", + statusColor(log.Status), + log.Method, + log.Uri, + log.Error, + )) + } + } + + switch event { + case roadrunner.EventWorkerKill: + w := ctx.(*roadrunner.Worker) + s.logger.Warning(utils.Sprintf( + "<white+hb>worker.%v</reset> <yellow>killed</red>", + *w.Pid, + )) + + case roadrunner.EventWorkerError: + err := ctx.(roadrunner.WorkerError) + s.logger.Error(utils.Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", + *err.Worker.Pid, + err.Caused, + )) + } + + // rr server events + switch event { + case roadrunner.EventServerFailure: + s.logger.Error(utils.Sprintf("<red>server is dead</reset>")) + } + + // pool events + switch event { + case roadrunner.EventPoolConstruct: + s.logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>")) + case roadrunner.EventPoolError: + s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx)) + } +} + +// Serve serves. +func (s *listener) Serve() error { return nil } + +// Stop stops the Listener. +func (s *listener) Stop() {} + +func statusColor(status int) string { + if status < 300 { + return utils.Sprintf("<green>%v</reset>", status) + } + + if status < 400 { + return utils.Sprintf("<cyan>%v</reset>", status) + } + + if status < 500 { + return utils.Sprintf("<yellow>%v</reset>", status) + } + + return utils.Sprintf("<red>%v</reset>", status) +} diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go new file mode 100644 index 00000000..d0a42b64 --- /dev/null +++ b/cmd/rr/http/reset.go @@ -0,0 +1,61 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "errors" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/cmd/rr/utils" +) + +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:reset", + Short: "Reload RoadRunner worker pools for the HTTP service", + RunE: reloadHandler, + }) +} + +func reloadHandler(cmd *cobra.Command, args []string) error { + svc, st := rr.Container.Get(rpc.ID) + if st < service.StatusConfigured { + return errors.New("RPC service is not configured") + } + + client, err := svc.(*rpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + utils.Printf("<green>restarting http worker pool</reset>: ") + + var r string + if err := client.Call("http.Reset", true, &r); err != nil { + return err + } + + utils.Printf("<green+hb>done</reset>") + return nil +} diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go new file mode 100644 index 00000000..728c415e --- /dev/null +++ b/cmd/rr/http/workers.go @@ -0,0 +1,167 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "errors" + tm "github.com/buger/goterm" + "github.com/dustin/go-humanize" + "github.com/olekukonko/tablewriter" + "github.com/shirou/gopsutil/process" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service/http" + rrpc "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/cmd/rr/utils" + "net/rpc" + "os" + "strconv" + "time" + "os/signal" + "syscall" +) + +var ( + interactive bool + stopSignal = make(chan os.Signal, 1) +) + +func init() { + workersCommand := &cobra.Command{ + Use: "http:workers", + Short: "List workers associated with RoadRunner HTTP service", + RunE: workersHandler, + } + + workersCommand.Flags().BoolVarP( + &interactive, + "interactive", + "i", + false, + "render interactive workers table", + ) + + rr.CLI.AddCommand(workersCommand) + + signal.Notify(stopSignal, syscall.SIGTERM) + signal.Notify(stopSignal, syscall.SIGINT) +} + +func workersHandler(cmd *cobra.Command, args []string) (err error) { + defer func() { + if r, ok := recover().(error); ok { + err = r + } + }() + + svc, st := rr.Container.Get(rrpc.ID) + if st < service.StatusConfigured { + return errors.New("RPC service is not configured") + } + + client, err := svc.(*rrpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + if !interactive { + showWorkers(client) + return nil + } + + tm.Clear() + for { + select { + case <-stopSignal: + return nil + case <-time.NewTicker(time.Millisecond * 500).C: + tm.MoveCursor(1, 1) + showWorkers(client) + tm.Flush() + } + } + + return nil +} + +func showWorkers(client *rpc.Client) { + var r http.WorkerList + if err := client.Call("http.Workers", true, &r); err != nil { + panic(err) + } + + tw := tablewriter.NewWriter(os.Stdout) + tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 9) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 7) + tw.SetColMinWidth(4, 18) + + for _, w := range r.Workers { + tw.Append([]string{ + strconv.Itoa(w.Pid), + renderStatus(w.Status), + renderJobs(w.NumJobs), + renderMemory(w.Pid), + renderAlive(time.Unix(0, w.Created)), + }) + } + + tw.Render() +} + +func renderStatus(status string) string { + switch status { + case "inactive": + return utils.Sprintf("<yellow>inactive</reset>") + case "ready": + return utils.Sprintf("<cyan>ready</reset>") + case "working": + return utils.Sprintf("<green>working</reset>") + case "stopped": + return utils.Sprintf("<red>stopped</reset>") + case "errored": + return utils.Sprintf("<red>errored</reset>") + } + + return status +} + +func renderJobs(number int64) string { + return humanize.Comma(int64(number)) +} + +func renderAlive(t time.Time) string { + return humanize.RelTime(t, time.Now(), "ago", "") +} + +func renderMemory(pid int) string { + p, _ := process.NewProcess(int32(pid)) + i, err := p.MemoryInfo() + if err != nil { + return err.Error() + } + + return humanize.Bytes(i.VMS) +} diff --git a/cmd/rr/main.go b/cmd/rr/main.go new file mode 100644 index 00000000..ee6b915b --- /dev/null +++ b/cmd/rr/main.go @@ -0,0 +1,64 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + + // services (plugins) + "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/service/static" + + // cli plugins + _ "github.com/spiral/roadrunner/cmd/rr/http" + "github.com/spiral/roadrunner/cmd/rr/debug" + + "github.com/spf13/cobra" +) + +var debugMode bool + +func main() { + + // provides ability to make local connection to services + rr.Container.Register(rpc.ID, &rpc.Service{}) + + // http serving + rr.Container.Register(http.ID, &http.Service{}) + + // serving static files + rr.Container.Register(static.ID, &static.Service{}) + + // debug mode + rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode", ) + cobra.OnInitialize(func() { + if debugMode { + service, _ := rr.Container.Get(http.ID) + service.(*http.Service).AddListener(debug.NewListener(rr.Logger).Listener) + } + }) + + // you can register additional commands using cmd.CLI + rr.Execute() +} diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go new file mode 100644 index 00000000..e0fb6931 --- /dev/null +++ b/cmd/rr/utils/cprint.go @@ -0,0 +1,28 @@ +package utils + +import ( + "fmt" + "regexp" + "strings" + "github.com/mgutz/ansi" +) + +var reg *regexp.Regexp + +func init() { + reg, _ = regexp.Compile(`<([^>]+)>`) +} + +// Printf works identically to fmt.Print but adds `<white+hb>color formatting support for CLI</reset>`. +func Printf(format string, args ...interface{}) { + fmt.Print(Sprintf(format, args...)) +} + +// Sprintf works identically to fmt.Sprintf but adds `<white+hb>color formatting support for CLI</reset>`. +func Sprintf(format string, args ...interface{}) string { + format = reg.ReplaceAllStringFunc(format, func(s string) string { + return ansi.ColorCode(strings.Trim(s, "<>/")) + }) + + return fmt.Sprintf(format, args...) +}
\ No newline at end of file diff --git a/composer.json b/composer.json index b72f6de1..5d946f49 100644 --- a/composer.json +++ b/composer.json @@ -11,14 +11,13 @@ ], "require": { "php": "^7.0", - "spiral/goridge": "^2.0" - }, - "require-dev": { - "phpunit/phpunit": "~6.0" + "spiral/goridge": "^2.0", + "psr/http-message": "^1.0", + "zendframework/zend-diactoros": "^1.7" }, "autoload": { "psr-4": { - "Spiral\\RoadRunner\\": "source/" + "Spiral\\RoadRunner\\": "php-src/" } } -}
\ No newline at end of file +} @@ -9,34 +9,34 @@ import ( type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. - NumWorkers uint64 + NumWorkers int64 - // MaxExecutions defines how many executions is allowed for the worker until + // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxExecutions uint64 + MaxJobs int64 // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. - AllocateTimeout time.Duration + AllocateTimeout time.Duration //todo: to milleseconds? // DestroyTimeout defines for how long pool should be waiting for worker to // properly stop, if timeout reached worker will be killed. - DestroyTimeout time.Duration + DestroyTimeout time.Duration //todo: to milleseconds? } -// Valid returns error if config not valid +// Reconfigure returns error if cfg not valid func (cfg *Config) Valid() error { if cfg.NumWorkers == 0 { - return fmt.Errorf("config.NumWorkers must be set") + return fmt.Errorf("pool.NumWorkers must be set") } if cfg.AllocateTimeout == 0 { - return fmt.Errorf("config.AllocateTimeout must be set") + return fmt.Errorf("pool.AllocateTimeout must be set") } if cfg.DestroyTimeout == 0 { - return fmt.Errorf("config.DestroyTimeout must be set") + return fmt.Errorf("pool.DestroyTimeout must be set") } return nil diff --git a/config_test.go b/config_test.go index 64bad7cb..89cde8f1 100644 --- a/config_test.go +++ b/config_test.go @@ -14,7 +14,7 @@ func Test_NumWorkers(t *testing.T) { err := cfg.Valid() assert.NotNil(t, err) - assert.Equal(t, "config.NumWorkers must be set", err.Error()) + assert.Equal(t, "pool.NumWorkers must be set", err.Error()) } func Test_AllocateTimeout(t *testing.T) { @@ -25,7 +25,7 @@ func Test_AllocateTimeout(t *testing.T) { err := cfg.Valid() assert.NotNil(t, err) - assert.Equal(t, "config.AllocateTimeout must be set", err.Error()) + assert.Equal(t, "pool.AllocateTimeout must be set", err.Error()) } func Test_DestroyTimeout(t *testing.T) { @@ -36,5 +36,5 @@ func Test_DestroyTimeout(t *testing.T) { err := cfg.Valid() assert.NotNil(t, err) - assert.Equal(t, "config.DestroyTimeout must be set", err.Error()) + assert.Equal(t, "pool.DestroyTimeout must be set", err.Error()) } diff --git a/error_buffer.go b/error_buffer.go new file mode 100644 index 00000000..fcf566c8 --- /dev/null +++ b/error_buffer.go @@ -0,0 +1,39 @@ +package roadrunner + +import ( + "bytes" + "sync" +) + +// thread safe errBuffer +type errBuffer struct { + mu sync.Mutex + buffer *bytes.Buffer +} + +// Len returns the number of bytes of the unread portion of the errBuffer; +// b.Len() == len(b.Bytes()). +func (b *errBuffer) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + + return b.buffer.Len() +} + +// Write appends the contents of p to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of p; err is always nil. If the +// errBuffer becomes too large, Write will panic with ErrTooLarge. +func (b *errBuffer) Write(p []byte) (n int, err error) { + b.mu.Lock() + defer b.mu.Unlock() + + return b.buffer.Write(p) +} + +// Strings fetches all errBuffer data into string. +func (b *errBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + + return b.buffer.String() +} diff --git a/error_buffer_test.go b/error_buffer_test.go new file mode 100644 index 00000000..afbc80e2 --- /dev/null +++ b/error_buffer_test.go @@ -0,0 +1,14 @@ +package roadrunner + +import ( + "bytes" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestErrBuffer_Write_Len(t *testing.T) { + buf := &errBuffer{buffer: new(bytes.Buffer)} + buf.Write([]byte("hello")) + assert.Equal(t, 5, buf.Len()) + assert.Equal(t, buf.String(), "hello") +} diff --git a/errors.go b/errors.go new file mode 100644 index 00000000..db995721 --- /dev/null +++ b/errors.go @@ -0,0 +1,24 @@ +package roadrunner + +// JobError is job level error (no worker halt), wraps at top +// of error context +type JobError []byte + +// Error converts error context to string +func (je JobError) Error() string { + return string(je) +} + +// WorkerError is worker related error +type WorkerError struct { + // Worker + Worker *Worker + + // Caused error + Caused error +} + +// Error converts error context to string +func (e WorkerError) Error() string { + return e.Caused.Error() +} diff --git a/job_error_test.go b/errors_test.go index 9b0fa53e..6bb650af 100644 --- a/job_error_test.go +++ b/errors_test.go @@ -1,6 +1,7 @@ package roadrunner import ( + "errors" "github.com/stretchr/testify/assert" "testing" ) @@ -9,3 +10,8 @@ func Test_JobError_Error(t *testing.T) { e := JobError([]byte("error")) assert.Equal(t, "error", e.Error()) } + +func Test_WorkerError_Error(t *testing.T) { + e := WorkerError{Worker: nil, Caused: errors.New("error")} + assert.Equal(t, "error", e.Error()) +} @@ -7,4 +7,7 @@ type Factory interface { // SpawnWorker creates new worker process based on given command. // Process must not be started. SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) + + // Close the factory and underlying connections. + Close() error } diff --git a/job_error.go b/job_error.go deleted file mode 100644 index d024ad11..00000000 --- a/job_error.go +++ /dev/null @@ -1,10 +0,0 @@ -package roadrunner - -// JobError is job level error (no worker halt), wraps at top -// of error context -type JobError []byte - -// Error converts error context to string -func (je JobError) Error() string { - return string(je) -} @@ -3,10 +3,10 @@ package roadrunner // Payload carries binary header and body to workers and // back to the server. type Payload struct { - // Context represent payload context, might be omitted + // Context represent payload context, might be omitted. Context []byte - // Body contains binary payload to be processed by worker + // body contains binary payload to be processed by worker. Body []byte } diff --git a/source/Exceptions/RoadRunnerException.php b/php-src/Exceptions/RoadRunnerException.php index fa7b8da3..fa7b8da3 100644 --- a/source/Exceptions/RoadRunnerException.php +++ b/php-src/Exceptions/RoadRunnerException.php diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php new file mode 100644 index 00000000..f8913a8d --- /dev/null +++ b/php-src/PSR7Client.php @@ -0,0 +1,131 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; +use Zend\Diactoros; + +/** + * Manages PSR-7 request and response. + */ +class PSR7Client +{ + /** + * @varWorker + */ + private $worker; + + /** + * @param Worker $worker + */ + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest() + { + $body = $this->worker->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + if (empty($ctx = json_decode($ctx, true))) { + // invalid context + return null; + } + + parse_str($ctx['rawQuery'], $query); + + $bodyStream = 'php://input'; + $parsedBody = null; + if ($ctx['parsed']) { + $parsedBody = json_decode($body, true); + } elseif ($body != null) { + $bodyStream = new Diactoros\Stream("php://memory", "rwb"); + $bodyStream->write($body); + } + + return new Diactoros\ServerRequest( + $_SERVER, + $this->wrapUploads($ctx['uploads']), + $ctx['uri'], + $ctx['method'], + $bodyStream, + $ctx['headers'], + $ctx['cookies'], + $query, + $parsedBody, + $ctx['protocol'] + ); + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response) + { + $headers = $response->getHeaders(); + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->worker->send($response->getBody(), json_encode([ + 'status' => $response->getStatusCode(), + 'headers' => $headers + ])); + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array $files + * + * @return array + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $file) { + if (!isset($file['name'])) { + $result[$index] = $this->wrapUploads($file); + continue; + } + + $result[$index] = new Diactoros\UploadedFile( + $file['tmpName'], + $file['size'], + $file['error'], + $file['name'], + $file['mime'] + ); + } + + return $result; + } +}
\ No newline at end of file diff --git a/source/Worker.php b/php-src/Worker.php index 5835baf2..3e013090 100644 --- a/source/Worker.php +++ b/php-src/Worker.php @@ -56,6 +56,9 @@ class Worker return $this->receive($header); } + // no context for the termination. + $header = null; + // Expect process termination return null; } diff --git a/tests/broken.php b/php-src/tests/broken.php index b1a3839e..b1a3839e 100644 --- a/tests/broken.php +++ b/php-src/tests/broken.php diff --git a/tests/client.php b/php-src/tests/client.php index 31caa410..fd5d60be 100644 --- a/tests/client.php +++ b/php-src/tests/client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor/autoload.php"; +require dirname(__DIR__) . "/../vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/delay.php b/php-src/tests/delay.php index bfde2fc4..bfde2fc4 100644 --- a/tests/delay.php +++ b/php-src/tests/delay.php diff --git a/tests/echo.php b/php-src/tests/echo.php index ba58ff30..ba58ff30 100644 --- a/tests/echo.php +++ b/php-src/tests/echo.php diff --git a/tests/error.php b/php-src/tests/error.php index ebd3418b..ebd3418b 100644 --- a/tests/error.php +++ b/php-src/tests/error.php diff --git a/tests/failboot.php b/php-src/tests/failboot.php index fa8b96f6..fa8b96f6 100644 --- a/tests/failboot.php +++ b/php-src/tests/failboot.php diff --git a/tests/head.php b/php-src/tests/head.php index 4f4e4061..4f4e4061 100644 --- a/tests/head.php +++ b/php-src/tests/head.php diff --git a/php-src/tests/http/client.php b/php-src/tests/http/client.php new file mode 100644 index 00000000..3b6b5dc6 --- /dev/null +++ b/php-src/tests/http/client.php @@ -0,0 +1,45 @@ +<?php + +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/../../vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge) = [$argv[1], $argv[2]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + break; + + default: + die("invalid protocol selection"); +} + +$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay)); +require_once sprintf("%s/%s.php", __DIR__, $test); + +while ($req = $psr7->acceptRequest()) { + try { + $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response())); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/php-src/tests/http/cookie.php b/php-src/tests/http/cookie.php new file mode 100644 index 00000000..196ceee2 --- /dev/null +++ b/php-src/tests/http/cookie.php @@ -0,0 +1,334 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + $resp->getBody()->write(strtoupper($req->getCookieParams()['input'])); + + return $resp->withAddedHeader( + "Set-Cookie", + (new Cookie('output', 'cookie-output'))->createHeader() + ); +} + +final class Cookie +{ + /** + * The name of the cookie. + * + * @var string + */ + private $name = ''; + /** + * The value of the cookie. This value is stored on the clients computer; do not store sensitive + * information. + * + * @var string|null + */ + private $value = null; + /** + * Cookie lifetime. This value specified in seconds and declares period of time in which cookie + * will expire relatively to current time() value. + * + * @var int|null + */ + private $lifetime = null; + /** + * The path on the server in which the cookie will be available on. + * + * If set to '/', the cookie will be available within the entire domain. If set to '/foo/', + * the cookie will only be available within the /foo/ directory and all sub-directories such as + * /foo/bar/ of domain. The default value is the current directory that the cookie is being set + * in. + * + * @var string|null + */ + private $path = null; + /** + * The domain that the cookie is available. To make the cookie available on all subdomains of + * example.com then you'd set it to '.example.com'. The . is not required but makes it + * compatible with more browsers. Setting it to www.example.com will make the cookie only + * available in the www subdomain. Refer to tail matching in the spec for details. + * + * @var string|null + */ + private $domain = null; + /** + * Indicates that the cookie should only be transmitted over a secure HTTPS connection from the + * client. When set to true, the cookie will only be set if a secure connection exists. + * On the server-side, it's on the programmer to send this kind of cookie only on secure + * connection + * (e.g. with respect to $_SERVER["HTTPS"]). + * + * @var bool|null + */ + private $secure = null; + /** + * When true the cookie will be made accessible only through the HTTP protocol. This means that + * the cookie won't be accessible by scripting languages, such as JavaScript. This setting can + * effectively help to reduce identity theft through XSS attacks (although it is not supported + * by all browsers). + * + * @var bool + */ + private $httpOnly = true; + + /** + * New Cookie instance, cookies used to schedule cookie set while dispatching Response. + * + * @link http://php.net/manual/en/function.setcookie.php + * + * @param string $name The name of the cookie. + * @param string $value The value of the cookie. This value is stored on the clients + * computer; do not store sensitive information. + * @param int $lifetime Cookie lifetime. This value specified in seconds and declares period + * of time in which cookie will expire relatively to current time() + * value. + * @param string $path The path on the server in which the cookie will be available on. + * If set to '/', the cookie will be available within the entire + * domain. + * If set to '/foo/', the cookie will only be available within the + * /foo/ + * directory and all sub-directories such as /foo/bar/ of domain. The + * default value is the current directory that the cookie is being set + * in. + * @param string $domain The domain that the cookie is available. To make the cookie + * available + * on all subdomains of example.com then you'd set it to + * '.example.com'. + * The . is not required but makes it compatible with more browsers. + * Setting it to www.example.com will make the cookie only available in + * the www subdomain. Refer to tail matching in the spec for details. + * @param bool $secure Indicates that the cookie should only be transmitted over a secure + * HTTPS connection from the client. When set to true, the cookie will + * only be set if a secure connection exists. On the server-side, it's + * on the programmer to send this kind of cookie only on secure + * connection (e.g. with respect to $_SERVER["HTTPS"]). + * @param bool $httpOnly When true the cookie will be made accessible only through the HTTP + * protocol. This means that the cookie won't be accessible by + * scripting + * languages, such as JavaScript. This setting can effectively help to + * reduce identity theft through XSS attacks (although it is not + * supported by all browsers). + */ + public function __construct( + string $name, + string $value = null, + int $lifetime = null, + string $path = null, + string $domain = null, + bool $secure = false, + bool $httpOnly = true + ) { + $this->name = $name; + $this->value = $value; + $this->lifetime = $lifetime; + $this->path = $path; + $this->domain = $domain; + $this->secure = $secure; + $this->httpOnly = $httpOnly; + } + + /** + * The name of the cookie. + * + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * The value of the cookie. This value is stored on the clients computer; do not store sensitive + * information. + * + * @return string|null + */ + public function getValue() + { + return $this->value; + } + + /** + * The time the cookie expires. This is a Unix timestamp so is in number of seconds since the + * epoch. In other words, you'll most likely set this with the time function plus the number of + * seconds before you want it to expire. Or you might use mktime. + * + * Will return null if lifetime is not specified. + * + * @return int|null + */ + public function getExpires() + { + if ($this->lifetime === null) { + return null; + } + + return time() + $this->lifetime; + } + + /** + * The path on the server in which the cookie will be available on. + * + * If set to '/', the cookie will be available within the entire domain. If set to '/foo/', + * the cookie will only be available within the /foo/ directory and all sub-directories such as + * /foo/bar/ of domain. The default value is the current directory that the cookie is being set + * in. + * + * @return string|null + */ + public function getPath() + { + return $this->path; + } + + /** + * The domain that the cookie is available. To make the cookie available on all subdomains of + * example.com then you'd set it to '.example.com'. The . is not required but makes it + * compatible with more browsers. Setting it to www.example.com will make the cookie only + * available in the www subdomain. Refer to tail matching in the spec for details. + * + * @return string|null + */ + public function getDomain() + { + return $this->domain; + } + + /** + * Indicates that the cookie should only be transmitted over a secure HTTPS connection from the + * client. When set to true, the cookie will only be set if a secure connection exists. + * On the server-side, it's on the programmer to send this kind of cookie only on secure + * connection + * (e.g. with respect to $_SERVER["HTTPS"]). + * + * @return bool + */ + public function isSecure(): bool + { + return $this->secure; + } + + /** + * When true the cookie will be made accessible only through the HTTP protocol. This means that + * the cookie won't be accessible by scripting languages, such as JavaScript. This setting can + * effectively help to reduce identity theft through XSS attacks (although it is not supported + * by all browsers). + * + * @return bool + */ + public function isHttpOnly(): bool + { + return $this->httpOnly; + } + + /** + * Get new cookie with altered value. Original cookie object should not be changed. + * + * @param string $value + * + * @return Cookie + */ + public function withValue(string $value): self + { + $cookie = clone $this; + $cookie->value = $value; + + return $cookie; + } + + /** + * Convert cookie instance to string. + * + * @link http://www.w3.org/Protocols/rfc2109/rfc2109 + * @return string + */ + public function createHeader(): string + { + $header = [ + rawurlencode($this->name) . '=' . rawurlencode($this->value) + ]; + if ($this->lifetime !== null) { + $header[] = 'Expires=' . gmdate(\DateTime::COOKIE, $this->getExpires()); + $header[] = 'Max-Age=' . $this->lifetime; + } + if (!empty($this->path)) { + $header[] = 'Path=' . $this->path; + } + if (!empty($this->domain)) { + $header[] = 'Domain=' . $this->domain; + } + if ($this->secure) { + $header[] = 'Secure'; + } + if ($this->httpOnly) { + $header[] = 'HttpOnly'; + } + + return join('; ', $header); + } + + /** + * New Cookie instance, cookies used to schedule cookie set while dispatching Response. + * Static constructor. + * + * @link http://php.net/manual/en/function.setcookie.php + * + * @param string $name The name of the cookie. + * @param string $value The value of the cookie. This value is stored on the clients + * computer; do not store sensitive information. + * @param int $lifetime Cookie lifetime. This value specified in seconds and declares period + * of time in which cookie will expire relatively to current time() + * value. + * @param string $path The path on the server in which the cookie will be available on. + * If set to '/', the cookie will be available within the entire + * domain. + * If set to '/foo/', the cookie will only be available within the + * /foo/ + * directory and all sub-directories such as /foo/bar/ of domain. The + * default value is the current directory that the cookie is being set + * in. + * @param string $domain The domain that the cookie is available. To make the cookie + * available + * on all subdomains of example.com then you'd set it to + * '.example.com'. + * The . is not required but makes it compatible with more browsers. + * Setting it to www.example.com will make the cookie only available in + * the www subdomain. Refer to tail matching in the spec for details. + * @param bool $secure Indicates that the cookie should only be transmitted over a secure + * HTTPS connection from the client. When set to true, the cookie will + * only be set if a secure connection exists. On the server-side, it's + * on the programmer to send this kind of cookie only on secure + * connection (e.g. with respect to $_SERVER["HTTPS"]). + * @param bool $httpOnly When true the cookie will be made accessible only through the HTTP + * protocol. This means that the cookie won't be accessible by + * scripting + * languages, such as JavaScript. This setting can effectively help to + * reduce identity theft through XSS attacks (although it is not + * supported by all browsers). + * + * @return Cookie + */ + public static function create( + string $name, + string $value = null, + int $lifetime = null, + string $path = null, + string $domain = null, + bool $secure = false, + bool $httpOnly = true + ): self { + return new self($name, $value, $lifetime, $path, $domain, $secure, $httpOnly); + } + + /** + * @return string + */ + public function __toString(): string + { + return $this->createHeader(); + } +}
\ No newline at end of file diff --git a/php-src/tests/http/data.php b/php-src/tests/http/data.php new file mode 100644 index 00000000..c5e0bab0 --- /dev/null +++ b/php-src/tests/http/data.php @@ -0,0 +1,18 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + + $data = $req->getParsedBody(); + + ksort($data); + ksort($data['arr']); + ksort($data['arr']['x']['y']); + + $resp->getBody()->write(json_encode($data)); + + return $resp; +}
\ No newline at end of file diff --git a/php-src/tests/http/echo.php b/php-src/tests/http/echo.php new file mode 100644 index 00000000..7004ada0 --- /dev/null +++ b/php-src/tests/http/echo.php @@ -0,0 +1,10 @@ +<?php + +use \Psr\Http\Message\ServerRequestInterface; +use \Psr\Http\Message\ResponseInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + $resp->getBody()->write(strtoupper($req->getQueryParams()['hello'])); + return $resp->withStatus(201); +}
\ No newline at end of file diff --git a/php-src/tests/http/error.php b/php-src/tests/http/error.php new file mode 100644 index 00000000..6df0d4b5 --- /dev/null +++ b/php-src/tests/http/error.php @@ -0,0 +1,9 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + throw new Error("error"); +}
\ No newline at end of file diff --git a/php-src/tests/http/error2.php b/php-src/tests/http/error2.php new file mode 100644 index 00000000..617b5a3f --- /dev/null +++ b/php-src/tests/http/error2.php @@ -0,0 +1,9 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + exit(); +}
\ No newline at end of file diff --git a/php-src/tests/http/header.php b/php-src/tests/http/header.php new file mode 100644 index 00000000..e5b295b6 --- /dev/null +++ b/php-src/tests/http/header.php @@ -0,0 +1,11 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + $resp->getBody()->write(strtoupper($req->getHeaderLine('input'))); + + return $resp->withAddedHeader("Header", $req->getQueryParams()['hello']); +}
\ No newline at end of file diff --git a/php-src/tests/http/payload.php b/php-src/tests/http/payload.php new file mode 100644 index 00000000..a16984c5 --- /dev/null +++ b/php-src/tests/http/payload.php @@ -0,0 +1,13 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + // we expect json body + $p = json_decode($req->getBody(), true); + $resp->getBody()->write(json_encode(array_flip($p))); + + return $resp; +}
\ No newline at end of file diff --git a/php-src/tests/http/pid.php b/php-src/tests/http/pid.php new file mode 100644 index 00000000..1cc322bf --- /dev/null +++ b/php-src/tests/http/pid.php @@ -0,0 +1,11 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + $resp->getBody()->write(getmypid()); + + return $resp; +}
\ No newline at end of file diff --git a/php-src/tests/http/upload.php b/php-src/tests/http/upload.php new file mode 100644 index 00000000..2f7c0b64 --- /dev/null +++ b/php-src/tests/http/upload.php @@ -0,0 +1,35 @@ +<?php + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + $files = $req->getUploadedFiles(); + array_walk_recursive($files, function (&$v) { + /** + * @var \Psr\Http\Message\UploadedFileInterface $v + */ + + if ($v->getError()) { + $v = [ + 'name' => $v->getClientFilename(), + 'size' => $v->getSize(), + 'mime' => $v->getClientMediaType(), + 'error' => $v->getError(), + ]; + } else { + $v = [ + 'name' => $v->getClientFilename(), + 'size' => $v->getSize(), + 'mime' => $v->getClientMediaType(), + 'error' => $v->getError(), + 'md5' => md5($v->getStream()->__toString()), + ]; + } + }); + + $resp->getBody()->write(json_encode($files, JSON_UNESCAPED_SLASHES)); + + return $resp; +}
\ No newline at end of file diff --git a/tests/pid.php b/php-src/tests/pid.php index a8cfa229..a8cfa229 100644 --- a/tests/pid.php +++ b/php-src/tests/pid.php diff --git a/php-src/tests/sample.txt b/php-src/tests/sample.txt new file mode 100644 index 00000000..eed7e79a --- /dev/null +++ b/php-src/tests/sample.txt @@ -0,0 +1 @@ +sample
\ No newline at end of file diff --git a/tests/slow-client.php b/php-src/tests/slow-client.php index f09142b5..2722868c 100644 --- a/tests/slow-client.php +++ b/php-src/tests/slow-client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor/autoload.php"; +require dirname(__DIR__) . "/../vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/stop.php b/php-src/tests/stop.php index caa485d6..caa485d6 100644 --- a/tests/stop.php +++ b/php-src/tests/stop.php diff --git a/pipe_factory.go b/pipe_factory.go index 3b2f2f88..d6fe0420 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -46,8 +46,13 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { if pid, err := fetchPID(w.rl); pid != *w.Pid { go func(w *Worker) { w.Kill() }(w) + if wErr := w.Wait(); wErr != nil { - err = errors.Wrap(wErr, err.Error()) + if _, ok := wErr.(*exec.ExitError); ok { + err = errors.Wrap(wErr, err.Error()) + } else { + err = wErr + } } return nil, errors.Wrap(err, "unable to connect to worker") @@ -56,3 +61,8 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { w.state.set(StateReady) return w, nil } + +// Close the factory. +func (f *PipeFactory) Close() error { + return nil +} diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 9d50e47f..ae276ab6 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -7,7 +7,7 @@ import ( ) func Test_Pipe_Start(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) assert.NoError(t, err) @@ -21,7 +21,7 @@ func Test_Pipe_Start(t *testing.T) { } func Test_Pipe_StartError(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") cmd.Start() w, err := NewPipeFactory().SpawnWorker(cmd) @@ -30,7 +30,7 @@ func Test_Pipe_StartError(t *testing.T) { } func Test_Pipe_PipeError(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") cmd.StdinPipe() w, err := NewPipeFactory().SpawnWorker(cmd) @@ -39,7 +39,7 @@ func Test_Pipe_PipeError(t *testing.T) { } func Test_Pipe_PipeError2(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") cmd.StdoutPipe() w, err := NewPipeFactory().SpawnWorker(cmd) @@ -48,7 +48,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "tests/failboot.php") + cmd := exec.Command("php", "php-src/tests/failboot.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) @@ -57,7 +57,7 @@ func Test_Pipe_Failboot(t *testing.T) { } func Test_Pipe_Invalid(t *testing.T) { - cmd := exec.Command("php", "tests/invalid.php") + cmd := exec.Command("php", "php-src/tests/invalid.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) @@ -65,7 +65,7 @@ func Test_Pipe_Invalid(t *testing.T) { } func Test_Pipe_Echo(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -84,7 +84,7 @@ func Test_Pipe_Echo(t *testing.T) { } func Test_Pipe_Broken(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -104,7 +104,7 @@ func Test_Pipe_Broken(t *testing.T) { func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := f.SpawnWorker(cmd) go func() { if w.Wait() != nil { @@ -117,7 +117,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -1,20 +1,29 @@ package roadrunner const ( - // EventCreated thrown when new worker is spawned. - EventCreated = iota + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct = iota + 100 - // EventDestruct thrown before worker destruction. - EventDestruct + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct - // EventError thrown any worker related even happen (error passed as context) - EventError + // EventWorkerKill thrown after worker is being forcefully killed. + EventWorkerKill + + // EventWorkerError thrown any worker related even happen (passed with WorkerError) + EventWorkerError + + // EventWorkerDead thrown when worker stops worker for any reason. + EventWorkerDead + + // EventPoolError caused on pool wide errors + EventPoolError ) // Pool managed set of inner worker processes. type Pool interface { - // Report all caused events to attached watcher. - Report(o func(event int, w *Worker, ctx interface{})) + // AddListener all caused events to attached watcher. + Listen(l func(event int, ctx interface{})) // Exec one task with given payload and context, returns result or error. Exec(rqs *Payload) (rsp *Payload, err error) diff --git a/server.go b/server.go new file mode 100644 index 00000000..4ffdf367 --- /dev/null +++ b/server.go @@ -0,0 +1,207 @@ +package roadrunner + +import ( + "fmt" + "sync" + "github.com/pkg/errors" +) + +const ( + // EventPoolConstruct triggered when server creates new pool. + EventServerStart = iota + 200 + + // EventPoolConstruct triggered when server creates new pool. + EventServerStop + + // EventServerFailure triggered when server is unable to replace dead pool. + EventServerFailure + + // EventPoolConstruct triggered when server creates new pool. + EventPoolConstruct + + // EventPoolDestruct triggered when server destroys existed pool. + EventPoolDestruct +) + +// Service manages pool creation and swapping. +type Server struct { + // configures server, pool, cmd creation and factory. + cfg *ServerConfig + + // protects pool while the re-configuration + mu sync.Mutex + + // indicates that server was started + started bool + + // creates and connects to workers + factory Factory + + // currently active pool instance + pool Pool + + // observes pool events (can be attached to multiple pools at the same time) + mul sync.Mutex + lsn func(event int, ctx interface{}) +} + +// NewServer creates new router. Make sure to call configure before the usage. +func NewServer(cfg *ServerConfig) *Server { + return &Server{cfg: cfg} +} + +// AddListener attaches server event watcher. +func (s *Server) Listen(l func(event int, ctx interface{})) { + s.mul.Lock() + defer s.mul.Unlock() + + s.lsn = l +} + +// Start underlying worker pool, configure factory and command provider. +func (s *Server) Start() (err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.factory, err = s.cfg.makeFactory(); err != nil { + return err + } + + if s.pool, err = NewPool(s.cfg.makeCommand(), s.factory, *s.cfg.Pool); err != nil { + return err + } + + s.pool.Listen(s.poolListener) + s.started = true + s.throw(EventServerStart, s) + + return nil +} + +// Stop underlying worker pool and close the factory. +func (s *Server) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.started { + return + } + + s.throw(EventPoolDestruct, s.pool) + s.pool.Destroy() + s.factory.Close() + + s.factory = nil + s.pool = nil + s.started = false + s.throw(EventServerStop, s) +} + +// Exec one task with given payload and context, returns result or error. +func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error) { + pool := s.Pool() + if pool == nil { + return nil, fmt.Errorf("no associared pool") + } + + return pool.Exec(rqs) +} + +// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory +// and relay settings. +func (s *Server) Reconfigure(cfg *ServerConfig) error { + s.mu.Lock() + if !s.started { + s.cfg = cfg + s.mu.Unlock() + return nil + } + s.mu.Unlock() + + if s.cfg.Differs(cfg) { + return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") + } + + s.mu.Lock() + previous := s.pool + s.mu.Unlock() + + pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + if err != nil { + return err + } + + s.pool.Listen(s.poolListener) + + s.mu.Lock() + s.cfg.Pool, s.pool = cfg.Pool, pool + s.mu.Unlock() + + s.throw(EventPoolConstruct, pool) + + if previous != nil { + go func(previous Pool) { + s.throw(EventPoolDestruct, previous) + previous.Destroy() + }(previous) + } + + return nil +} + +// Reset resets the state of underlying pool and rebuilds all of it's workers. +func (s *Server) Reset() error { + s.mu.Lock() + cfg := s.cfg + s.mu.Unlock() + + return s.Reconfigure(cfg) +} + +// Workers returns worker list associated with the server pool. +func (s *Server) Workers() (workers []*Worker) { + p := s.Pool() + if p == nil { + return nil + } + + return p.Workers() +} + +// Pool returns active pool or error. +func (s *Server) Pool() Pool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.pool +} + +// AddListener pool events. +func (s *Server) poolListener(event int, ctx interface{}) { + if event == EventPoolError { + // pool failure, rebuilding + if err := s.Reset(); err != nil { + s.mu.Lock() + s.started = false + s.pool = nil + s.factory = nil + s.mu.Unlock() + + // everything is dead, this is recoverable but heavy state + s.throw(EventServerFailure, err) + } + } + + // bypassing to user specified lsn + s.throw(event, ctx) +} + +// throw invokes event handler if any. +func (s *Server) throw(event int, ctx interface{}) { + s.mul.Lock() + defer s.mul.Unlock() + + if s.lsn != nil { + s.lsn(event, ctx) + } +} diff --git a/server_config.go b/server_config.go new file mode 100644 index 00000000..8af0e0f8 --- /dev/null +++ b/server_config.go @@ -0,0 +1,60 @@ +package roadrunner + +import ( + "errors" + "net" + "strings" + "time" + "os/exec" +) + +// Server config combines factory, pool and cmd configurations. +type ServerConfig struct { + // Command includes command strings with all the parameters, example: "php worker.php pipes". + Command string + + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration + + // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change + // while server is running. + Pool *Config +} + +// Differs returns true if configuration has changed but ignores pool or cmd changes. +func (cfg *ServerConfig) Differs(new *ServerConfig) bool { + return cfg.Relay != new.Relay || cfg.RelayTimeout != new.RelayTimeout +} + +// makeCommands returns new command provider based on configured options. +func (cfg *ServerConfig) makeCommand() func() *exec.Cmd { + var cmd = strings.Split(cfg.Command, " ") + return func() *exec.Cmd { + return exec.Command(cmd[0], cmd[1:]...) + } +} + +// makeFactory creates and connects new factory instance based on given parameters. +func (cfg *ServerConfig) makeFactory() (Factory, error) { + if cfg.Relay == "pipes" || cfg.Relay == "pipe" { + return NewPipeFactory(), nil + } + + dsn := strings.Split(cfg.Relay, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)") + } + + ln, err := net.Listen(dsn[0], dsn[1]) + if err != nil { + return nil, err + } + + return NewSocketFactory(ln, cfg.RelayTimeout), nil +} diff --git a/server_config_test.go b/server_config_test.go new file mode 100644 index 00000000..1831ae95 --- /dev/null +++ b/server_config_test.go @@ -0,0 +1,92 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "runtime" + "testing" +) + +func Test_ServerConfig_PipeFactory(t *testing.T) { + cfg := &ServerConfig{Relay: "pipes"} + f, err := cfg.makeFactory() + + assert.NoError(t, err) + assert.IsType(t, &PipeFactory{}, f) + + cfg = &ServerConfig{Relay: "pipe"} + f, err = cfg.makeFactory() + assert.NoError(t, err) + assert.NotNil(t, f) + defer f.Close() + + assert.NoError(t, err) + assert.IsType(t, &PipeFactory{}, f) +} + +func Test_ServerConfig_SocketFactory(t *testing.T) { + cfg := &ServerConfig{Relay: "tcp://:9111"} + f, err := cfg.makeFactory() + assert.NoError(t, err) + assert.NotNil(t, f) + defer f.Close() + + assert.NoError(t, err) + assert.IsType(t, &SocketFactory{}, f) + assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network()) + assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String()) + + cfg = &ServerConfig{Relay: "tcp://localhost:9112"} + f, err = cfg.makeFactory() + assert.NoError(t, err) + assert.NotNil(t, f) + defer f.Close() + + assert.NoError(t, err) + assert.IsType(t, &SocketFactory{}, f) + assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network()) + assert.Equal(t, "127.0.0.1:9112", f.(*SocketFactory).ls.Addr().String()) +} + +func Test_ServerConfig_UnixSocketFactory(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &ServerConfig{Relay: "unix://unix.sock"} + f, err := cfg.makeFactory() + defer f.Close() + + assert.NoError(t, err) + assert.IsType(t, &SocketFactory{}, f) + assert.Equal(t, "unix", f.(*SocketFactory).ls.Addr().Network()) + assert.Equal(t, "unix.sock", f.(*SocketFactory).ls.Addr().String()) +} + +func Test_ServerConfig_ErrorFactory(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &ServerConfig{Relay: "uni:unix.sock"} + f, err := cfg.makeFactory() + assert.Nil(t, f) + assert.Error(t, err) + assert.Equal(t, "invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)", err.Error()) +} + +func Test_ServerConfig_ErrorMethod(t *testing.T) { + cfg := &ServerConfig{Relay: "xinu://unix.sock"} + + f, err := cfg.makeFactory() + assert.Nil(t, f) + assert.Error(t, err) +} + +func Test_ServerConfig_Cmd(t *testing.T) { + cfg := &ServerConfig{ + Command: "php php-src/tests/client.php pipes", + } + + cmd := cfg.makeCommand() + assert.NotNil(t, cmd) +} diff --git a/server_test.go b/server_test.go new file mode 100644 index 00000000..13cacc2c --- /dev/null +++ b/server_test.go @@ -0,0 +1,229 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "runtime" + "testing" + "time" + "os/exec" +) + +func TestServer_PipesEcho(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + + res, err := srv.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func TestServer_SocketEcho(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo tcp", + Relay: "tcp://:9007", + RelayTimeout: 10 * time.Second, + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + + res, err := srv.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func TestServer_Configure_BeforeStart(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + err := srv.Reconfigure(&ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 2, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + assert.NoError(t, err) + + assert.NoError(t, srv.Start()) + + res, err := srv.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Len(t, srv.Workers(), 2) +} + +func TestServer_Stop_NotStarted(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + + srv.Stop() + assert.Nil(t, srv.Workers()) +} + +func TestServer_Reconfigure(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + assert.Len(t, srv.Workers(), 1) + + err := srv.Reconfigure(&ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 2, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + assert.NoError(t, err) + + assert.Len(t, srv.Workers(), 2) +} + +func TestServer_Reset(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + assert.Len(t, srv.Workers(), 1) + + pid := *srv.Workers()[0].Pid + assert.NoError(t, srv.Reset()) + assert.Len(t, srv.Workers(), 1) + assert.NotEqual(t, pid, srv.Workers()[0].Pid) +} + +func TestServer_ReplacePool(t *testing.T) { + srv := NewServer( + &ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + + constructed := make(chan interface{}) + srv.Listen(func(e int, ctx interface{}) { + if e == EventPoolConstruct { + close(constructed) + } + }) + + srv.Reset() + <-constructed + + for _, w := range srv.Workers() { + assert.Equal(t, StateReady, w.state.Value()) + } +} + +func TestServer_ServerFailure(t *testing.T) { + srv := NewServer(&ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer srv.Stop() + + assert.NoError(t, srv.Start()) + + failure := make(chan interface{}) + srv.Listen(func(e int, ctx interface{}) { + if e == EventServerFailure { + failure <- nil + } + }) + + // emulating potential server failure + srv.cfg.Command = "php php-src/tests/client.php echo broken-connection" + srv.pool.(*StaticPool).cmd = func() *exec.Cmd { + return exec.Command("php", "php-src/tests/client.php", "echo", "broken-connection") + } + + // killing random worker and expecting pool to replace it + srv.Workers()[0].cmd.Process.Kill() + + <-failure + assert.True(t, true) +} diff --git a/service/container.go b/service/container.go new file mode 100644 index 00000000..ce9146bc --- /dev/null +++ b/service/container.go @@ -0,0 +1,171 @@ +package service + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// Container controls all internal RR services and provides plugin based system. +type Container interface { + // Register add new service to the container under given name. + Register(name string, service Service) + + // Reconfigure configures all underlying services with given configuration. + Init(cfg Config) error + + // Check if svc has been registered. + Has(service string) bool + + // Get returns svc instance by it's name or nil if svc not found. Method returns current service status + // as second value. + Get(service string) (svc Service, status int) + + // Serve all configured services. Non blocking. + Serve() error + + // Close all active services. + Stop() +} + +type container struct { + log logrus.FieldLogger + mu sync.Mutex + services []*entry +} + +// NewContainer creates new service container. +func NewContainer(log logrus.FieldLogger) Container { + return &container{ + log: log, + services: make([]*entry, 0), + } +} + +// Register add new service to the container under given name. +func (c *container) Register(name string, service Service) { + c.mu.Lock() + defer c.mu.Unlock() + + c.services = append(c.services, &entry{ + name: name, + svc: service, + status: StatusRegistered, + }) + + c.log.Debugf("[%s]: registered", name) +} + +// Check hasStatus svc has been registered. +func (c *container) Has(target string) bool { + c.mu.Lock() + defer c.mu.Unlock() + + for _, e := range c.services { + if e.name == target { + return true + } + } + + return false +} + +// Get returns svc instance by it's name or nil if svc not found. +func (c *container) Get(target string) (svc Service, status int) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, e := range c.services { + if e.name == target { + return e.svc, e.getStatus() + } + } + + return nil, StatusUndefined +} + +// Init configures all underlying services with given configuration. +func (c *container) Init(cfg Config) error { + for _, e := range c.services { + if e.getStatus() >= StatusConfigured { + return fmt.Errorf("service [%s] has already been configured", e.name) + } + + segment := cfg.Get(e.name) + if segment == nil { + c.log.Debugf("[%s]: no config has been provided", e.name) + continue + } + + ok, err := e.svc.Init(segment, c) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) + } else if ok { + e.setStatus(StatusConfigured) + } + } + + return nil +} + +// Serve all configured services. Non blocking. +func (c *container) Serve() error { + var ( + numServing int + done = make(chan interface{}, len(c.services)) + ) + defer close(done) + + for _, e := range c.services { + if e.hasStatus(StatusConfigured) { + numServing ++ + } else { + continue + } + + c.log.Debugf("[%s]: started", e.name) + go func(e *entry) { + e.setStatus(StatusServing) + defer e.setStatus(StatusStopped) + + if err := e.svc.Serve(); err != nil { + c.log.Errorf("[%s]: %s", e.name, err) + done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) + } + }(e) + } + + for i := 0; i < numServing; i++ { + result := <-done + + // found an error in one of the services, stopping the rest of running services. + if err, ok := result.(error); ok { + c.Stop() + return err + } + } + + return nil +} + +// Stop sends stop command to all running services. +func (c *container) Stop() { + for _, e := range c.services { + if e.hasStatus(StatusServing) { + e.svc.Stop() + e.setStatus(StatusStopped) + c.log.Debugf("[%s]: stopped", e.name) + } + } +} diff --git a/service/container_test.go b/service/container_test.go new file mode 100644 index 00000000..3c2f8761 --- /dev/null +++ b/service/container_test.go @@ -0,0 +1,327 @@ +package service + +import ( + "testing" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/sirupsen/logrus" + "encoding/json" + "errors" + "time" + "sync" +) + +type testService struct { + mu sync.Mutex + waitForServe chan interface{} + delay time.Duration + ok bool + cfg Config + c Container + cfgE, serveE error + done chan interface{} +} + +func (t *testService) Init(cfg Config, c Container) (enabled bool, err error) { + t.cfg = cfg + t.c = c + t.done = make(chan interface{}) + return t.ok, t.cfgE +} + +func (t *testService) Serve() error { + time.Sleep(t.delay) + + if t.serveE != nil { + return t.serveE + } + + if c := t.waitChan(); c != nil { + close(c) + t.setChan(nil) + } + + <-t.done + return nil +} + +func (t *testService) Stop() { + close(t.done) +} + +func (t *testService) waitChan() chan interface{} { + t.mu.Lock() + defer t.mu.Unlock() + + return t.waitForServe +} + +func (t *testService) setChan(c chan interface{}) { + t.mu.Lock() + defer t.mu.Unlock() + + t.waitForServe = c +} + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) Config { + vars := make(map[string]string) + json.Unmarshal([]byte(cfg.cfg), &vars) + + v, ok := vars[name] + if !ok { + return nil + } + + return &testCfg{cfg: v} +} +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func TestContainer_Register(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) +} + +func TestContainer_Has(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) + + assert.True(t, c.Has("test")) + assert.False(t, c.Has("another")) +} + +func TestContainer_Get(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) + + s, st = c.Get("another") + assert.Nil(t, s) + assert.Equal(t, StatusUndefined, st) +} + +func TestContainer_Stop_NotStarted(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + assert.Equal(t, 1, len(hook.Entries)) + + c.Stop() +} + +func TestContainer_Configure(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusConfigured, st) +} + +func TestContainer_ConfigureNull(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Init(&testCfg{`{"another":"something"}`})) + assert.Equal(t, 2, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureDisabled(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: false} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) + assert.Equal(t, 1, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureError(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: false, + cfgE: errors.New("configure error"), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + err := c.Init(&testCfg{`{"test":"something"}`}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "configure error") + assert.Contains(t, err.Error(), "test") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureTwice(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) + assert.Error(t, c.Init(&testCfg{`{"test":"something"}`})) +} + +func TestContainer_ServeEmptyContainer(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Serve()) + c.Stop() +} + +func TestContainer_Serve(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + waitForServe: make(chan interface{}), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) + + go func() { + assert.NoError(t, c.Serve()) + }() + + <-svc.waitChan() + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusServing, st) + + c.Stop() + + s, st = c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} + +func TestContainer_ServeError(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + waitForServe: make(chan interface{}), + serveE: errors.New("serve error"), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) + + err := c.Serve() + assert.Error(t, err) + assert.Contains(t, err.Error(), "serve error") + assert.Contains(t, err.Error(), "test") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} + +func TestContainer_ServeErrorMultiple(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + delay: time.Millisecond * 10, + waitForServe: make(chan interface{}), + serveE: errors.New("serve error"), + } + + svc2 := &testService{ + ok: true, + waitForServe: make(chan interface{}), + } + + c := NewContainer(logger) + c.Register("test2", svc2) + c.Register("test", svc) + assert.Equal(t, 2, len(hook.Entries)) + assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) + + err := c.Serve() + assert.Error(t, err) + assert.Contains(t, err.Error(), "serve error") + assert.Contains(t, err.Error(), "test") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) + + s, st = c.Get("test2") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} diff --git a/service/http/config.go b/service/http/config.go new file mode 100644 index 00000000..2bc5f845 --- /dev/null +++ b/service/http/config.go @@ -0,0 +1,42 @@ +package http + +import ( + "github.com/spiral/roadrunner" + "errors" + "strings" +) + +// Configures RoadRunner HTTP server. +type Config struct { + // Enable enables http svc. + Enable bool + + // Address and port to handle as http server. + Address string + + // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited. + MaxRequest int64 + + // Uploads configures uploads configuration. + Uploads *UploadsConfig + + // Workers configures roadrunner server and worker pool. + Workers *roadrunner.ServerConfig +} + +// Valid validates the configuration. +func (cfg *Config) Valid() error { + if cfg.Uploads == nil { + return errors.New("mailformed uploads config") + } + + if cfg.Workers == nil { + return errors.New("mailformed workers config") + } + + if !strings.Contains(cfg.Address, ":") { + return errors.New("mailformed server address") + } + + return nil +} diff --git a/service/http/config_test.go b/service/http/config_test.go new file mode 100644 index 00000000..b806b79b --- /dev/null +++ b/service/http/config_test.go @@ -0,0 +1,88 @@ +package http + +import ( + "testing" + "os" + "github.com/stretchr/testify/assert" + "github.com/spiral/roadrunner" + "time" +) + +func Test_Config_Valid(t *testing.T) { + cfg := &Config{ + Enable: true, + Address: ":8080", + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.NoError(t, cfg.Valid()) +} + +func Test_Config_NoUploads(t *testing.T) { + cfg := &Config{ + Enable: true, + Address: ":8080", + MaxRequest: 1024, + Workers: &roadrunner.ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_NoWorkers(t *testing.T) { + cfg := &Config{ + Enable: true, + Address: ":8080", + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_InvalidAddress(t *testing.T) { + cfg := &Config{ + Enable: true, + Address: "", + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php php-src/tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} diff --git a/service/http/handler.go b/service/http/handler.go new file mode 100644 index 00000000..a4cb6406 --- /dev/null +++ b/service/http/handler.go @@ -0,0 +1,121 @@ +package http + +import ( + "net/http" + "strconv" + "github.com/spiral/roadrunner" + "github.com/pkg/errors" + "sync" +) + +const ( + // EventResponse thrown after the request been processed. See Event as payload. + EventResponse = iota + 500 + + // EventError thrown on any non job error provided by road runner server. + EventError +) + +// Event represents singular http response event. +type Event struct { + // Method of the request. + Method string + + // Uri requested by the client. + Uri string + + // Status is response status. + Status int + + // Associated error, if any. + Error error +} + +// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Handler struct { + cfg *Config + rr *roadrunner.Server + mul sync.Mutex + lsn func(event int, ctx interface{}) +} + +// AddListener attaches pool event watcher. +func (h *Handler) Listen(l func(event int, ctx interface{})) { + h.mul.Lock() + defer h.mul.Unlock() + + h.lsn = l +} + +// middleware serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // validating request size + if h.cfg.MaxRequest != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + h.handleError(w, r, err) + return + } else if size > h.cfg.MaxRequest*1024*1024 { + h.handleError(w, r, errors.New("request body max size is exceeded")) + return + } + } + } + + req, err := NewRequest(r, h.cfg.Uploads) + if err != nil { + h.handleError(w, r, err) + return + } + + if err = req.Open(); err != nil { + h.handleError(w, r, err) + return + } + defer req.Close() + + p, err := req.Payload() + if err != nil { + h.handleError(w, r, err) + return + } + + rsp, err := h.rr.Exec(p) + if err != nil { + h.handleError(w, r, err) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + h.handleError(w, r, err) + return + } + + h.handleResponse(req, resp) + resp.Write(w) +} + +// handleError sends error. +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) { + h.throw(EventError, &Event{Method: r.Method, Uri: uri(r), Status: 500, Error: err}) + + w.WriteHeader(500) + w.Write([]byte(err.Error())) +} + +// handleResponse triggers response event. +func (h *Handler) handleResponse(req *Request, resp *Response) { + h.throw(EventResponse, &Event{Method: req.Method, Uri: req.Uri, Status: resp.Status}) +} + +// throw invokes event srv if any. +func (h *Handler) throw(event int, ctx interface{}) { + h.mul.Lock() + defer h.mul.Unlock() + + if h.lsn != nil { + h.lsn(event, ctx) + } +} diff --git a/service/http/handler_test.go b/service/http/handler_test.go new file mode 100644 index 00000000..d599b1d8 --- /dev/null +++ b/service/http/handler_test.go @@ -0,0 +1,821 @@ +package http + +import ( + "net/http" + "io/ioutil" + "github.com/spiral/roadrunner" + "testing" + "os" + "github.com/stretchr/testify/assert" + "net/url" + "strings" + "context" + "bytes" + "mime/multipart" + "time" + "runtime" +) + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + return string(b), r, err +} + +func TestServer_Echo(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8077", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + body, r, err := get("http://localhost:8077/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func TestServer_Headers(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php header pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8078", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) + assert.NoError(t, err) + + req.Header.Add("input", "sample") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "world", r.Header.Get("Header")) + assert.Equal(t, "SAMPLE", string(b)) +} + +func TestServer_Cookies(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php cookie pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8079", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("GET", "http://localhost:8079", nil) + assert.NoError(t, err) + + req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"}) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "INPUT-VALUE", string(b)) + + for _, c := range r.Cookies() { + assert.Equal(t, "output", c.Name) + assert.Equal(t, "cookie-output", c.Value) + } +} + +func TestServer_JsonPayload_POST(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8090", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest( + "POST", + "http://localhost"+hs.Addr, + bytes.NewBufferString(`{"key":"value"}`), + ) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestServer_JsonPayload_PUT(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8081", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestServer_JsonPayload_PATCH(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8082", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestServer_FormData_POST(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8083", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_FormData_PUT(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8084", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_FormData_PATCH(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8085", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_Multipart_POST(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8019", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + w.WriteField("key", "value") + + w.WriteField("key", "value") + w.WriteField("name[]", "name1") + w.WriteField("name[]", "name2") + w.WriteField("name[]", "name3") + w.WriteField("arr[x][y][z]", "y") + w.WriteField("arr[x][y][e]", "f") + w.WriteField("arr[c]p", "l") + w.WriteField("arr[c]z", "") + + w.Close() + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_Multipart_PUT(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8020", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + w.WriteField("key", "value") + + w.WriteField("key", "value") + w.WriteField("name[]", "name1") + w.WriteField("name[]", "name2") + w.WriteField("name[]", "name3") + w.WriteField("arr[x][y][z]", "y") + w.WriteField("arr[x][y][e]", "f") + w.WriteField("arr[c]p", "l") + w.WriteField("arr[c]z", "") + + w.Close() + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_Multipart_PATCH(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + w.WriteField("key", "value") + + w.WriteField("key", "value") + w.WriteField("name[]", "name1") + w.WriteField("name[]", "name2") + w.WriteField("name[]", "name3") + w.WriteField("arr[x][y][z]", "y") + w.WriteField("arr[x][y][e]", "f") + w.WriteField("arr[c]p", "l") + w.WriteField("arr[c]z", "") + + w.Close() + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestServer_Error(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php error pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8077", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + _, r, err := get("http://localhost:8077/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func TestServer_Error2(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php error2 pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8077", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + _, r, err := get("http://localhost:8077/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func TestServer_Error3(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php pid pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8077", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + b2 := &bytes.Buffer{} + for i := 0; i < 1024*1024; i++ { + b2.Write([]byte(" ")) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func BenchmarkHandler_Listen_Echo(b *testing.B) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + st.rr.Start() + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8077", Handler: st} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + bb := "WORLD" + for n := 0; n < b.N; n++ { + r, err := http.Get("http://localhost:8077/?hello=world") + if err != nil { + b.Fail() + } + defer r.Body.Close() + + br, _ := ioutil.ReadAll(r.Body) + if string(br) != bb { + b.Fail() + } + } +} diff --git a/service/http/parse.go b/service/http/parse.go new file mode 100644 index 00000000..1f90930a --- /dev/null +++ b/service/http/parse.go @@ -0,0 +1,147 @@ +package http + +import ( + "net/http" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) dataTree { + data := make(dataTree) + if r.PostForm != nil { + for k, v := range r.PostForm { + data.push(k, v) + } + } + + if r.MultipartForm != nil { + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + } + + return data +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + return + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including contentMultipart form dataTree) +func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + return + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// fetchIndexes parses input name and splits it into separate indexes list. +func fetchIndexes(s string) []string { + var ( + pos int + ch string + keys = make([]string, 1) + ) + + for _, c := range s { + ch = string(c) + switch ch { + case " ": + // ignore all spaces + continue + case "[": + pos = 1 + continue + case "]": + if pos == 1 { + keys = append(keys, "") + } + pos = 2 + default: + if pos == 1 || pos == 2 { + keys = append(keys, "") + } + + keys[len(keys)-1] += ch + pos = 0 + } + } + + return keys +} diff --git a/service/http/parse_test.go b/service/http/parse_test.go new file mode 100644 index 00000000..34c0dc0d --- /dev/null +++ b/service/http/parse_test.go @@ -0,0 +1,54 @@ +package http + +import "testing" + +var samples = []struct { + in string + out []string +}{ + {"key", []string{"key"}}, + {"key[subkey]", []string{"key", "subkey"}}, + {"key[subkey]value", []string{"key", "subkey", "value"}}, + {"key[subkey][value]", []string{"key", "subkey", "value"}}, + {"key[subkey][value][]", []string{"key", "subkey", "value", ""}}, + {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}}, + {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}}, +} + +func Test_FetchIndexes(t *testing.T) { + for _, tt := range samples { + t.Run(tt.in, func(t *testing.T) { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + t.Errorf("got %q, want %q", r, tt.out) + } + }) + } +} + +func BenchmarkConfig_FetchIndexes(b *testing.B) { + for _, tt := range samples { + for n := 0; n < b.N; n++ { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + b.Fail() + } + } + } +} + +func same(in, out []string) bool { + if len(in) != len(out) { + return false + } + + for i, v := range in { + if v != out[i] { + return false + } + } + + return true +} + +// bench diff --git a/service/http/request.go b/service/http/request.go new file mode 100644 index 00000000..e02d3cdb --- /dev/null +++ b/service/http/request.go @@ -0,0 +1,155 @@ +package http + +import ( + "encoding/json" + "fmt" + "github.com/spiral/roadrunner" + "io/ioutil" + "net/http" + "strings" +) + +const ( + defaultMaxMemory = 32 << 20 // 32 MB + contentNone = iota + 900 + contentStream + contentMultipart + contentFormData +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // Uri contains full request Uri with scheme and query. + Uri string `json:"uri"` + + // Headers contains list of request headers. + Headers http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // request body can be parsedData or []byte + body interface{} +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { + req = &Request{ + Protocol: r.Proto, + Method: r.Method, + Uri: uri(r), + Headers: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + } + + for _, c := range r.Cookies() { + req.Cookies[c.Name] = c.Value + } + + switch req.contentType() { + case contentNone: + return req, nil + + case contentStream: + req.body, err = ioutil.ReadAll(r.Body) + return req, err + + case contentMultipart: + if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + req.Uploads = parseUploads(r, cfg) + fallthrough + case contentFormData: + if err = r.ParseForm(); err != nil { + return nil, err + } + + req.body = parseData(r) + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open() error { + if r.Uploads == nil { + return nil + } + + return r.Uploads.Open() +} + +// Close clears all temp file uploads +func (r *Request) Close() { + if r.Uploads == nil { + return + } + + r.Uploads.Clear() +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (p *roadrunner.Payload, err error) { + p = &roadrunner.Payload{} + + if p.Context, err = json.Marshal(r); err != nil { + return nil, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return nil, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// contentType returns the payload content type. +func (r *Request) contentType() int { + if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" { + return contentNone + } + + ct := r.Headers.Get("content-type") + if ct == "application/x-www-form-urlencoded" { + return contentFormData + } + + if strings.Contains(ct, "multipart/form-data") { + return contentMultipart + } + + return contentStream +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/service/http/response.go b/service/http/response.go new file mode 100644 index 00000000..69bcf3e1 --- /dev/null +++ b/service/http/response.go @@ -0,0 +1,53 @@ +package http + +import ( + "encoding/json" + "github.com/spiral/roadrunner" + "net/http" + "io" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Headers contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated body payload. + body interface{} +} + +// NewResponse creates new response based on given roadrunner payload. +func NewResponse(p *roadrunner.Payload) (*Response, error) { + r := &Response{body: p.Body} + if err := json.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + for k, v := range r.Headers { + for _, h := range v { + w.Header().Add(k, h) + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.body.([]byte); ok { + w.Write(data) + } + + if rc, ok := r.body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} diff --git a/service/http/response_test.go b/service/http/response_test.go new file mode 100644 index 00000000..e45f5349 --- /dev/null +++ b/service/http/response_test.go @@ -0,0 +1,92 @@ +package http + +import ( + "net/http" + "bytes" + "testing" + "github.com/spiral/roadrunner" + "github.com/stretchr/testify/assert" + "errors" +) + +type testWriter struct { + h http.Header + buf bytes.Buffer + wroteHeader bool + code int + err error +} + +func (tw *testWriter) Header() http.Header { return tw.h } + +func (tw *testWriter) Write(p []byte) (int, error) { + if !tw.wroteHeader { + tw.WriteHeader(http.StatusOK) + } + + n, e := tw.buf.Write(p) + if e == nil { + e = tw.err + } + + return n, e +} + +func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code } + +func TestNewResponse_Error(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)}) + assert.Error(t, err) + assert.Nil(t, r) +} + +func TestNewResponse_Write(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + Body: []byte(`sample body`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "sample body", w.buf.String()) +} + +func TestNewResponse_Stream(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "hello world", w.buf.String()) +} + +func TestNewResponse_StreamError(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")} + assert.Error(t, r.Write(w)) +} diff --git a/service/http/rpc.go b/service/http/rpc.go new file mode 100644 index 00000000..aebc5903 --- /dev/null +++ b/service/http/rpc.go @@ -0,0 +1,57 @@ +package http + +import ( + "github.com/pkg/errors" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []Worker `json:"workers"` +} + +// Worker provides information about specific worker. +type Worker struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumJobs int64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + *r = "OK" + return rpc.svc.rr.Reset() +} + +// Workers returns list of active workers and their stats. +func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + for _, w := range rpc.svc.rr.Workers() { + state := w.State() + r.Workers = append(r.Workers, Worker{ + Pid: *w.Pid, + Status: state.String(), + NumJobs: state.NumExecs(), + Created: w.Created.UnixNano(), + }) + } + + return nil +} diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go new file mode 100644 index 00000000..f1c5786c --- /dev/null +++ b/service/http/rpc_test.go @@ -0,0 +1,115 @@ +package http + +import ( + "testing" + "github.com/sirupsen/logrus/hooks/test" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "time" + "github.com/spiral/roadrunner/service/rpc" + "strconv" +) + +func Test_RPC(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + res, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) + + cl, err := rs.Client() + assert.NoError(t, err) + + r := "" + assert.NoError(t, cl.Call("http.Reset", true, &r)) + assert.Equal(t, "OK", r) + + res2, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) + assert.NotEqual(t, res, res2) +} + +func Test_Workers(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + cl, err := rs.Client() + assert.NoError(t, err) + + r := &WorkerList{} + assert.NoError(t, cl.Call("http.Workers", true, &r)) + assert.Len(t, r.Workers, 1) + + assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid) +}
\ No newline at end of file diff --git a/service/http/service.go b/service/http/service.go new file mode 100644 index 00000000..8485cba6 --- /dev/null +++ b/service/http/service.go @@ -0,0 +1,128 @@ +package http + +import ( + "net/http" + "github.com/spiral/roadrunner/service" + "context" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/rpc" + "sync" +) + +// ID contains default svc name. +const ID = "http" + +// must return true if request/response pair is handled withing the middleware. +type middleware func(w http.ResponseWriter, r *http.Request) bool + +// Service manages rr, http servers. +type Service struct { + cfg *Config + lsns []func(event int, ctx interface{}) + mdws []middleware + + mu sync.Mutex + rr *roadrunner.Server + srv *Handler + http *http.Server +} + +func (s *Service) AddMiddleware(m middleware) { + s.mdws = append(s.mdws, m) +} + +// AddListener attaches server event watcher. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.lsns = append(s.lsns, l) +} + +// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Init(cfg service.Config, c service.Container) (bool, error) { + config := &Config{} + + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + + // registering http RPC interface + if r, ok := c.Get(rpc.ID); ok >= service.StatusConfigured { + if h, ok := r.(*rpc.Service); ok { + h.Register(ID, &rpcServer{s}) + } + } + + return true, nil +} + +// Serve serves the svc. +func (s *Service) Serve() error { + s.mu.Lock() + rr := roadrunner.NewServer(s.cfg.Workers) + + s.rr = rr + s.srv = &Handler{cfg: s.cfg, rr: s.rr} + s.http = &http.Server{Addr: s.cfg.Address} + + s.rr.Listen(s.listener) + s.srv.Listen(s.listener) + + if len(s.mdws) == 0 { + s.http.Handler = s.srv + } else { + s.http.Handler = s + } + s.mu.Unlock() + + if err := rr.Start(); err != nil { + return err + } + defer s.rr.Stop() + + return s.http.ListenAndServe() +} + +// Stop stops the svc. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.http == nil { + return + } + + s.http.Shutdown(context.Background()) +} + +// middleware handles connection using set of mdws and rr PSR-7 server. +func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + for _, m := range s.mdws { + if m(w, r) { + return + } + } + + s.srv.ServeHTTP(w, r) +} + +func (s *Service) listener(event int, ctx interface{}) { + for _, l := range s.lsns { + l(event, ctx) + } + + if event == roadrunner.EventServerFailure { + // attempting rr server restart + if err := s.rr.Start(); err != nil { + s.Stop() + } + } +} diff --git a/service/http/service_test.go b/service/http/service_test.go new file mode 100644 index 00000000..ebcd0f5a --- /dev/null +++ b/service/http/service_test.go @@ -0,0 +1,400 @@ +package http + +import ( + "testing" + "github.com/spiral/roadrunner/service" + "github.com/sirupsen/logrus/hooks/test" + "github.com/sirupsen/logrus" + "encoding/json" + "github.com/stretchr/testify/assert" + "os" + "time" + "net/http" + "io/ioutil" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/rpc" +) + +type testCfg struct { + httpCfg string + rpcCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.httpCfg} + } + + if name == rpc.ID { + return &testCfg{target: cfg.rpcCfg} + } + return nil +} +func (cfg *testCfg) Unmarshal(out interface{}) error { + return json.Unmarshal([]byte(cfg.target), out) +} + +func Test_Service_NoConfig(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{}`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusRegistered, st) +} + +func Test_Service_Configure_Disable(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": false, + "address": ":8070", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusRegistered, st) +} + +func Test_Service_Configure_Enable(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":8070", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) +} + +func Test_Service_Echo(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + // should do nothing + s.Stop() + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) +} + +func Test_Service_Middleware(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + s.(*Service).AddMiddleware(func(w http.ResponseWriter, r *http.Request) bool { + if r.URL.Path == "/halt" { + w.WriteHeader(500) + w.Write([]byte("halted")) + return true + } + + return false + }) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + req, err = http.NewRequest("GET", "http://localhost:6029/halt", nil) + assert.NoError(t, err) + + r, err = http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err = ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) + assert.Equal(t, "halted", string(b)) +} + +func Test_Service_Listener(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + stop := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventServerStart { + stop <- nil + } + }) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + + c.Stop() + assert.True(t, true) +} + +func Test_Service_Error(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "---", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + assert.Error(t, c.Serve()) +} + +func Test_Service_Error2(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + assert.Error(t, c.Serve()) +} + +func Test_Service_Error3(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.Error(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers" + "command": "php ../../php-src/tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) +} + +func Test_Service_Error4(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.Error(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": "----", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) +} + +func tmpDir() string { + p := os.TempDir() + r, _ := json.Marshal(p) + + return string(r) +} diff --git a/service/http/uploads.go b/service/http/uploads.go new file mode 100644 index 00000000..f8334c30 --- /dev/null +++ b/service/http/uploads.go @@ -0,0 +1,140 @@ +package http + +import ( + "encoding/json" + "os" + "sync" + "mime/multipart" + "io/ioutil" + "io" +) + +const ( + // There is no error, the file uploaded with success. + UploadErrorOK = 0 + + // No file was uploaded. + UploadErrorNoFile = 4 + + // Missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // Failed to write file to disk. + UploadErrorCantWrite = 6 + + // Forbid file extension. + UploadErrorExtension = 7 +) + +// tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg *UploadsConfig + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + return json.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open() error { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + f.Open(u.cfg) + }(f) + } + + wg.Wait() + return nil +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear() { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + os.Remove(f.TempFilename) + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // ID contains filename specified by the client. + Name string `json:"name"` + + // Mime contains mime-type provided by the client. + Mime string `json:"mime"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + Mime: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +func (f *FileUpload) Open(cfg *UploadsConfig) error { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + defer file.Close() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer tmp.Close() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +// exists if file exists. +func exists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + + return false +} diff --git a/service/http/uploads_config.go b/service/http/uploads_config.go new file mode 100644 index 00000000..715de69a --- /dev/null +++ b/service/http/uploads_config.go @@ -0,0 +1,39 @@ +package http + +import ( + "strings" + "path" + "os" +) + +// UploadsConfig describes file location and controls access to them. +type UploadsConfig struct { + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// TmpDir returns temporary directory. +func (cfg *UploadsConfig) TmpDir() string { + if cfg.Dir != "" { + return cfg.Dir + } + + return os.TempDir() +} + +// Forbid must return true if file extension is not allowed for the upload. +func (cfg *UploadsConfig) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/service/http/uploads_config_test.go b/service/http/uploads_config_test.go new file mode 100644 index 00000000..7704a486 --- /dev/null +++ b/service/http/uploads_config_test.go @@ -0,0 +1,24 @@ +package http + +import ( + "testing" + "github.com/stretchr/testify/assert" + "os" +) + +func TestFsConfig_Forbids(t *testing.T) { + cfg := UploadsConfig{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestFsConfig_TmpFallback(t *testing.T) { + cfg := UploadsConfig{Dir: "test"} + assert.Equal(t, "test", cfg.TmpDir()) + + cfg = UploadsConfig{Dir: ""} + assert.Equal(t, os.TempDir(), cfg.TmpDir()) +} diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go new file mode 100644 index 00000000..af351067 --- /dev/null +++ b/service/http/uploads_test.go @@ -0,0 +1,311 @@ +package http + +import ( + "github.com/spiral/roadrunner" + "github.com/stretchr/testify/assert" + "net/http" + "time" + "bytes" + "mime/multipart" + "io/ioutil" + "testing" + "os" + "context" + "io" + "encoding/json" + "crypto/md5" + "encoding/hex" +) + +func TestServer_Upload_File(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer f.Close() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + io.Copy(fw, f) + + w.Close() + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 0, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + +func TestServer_Upload_NestedFile(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer f.Close() + fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + io.Copy(fw, f) + + w.Close() + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 0, "application/octet-stream") + + assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) +} + + +func TestServer_Upload_File_NoTmpDir(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: "-----", + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer f.Close() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + io.Copy(fw, f) + + w.Close() + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 5, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + + +func TestServer_Upload_File_Forbids(t *testing.T) { + st := &Handler{ + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../php-src/tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, st.rr.Start()) + defer st.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: st,} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer f.Close() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + io.Copy(fw, f) + + w.Close() + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 7, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + +func Test_FileExists(t *testing.T) { + assert.True(t, exists("uploads_test.go")) + assert.False(t, exists("uploads_test.")) +} + +func mustOpen(f string) *os.File { + r, err := os.Open(f) + if err != nil { + panic(err) + } + return r +} + +type fInfo struct { + Name string `json:"name"` + Size int64 `json:"size"` + Mime string `json:"mime"` + Error int `json:"error"` + MD5 string `json:"md5,omitempty"` +} + +func fileString(f string, err int, mime string) string { + s, _ := os.Stat(f) + + ff, _ := os.Open(f) + defer ff.Close() + h := md5.New() + io.Copy(h, ff) + + v := &fInfo{ + Name: s.Name(), + Size: s.Size(), + Error: err, + Mime: mime, + MD5: hex.EncodeToString(h.Sum(nil)), + } + + if err != 0 { + v.MD5 = "" + v.Size = 0 + } + + r, _ := json.Marshal(v) + return string(r) + +} diff --git a/service/rpc/config.go b/service/rpc/config.go new file mode 100644 index 00000000..06d63d65 --- /dev/null +++ b/service/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // AddListener string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go new file mode 100644 index 00000000..a953e30e --- /dev/null +++ b/service/rpc/config_test.go @@ -0,0 +1,109 @@ +package rpc + +import ( + "github.com/stretchr/testify/assert" + "runtime" + "testing" +) + +func TestConfig_Listener(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "tcp", ln.Addr().Network()) + assert.Equal(t, "[::]:18001", ln.Addr().String()) +} + +func TestConfig_ListenerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer ln.Close() + + assert.Equal(t, "unix", ln.Addr().Network()) + assert.Equal(t, "rpc.sock", ln.Addr().String()) +} + +func Test_Config_Error(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_ErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.listener() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func TestConfig_Dialer(t *testing.T) { + cfg := &config{Listen: "tcp://:18001"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "tcp", conn.RemoteAddr().Network()) + assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) +} + +func TestConfig_DialerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "unix://rpc.sock"} + + ln, err := cfg.listener() + defer ln.Close() + + conn, err := cfg.dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer conn.Close() + + assert.Equal(t, "unix", conn.RemoteAddr().Network()) + assert.Equal(t, "rpc.sock", conn.RemoteAddr().String()) +} + +func Test_Config_DialerError(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + cfg := &config{Listen: "uni:unix.sock"} + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) +} + +func Test_Config_DialerErrorMethod(t *testing.T) { + cfg := &config{Listen: "xinu://unix.sock"} + + ln, err := cfg.dialer() + assert.Nil(t, ln) + assert.Error(t, err) +} diff --git a/service/rpc/service.go b/service/rpc/service.go new file mode 100644 index 00000000..e1147754 --- /dev/null +++ b/service/rpc/service.go @@ -0,0 +1,122 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" + "sync" +) + +// ID contains default service name. +const ID = "rpc" + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server + + mu sync.Mutex + serving bool +} + +// Init must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + s.cfg = config + s.rpc = rpc.NewServer() + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.mu.Lock() + s.serving = true + s.stop = make(chan interface{}) + s.mu.Unlock() + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + go func() { + for { + select { + case <-s.stop: + break + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + <-s.stop + + s.mu.Lock() + s.serving = false + s.mu.Unlock() + + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.serving { + close(s.stop) + } +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go new file mode 100644 index 00000000..ce85d52f --- /dev/null +++ b/service/rpc/service_test.go @@ -0,0 +1,97 @@ +package rpc + +import ( + "encoding/json" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +type testService struct{} + +func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) service.Config { return nil } +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_ConfigError(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":false`}, nil) + + assert.Error(t, err) + assert.False(t, ok) +} + +func Test_Disabled(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":false}`}, nil) + + assert.NoError(t, err) + assert.False(t, ok) +} + +func Test_RegisterNotConfigured(t *testing.T) { + s := &Service{} + assert.Error(t, s.Register("test", &testService{})) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) + assert.Error(t, s.Serve()) +} + +func Test_Enabled(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) +} + +func Test_StopNonServing(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + + assert.NoError(t, err) + assert.True(t, ok) + s.Stop() +} + +func Test_Serve_Errors(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + assert.Error(t, s.Serve()) + + client, err := s.Client() + assert.Nil(t, client) + assert.Error(t, err) +} + +func Test_Serve_Client(t *testing.T) { + s := &Service{} + ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil) + assert.NoError(t, err) + assert.True(t, ok) + + defer s.Stop() + + assert.NoError(t, s.Register("test", &testService{})) + + go func() { assert.NoError(t, s.Serve()) }() + + time.Sleep(time.Millisecond) + client, err := s.Client() + assert.NotNil(t, client) + assert.NoError(t, err) + defer client.Close() + + var resp string + assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) + assert.Equal(t, "hello world", resp) +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 00000000..6ddcda41 --- /dev/null +++ b/service/service.go @@ -0,0 +1,61 @@ +package service + +import "sync" + +// svc provides high level functionality for road runner svc. +type Service interface { + // Init must return configure service and return true if service hasStatus enabled. Must return error in case of + // misconfiguration. Services must not be used without proper configuration pushed first. + Init(cfg Config, c Container) (enabled bool, err error) + + // Serve serves. + Serve() error + + // Stop stops the service. + Stop() +} + +const ( + // StatusUndefined when service bus can not find the service. + StatusUndefined = iota + + // StatusRegistered hasStatus setStatus when service has been registered in container. + StatusRegistered + + // StatusConfigured hasStatus setStatus when service has been properly configured. + StatusConfigured + + // StatusServing hasStatus setStatus when service hasStatus currently done. + StatusServing + + // StatusStopped hasStatus setStatus when service hasStatus stopped. + StatusStopped +) + +// entry creates association between service instance and given name. +type entry struct { + name string + svc Service + mu sync.Mutex + status int +} + +// status returns service status +func (e *entry) getStatus() int { + e.mu.Lock() + defer e.mu.Unlock() + + return e.status +} + +// setStarted indicates that service hasStatus status. +func (e *entry) setStatus(status int) { + e.mu.Lock() + defer e.mu.Unlock() + e.status = status +} + +// hasStatus checks if entry in specific status +func (e *entry) hasStatus(status int) bool { + return e.getStatus() == status +} diff --git a/service/static/config.go b/service/static/config.go new file mode 100644 index 00000000..2a1f6c13 --- /dev/null +++ b/service/static/config.go @@ -0,0 +1,52 @@ +package static + +import ( + "strings" + "path" + "os" + "github.com/pkg/errors" +) + +// Config describes file location and controls access to them. +type Config struct { + // Enables StaticFile service. + Enable bool + + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// Forbid must return true if file extension is not allowed for the upload. +func (cfg *Config) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} + +// Valid validates existence of directory. +func (cfg *Config) Valid() error { + st, err := os.Stat(cfg.Dir) + if err != nil { + if os.IsNotExist(err) { + return errors.New("root directory does not exists") + } + + return err + } + + if !st.IsDir() { + return errors.New("invalid root directory") + } + + return nil +} diff --git a/service/static/config_test.go b/service/static/config_test.go new file mode 100644 index 00000000..ce31348a --- /dev/null +++ b/service/static/config_test.go @@ -0,0 +1,21 @@ +package static + +import ( + "testing" + "github.com/stretchr/testify/assert" +) + +func TestConfig_Forbids(t *testing.T) { + cfg := Config{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestConfig_Valid(t *testing.T) { + assert.NoError(t, (&Config{Dir: "./"}).Valid()) + assert.Error(t, (&Config{Dir: "./config.go"}).Valid()) + assert.Error(t, (&Config{Dir: "./dir/"}).Valid()) +} diff --git a/service/static/service.go b/service/static/service.go new file mode 100644 index 00000000..43891aa8 --- /dev/null +++ b/service/static/service.go @@ -0,0 +1,89 @@ +package static + +import ( + "net/http" + "path" + "strings" + rrttp "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/service" +) + +// ID contains default service name. +const ID = "static" + +// Service serves static files. Potentially convert into middleware? +type Service struct { + // server configuration (location, forbidden files and etc) + cfg *Config + + // root is initiated http directory + root http.Dir +} + +// Init must return configure service and return true if service hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Init(cfg service.Config, c service.Container) (enabled bool, err error) { + config := &Config{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enable { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + s.root = http.Dir(s.cfg.Dir) + + // registering as middleware + if h, ok := c.Get(rrttp.ID); ok >= service.StatusConfigured { + if h, ok := h.(*rrttp.Service); ok { + h.AddMiddleware(s.middleware) + } + } + + return true, nil +} + +// Serve serves the service. +func (s *Service) Serve() error { return nil } + +// Stop stops the service. +func (s *Service) Stop() {} + +// middleware must return true if request/response pair is handled withing the middleware. +func (s *Service) middleware(w http.ResponseWriter, r *http.Request) bool { + fPath := r.URL.Path + + if !strings.HasPrefix(fPath, "/") { + fPath = "/" + fPath + } + fPath = path.Clean(fPath) + + if s.cfg.Forbids(fPath) { + return false + } + + f, err := s.root.Open(fPath) + if err != nil { + return false + } + defer f.Close() + + d, err := f.Stat() + if err != nil { + return false + } + + // do not serve directories + if d.IsDir() { + return false + } + + http.ServeContent(w, r, d.Name(), d.ModTime(), f) + return true +} diff --git a/service/static/service_test.go b/service/static/service_test.go new file mode 100644 index 00000000..f0cb1bea --- /dev/null +++ b/service/static/service_test.go @@ -0,0 +1,349 @@ +package static + +import ( + "testing" + "github.com/sirupsen/logrus/hooks/test" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "time" + rrhttp "github.com/spiral/roadrunner/service/http" + "encoding/json" + "io/ioutil" + "net/http" + "os" + "bytes" + "io" +) + +type testCfg struct { + httpCfg string + static string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == rrhttp.ID { + return &testCfg{target: cfg.httpCfg} + } + + if name == ID { + return &testCfg{target: cfg.static} + } + return nil +} +func (cfg *testCfg) Unmarshal(out interface{}) error { + return json.Unmarshal([]byte(cfg.target), out) +} + +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + return string(b), r, err +} + +func Test_Files(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/sample.txt") + assert.Equal(t, "sample", b) +} + + +func Test_Files_Disable(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":false, "dir":"../../php-src/tests", "forbid":[".php"]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/client.php?hello=world") + assert.Equal(t, "WORLD", b) +} + +func Test_Files_Error(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.Error(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"dir/invalid", "forbid":[".php"]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) +} + +func Test_Files_Error2(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.Error(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"dir/invalid", "forbid":[".php"]`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) +} + +func Test_Files_Forbid(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/client.php?hello=world") + assert.Equal(t, "WORLD", b) +} + +func Test_Files_NotFound(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/client.XXX?hello=world") + assert.Equal(t, "WORLD", b) +} + + +func Test_Files_Dir(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/http?hello=world") + assert.Equal(t, "WORLD", b) +} + +func Test_Files_NotForbid(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[]}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + b, _, _ := get("http://localhost:6029/client.php") + assert.Equal(t, all("../../php-src/tests/client.php"), b) +} + +func tmpDir() string { + p := os.TempDir() + r, _ := json.Marshal(p) + + return string(r) +} + +func all(fn string) string { + f, _ := os.Open(fn) + defer f.Close() + + b := &bytes.Buffer{} + io.Copy(b, f) + + return b.String() +} diff --git a/socket_factory.go b/socket_factory.go index acdc91b1..43059e8a 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -25,7 +25,7 @@ type SocketFactory struct { relays map[int]chan *goridge.SocketRelay } -// NewSocketFactory returns SocketFactory attached to a given socket listener. +// NewSocketFactory returns SocketFactory attached to a given socket lsn. // tout specifies for how long factory should serve for incoming relay connection func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { f := &SocketFactory{ @@ -54,7 +54,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { go func(w *Worker) { w.Kill() }(w) if wErr := w.Wait(); wErr != nil { - err = errors.Wrap(wErr, err.Error()) + if _, ok := wErr.(*exec.ExitError); ok { + err = errors.Wrap(wErr, err.Error()) + } else { + err = wErr + } } return nil, errors.Wrap(err, "unable to connect to worker") @@ -66,6 +70,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { return w, nil } +// Close socket factory and underlying socket connection. +func (f *SocketFactory) Close() error { + return f.ls.Close() +} + // listens for incoming socket connections func (f *SocketFactory) listen() { for { diff --git a/socket_factory_test.go b/socket_factory_test.go index d0d643af..214a4851 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -32,6 +32,31 @@ func Test_Tcp_Start(t *testing.T) { w.Stop() } +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") + + f := NewSocketFactory(ls, time.Minute) + defer f.Close() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + w.Stop() +} + func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket @@ -42,7 +67,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") cmd.Start() w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) @@ -60,7 +85,7 @@ func Test_Tcp_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/failboot.php") + cmd := exec.Command("php", "php-src/tests/failboot.php") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.Nil(t, w) @@ -78,7 +103,7 @@ func Test_Tcp_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0") + cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "tcp", "200", "0") w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd) assert.Nil(t, w) @@ -96,7 +121,7 @@ func Test_Tcp_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/invalid.php") + cmd := exec.Command("php", "php-src/tests/invalid.php") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.Error(t, err) @@ -113,7 +138,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "php-src/tests/client.php", "broken", "tcp") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -140,7 +165,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -159,7 +184,7 @@ func Test_Tcp_Echo(t *testing.T) { } func Test_Unix_Start(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -170,7 +195,7 @@ func Test_Unix_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -184,7 +209,7 @@ func Test_Unix_Start(t *testing.T) { } func Test_Unix_Failboot(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -195,7 +220,7 @@ func Test_Unix_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/failboot.php") + cmd := exec.Command("php", "php-src/tests/failboot.php") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.Nil(t, w) @@ -204,7 +229,7 @@ func Test_Unix_Failboot(t *testing.T) { } func Test_Unix_Timeout(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -215,7 +240,7 @@ func Test_Unix_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd) assert.Nil(t, w) @@ -224,7 +249,7 @@ func Test_Unix_Timeout(t *testing.T) { } func Test_Unix_Invalid(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -235,7 +260,7 @@ func Test_Unix_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/invalid.php") + cmd := exec.Command("php", "php-src/tests/invalid.php") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.Error(t, err) @@ -243,7 +268,7 @@ func Test_Unix_Invalid(t *testing.T) { } func Test_Unix_Broken(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -254,7 +279,7 @@ func Test_Unix_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "broken", "unix") + cmd := exec.Command("php", "php-src/tests/client.php", "broken", "unix") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -272,7 +297,7 @@ func Test_Unix_Broken(t *testing.T) { } func Test_Unix_Echo(t *testing.T) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { t.Skip("not supported on " + runtime.GOOS) } @@ -283,7 +308,7 @@ func Test_Unix_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix") w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -311,7 +336,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { f := NewSocketFactory(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") w, _ := f.SpawnWorker(cmd) go func() { @@ -332,7 +357,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -348,7 +373,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { b.Skip("not supported on " + runtime.GOOS) } @@ -361,7 +386,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { f := NewSocketFactory(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix") w, _ := f.SpawnWorker(cmd) go func() { @@ -375,7 +400,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { - if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + if runtime.GOOS == "windows" { b.Skip("not supported on " + runtime.GOOS) } @@ -386,7 +411,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix") w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -2,9 +2,7 @@ package roadrunner import ( "fmt" - "sync" "sync/atomic" - "time" ) // State represents worker status and updated time. @@ -14,46 +12,53 @@ type State interface { // Value returns state value Value() int64 - // NumExecs shows how many times worker was invoked - NumExecs() uint64 - - // Updated indicates a moment updated last state change - Updated() time.Time + // NumJobs shows how many times worker was invoked + NumExecs() int64 } const ( // StateInactive - no associated process StateInactive int64 = iota + // StateReady - ready for job. StateReady + // StateWorking - working on given payload. StateWorking - // StateStopped - process has been terminated + + // StateStreaming - indicates that worker is streaming the data at the moment. + StateStreaming + + // StateStopping - process is being softly stopped. + StateStopping + + // StateStopped - process has been terminated. StateStopped - // StateErrored - error state (can't be used) + + // StateErrored - error state (can't be used). StateErrored ) type state struct { - mu sync.RWMutex value int64 - numExecs uint64 - updated time.Time + numExecs int64 } func newState(value int64) *state { - return &state{value: value, updated: time.Now()} + return &state{value: value} } // String returns current state as string. func (s *state) String() string { - switch s.value { + switch s.Value() { case StateInactive: return "inactive" case StateReady: return "ready" case StateWorking: return "working" + case StateStreaming: + return "streaming" case StateStopped: return "stopped" case StateErrored: @@ -63,12 +68,14 @@ func (s *state) String() string { return "undefined" } +// NumExecs returns number of registered worker execs. +func (s *state) NumExecs() int64 { + return atomic.LoadInt64(&s.numExecs) +} + // Value state returns state value func (s *state) Value() int64 { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.value + return atomic.LoadInt64(&s.value) } // IsActive returns true if worker not Inactive or Stopped @@ -77,28 +84,12 @@ func (s *state) IsActive() bool { return state == StateWorking || state == StateReady } -// Updated indicates a moment updated last state change -func (s *state) Updated() time.Time { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.updated -} - -func (s *state) NumExecs() uint64 { - return atomic.LoadUint64(&s.numExecs) -} - // change state value (status) func (s *state) set(value int64) { - s.mu.Lock() - defer s.mu.Unlock() - - s.value = value - s.updated = time.Now() + atomic.StoreInt64(&s.value, value) } // register new execution atomically func (s *state) registerExec() { - atomic.AddUint64(&s.numExecs, 1) + atomic.AddInt64(&s.numExecs, 1) } diff --git a/state_test.go b/state_test.go index be63230e..c13c5a88 100644 --- a/state_test.go +++ b/state_test.go @@ -9,7 +9,6 @@ func Test_NewState(t *testing.T) { st := newState(StateErrored) assert.Equal(t, "errored", st.String()) - assert.NotEqual(t, 0, st.Updated().Unix()) assert.Equal(t, "inactive", newState(StateInactive).String()) assert.Equal(t, "ready", newState(StateReady).String()) diff --git a/static_pool.go b/static_pool.go index c4895bf0..9449ea0c 100644 --- a/static_pool.go +++ b/static_pool.go @@ -6,6 +6,7 @@ import ( "os/exec" "sync" "time" + "sync/atomic" ) const ( @@ -21,9 +22,6 @@ type StaticPool struct { // worker command creator cmd func() *exec.Cmd - // observer is optional callback to handle worker create/destruct/error events. - observer func(event int, w *Worker, ctx interface{}) - // creates and connects to workers factory Factory @@ -33,17 +31,27 @@ type StaticPool struct { // workers circular allocation buffer free chan *Worker + // number of workers expected to be dead in a buffer. + numDead int64 + // protects state of worker list, does not affect allocation muw sync.RWMutex // all registered workers workers []*Worker + + // pool is being destroying + inDestroy int32 + + // lsn is optional callback to handle worker create/destruct/error events. + mul sync.Mutex + lsn func(event int, ctx interface{}) } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) { if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "config error") + return nil, errors.Wrap(err, "config") } p := &StaticPool{ @@ -55,10 +63,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er } // constant number of workers simplify logic - for i := uint64(0); i < p.cfg.NumWorkers; i++ { + for i := int64(0); i < p.cfg.NumWorkers; i++ { // to test if worker ready w, err := p.createWorker() - if err != nil { p.Destroy() return nil, err @@ -70,9 +77,12 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Report attaches pool event watcher. -func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) { - p.observer = o +// AddListener attaches pool event watcher. +func (p *StaticPool) Listen(l func(event int, ctx interface{})) { + p.mul.Lock() + defer p.mul.Unlock() + + p.lsn = l } // Config returns associated pool configuration. Immutable. @@ -107,40 +117,38 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { if err != nil { // soft job errors are allowed if _, jobError := err.(JobError); jobError { - p.free <- w + p.release(w) return nil, err } - go p.replaceWorker(w, err) + go p.destroyWorker(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - go p.replaceWorker(w, err) + go p.destroyWorker(w, err) return p.Exec(rqs) } - if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { - go p.replaceWorker(w, p.cfg.MaxExecutions) - } else { - p.free <- w - } - + p.release(w) return rsp, nil } // Destroy all underlying workers (but let them to complete the task). func (p *StaticPool) Destroy() { + atomic.AddInt32(&p.inDestroy, 1) + defer atomic.AddInt32(&p.inDestroy, -1) + p.tasks.Wait() var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) + go w.Stop() go func(w *Worker) { defer wg.Done() - - p.destroyWorker(w) + p.destroyWorker(w, nil) }(w) } @@ -149,93 +157,134 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { - select { - case w = <-p.free: - return w, nil - default: - // enable timeout handler - } + for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ { + // this loop is required to skip issues with dead workers still being in a ring. + select { + case w = <-p.free: + if w.State().Value() != StateReady { + atomic.AddInt64(&p.numDead, ^int64(0)) + continue + } + + return w, nil + default: + // enable timeout handler + } - timeout := time.NewTimer(p.cfg.AllocateTimeout) - select { - case <-timeout.C: - return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) - case w := <-p.free: - timeout.Stop() - return w, nil + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) + case w = <-p.free: + timeout.Stop() + + if w.State().Value() != StateReady { + atomic.AddInt64(&p.numDead, ^int64(0)) + continue + } + return w, nil + } } + + return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers) } -// replaceWorker replaces dead or expired worker with new instance. -func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { - go p.destroyWorker(w) +// release releases or replaces the worker. +func (p *StaticPool) release(w *Worker) { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + go p.destroyWorker(w, p.cfg.MaxJobs) + return + } - if nw, err := p.createWorker(); err != nil { - p.throw(EventError, w, err) + p.free <- w +} - if len(p.Workers()) == 0 { - // possible situation when major error causes all PHP scripts to die (for example dead DB) - p.throw(EventError, nil, fmt.Errorf("all workers dead")) - } - } else { - p.free <- nw +// creates new worker using associated factory. automatically +// adds worker to the worker list (background) +func (p *StaticPool) createWorker() (*Worker, error) { + w, err := p.factory.SpawnWorker(p.cmd()) + if err != nil { + return nil, err } -} -// destroyWorker destroys workers and removes it from the pool. -func (p *StaticPool) destroyWorker(w *Worker) { - p.throw(EventDestruct, w, nil) + p.throw(EventWorkerConstruct, w) - // detaching p.muw.Lock() - for i, wc := range p.workers { - if wc == w { - p.workers = p.workers[:i+1] - break - } - } + p.workers = append(p.workers, w) p.muw.Unlock() + go p.watchWorker(w) + return w, nil +} + +// destroyWorker destroys workers and removes it from the pool. +func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) { go w.Stop() select { case <-w.waitDone: // worker is dead + p.throw(EventWorkerDestruct, w) + case <-time.NewTimer(p.cfg.DestroyTimeout).C: - // failed to stop process + // failed to stop process in given time if err := w.Kill(); err != nil { - p.throw(EventError, w, err) + p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) } + + p.throw(EventWorkerKill, w) } } -// creates new worker using associated factory. automatically -// adds worker to the worker list (background) -func (p *StaticPool) createWorker() (*Worker, error) { - w, err := p.factory.SpawnWorker(p.cmd()) - if err != nil { - return nil, err +// watchWorker watches worker state and replaces it if worker fails. +func (p *StaticPool) watchWorker(w *Worker) { + err := w.Wait() + p.throw(EventWorkerDead, w) + + // detaching + p.muw.Lock() + for i, wc := range p.workers { + if wc == w { + p.workers = append(p.workers[:i], p.workers[i+1:]...) + break + } } + p.muw.Unlock() - p.throw(EventCreated, w, nil) + // registering a dead worker + atomic.AddInt64(&p.numDead, 1) - go func(w *Worker) { - if err := w.Wait(); err != nil { - p.throw(EventError, w, err) - } - }(w) + // worker have died unexpectedly, pool should attempt to replace it with alive version safely + if err != nil { + p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } - p.muw.Lock() - defer p.muw.Unlock() + if !p.destroying() { + nw, err := p.createWorker() + if err == nil { + p.free <- nw + return + } - p.workers = append(p.workers, w) + // possible situation when major error causes all PHP scripts to die (for example dead DB) + if len(p.Workers()) == 0 { + p.throw(EventPoolError, err) + } else { + p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } + } +} - return w, nil +func (p *StaticPool) destroying() bool { + return atomic.LoadInt32(&p.inDestroy) != 0 } // throw invokes event handler if any. -func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) { - if p.observer != nil { - p.observer(event, w, ctx) +func (p *StaticPool) throw(event int, ctx interface{}) { + p.mul.Lock() + defer p.mul.Unlock() + + if p.lsn != nil { + p.lsn(event, ctx) } } diff --git a/static_pool_test.go b/static_pool_test.go index b4069b98..231bc71b 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,24 +2,24 @@ package roadrunner import ( "github.com/stretchr/testify/assert" - "log" "os/exec" "runtime" "strconv" - "sync" "testing" "time" + "sync" + "log" ) var cfg = Config{ - NumWorkers: uint64(runtime.NumCPU()), + NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, } func Test_NewPool(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -33,7 +33,7 @@ func Test_NewPool(t *testing.T) { func Test_StaticPool_Invalid(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/invalid.php") }, NewPipeFactory(), cfg, ) @@ -44,7 +44,7 @@ func Test_StaticPool_Invalid(t *testing.T) { func Test_ConfigError(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), Config{ AllocateTimeout: time.Second, @@ -58,7 +58,7 @@ func Test_ConfigError(t *testing.T) { func Test_StaticPool_Echo(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -79,7 +79,7 @@ func Test_StaticPool_Echo(t *testing.T) { func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -100,7 +100,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { func Test_StaticPool_Echo_Context(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "head", "pipes") }, NewPipeFactory(), cfg, ) @@ -121,7 +121,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { func Test_StaticPool_JobError(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "error", "pipes") }, NewPipeFactory(), cfg, ) @@ -141,7 +141,7 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "broken", "pipes") }, NewPipeFactory(), cfg, ) @@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Report(func(e int, w *Worker, ctx interface{}) { + p.Listen(func(e int, ctx interface{}) { if err, ok := ctx.(error); ok { assert.Contains(t, err.Error(), "undefined_function()") } @@ -162,9 +162,46 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Nil(t, res) } +func Test_StaticPool_Broken_FromOutside(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Equal(t, runtime.NumCPU(), len(p.Workers())) + + destructed := make(chan interface{}) + p.Listen(func(e int, ctx interface{}) { + if e == EventWorkerConstruct { + destructed <- nil + } + }) + + // killing random worker and expecting pool to replace it + p.workers[0].cmd.Process.Kill() + <-destructed + + for _, w := range p.Workers() { + assert.Equal(t, StateReady, w.state.Value()) + } +} + func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") }, NewPipeFactory(), Config{ NumWorkers: 1, @@ -196,11 +233,11 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") }, NewPipeFactory(), Config{ NumWorkers: 1, - MaxExecutions: 1, + MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -229,10 +266,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { } } + // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "stop", "pipes") }, NewPipeFactory(), Config{ NumWorkers: 1, @@ -266,7 +304,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { func Benchmark_Pool_Allocate(b *testing.B) { p, _ := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -285,7 +323,7 @@ func Benchmark_Pool_Allocate(b *testing.B) { func Benchmark_Pool_Echo(b *testing.B) { p, _ := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -300,7 +338,7 @@ func Benchmark_Pool_Echo(b *testing.B) { func Benchmark_Pool_Echo_Batched(b *testing.B) { p, _ := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) @@ -323,11 +361,11 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, _ := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), Config{ NumWorkers: 1, - MaxExecutions: 1, + MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "time" ) // Worker - supervised process with api over goridge.Relay. @@ -19,6 +20,9 @@ type Worker struct { // can be nil while process is not started. Pid *int + // Created indicates at what time worker has been created. + Created time.Time + // state holds information about current worker state, // number of worker executions, last status change time. // publicly this object is receive-only and protected using Mutex @@ -26,13 +30,13 @@ type Worker struct { state *state // underlying command with associated process, command must be - // provided to worker from outside in non-started form. Cmd + // provided to worker from outside in non-started form. CmdSource // stdErr direction will be handled by worker to aggregate error message. cmd *exec.Cmd // err aggregates stderr output from underlying process. Value can be // receive only once command is completed and all pipes are closed. - err *bytes.Buffer + err *errBuffer // channel is being closed once command is complete. waitDone chan interface{} @@ -54,13 +58,14 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) { } w := &Worker{ + Created: time.Now(), cmd: cmd, - err: new(bytes.Buffer), + err: &errBuffer{buffer: new(bytes.Buffer)}, waitDone: make(chan interface{}), state: newState(StateInactive), } - // piping all stderr to command buffer + // piping all stderr to command errBuffer w.cmd.Stderr = w.err return w, nil @@ -105,9 +110,16 @@ func (w *Worker) Wait() error { } if w.endState.Success() { + w.state.set(StateStopped) return nil } + if w.state.Value() != StateStopping { + w.state.set(StateErrored) + } else { + w.state.set(StateStopped) + } + if w.err.Len() != 0 { return errors.New(w.err.String()) } @@ -125,7 +137,7 @@ func (w *Worker) Stop() error { w.mu.Lock() defer w.mu.Unlock() - w.state.set(StateInactive) + w.state.set(StateStopping) err := sendPayload(w.rl, &stopCommand{Stop: true}) <-w.waitDone @@ -134,16 +146,13 @@ func (w *Worker) Stop() error { } // Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Waits for process completion. +// error log from the stderr. Does not waits for process completion! func (w *Worker) Kill() error { select { case <-w.waitDone: return nil default: - w.mu.Lock() - defer w.mu.Unlock() - - w.state.set(StateInactive) + w.state.set(StateStopping) err := w.cmd.Process.Signal(os.Kill) <-w.waitDone @@ -163,14 +172,13 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { } if w.state.Value() != StateReady { - return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value()) + return nil, fmt.Errorf("worker is not ready (%s)", w.state.String()) } w.state.set(StateWorking) defer w.state.registerExec() rsp, err = w.execPayload(rqs) - if err != nil { if _, ok := err.(JobError); !ok { w.state.set(StateErrored) @@ -178,6 +186,9 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { } } + // todo: attach when payload is complete + // todo: new status + w.state.set(StateReady) return rsp, err } @@ -194,12 +205,11 @@ func (w *Worker) start() error { go func() { w.endState, _ = w.cmd.Process.Wait() if w.waitDone != nil { - w.state.set(StateStopped) close(w.waitDone) + w.mu.Lock() + defer w.mu.Unlock() if w.rl != nil { - w.mu.Lock() - defer w.mu.Unlock() w.rl.Close() } } @@ -230,6 +240,7 @@ func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) { return nil, JobError(rsp.Context) } + // add streaming support :) if rsp.Body, pr, err = w.rl.Receive(); err != nil { return nil, errors.Wrap(err, "worker error") } diff --git a/worker_test.go b/worker_test.go index de11226a..2e3bc111 100644 --- a/worker_test.go +++ b/worker_test.go @@ -4,15 +4,15 @@ import ( "github.com/stretchr/testify/assert" "os/exec" "testing" - "time" ) func Test_GetState(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) + assert.Equal(t, StateStopped, w.State().Value()) }() assert.NoError(t, err) @@ -20,11 +20,26 @@ func Test_GetState(t *testing.T) { assert.Equal(t, StateReady, w.State().Value()) w.Stop() - assert.Equal(t, StateStopped, w.State().Value()) +} + +func Test_Kill(t *testing.T) { + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.Error(t, w.Wait()) + assert.Equal(t, StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, StateReady, w.State().Value()) + w.Kill() } func Test_Echo(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -42,17 +57,47 @@ func Test_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_BadPayload(t *testing.T) { + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer w.Stop() + + res, err := w.Exec(nil) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Equal(t, "payload can not be empty", err.Error()) +} + func Test_NotStarted_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := newWorker(cmd) - assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "php php-src/tests/client.php echo pipes") assert.Contains(t, w.String(), "inactive") assert.Contains(t, w.String(), "numExecs: 0") } +func Test_NotStarted_Exec(t *testing.T) { + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") + + w, _ := newWorker(cmd) + + res, err := w.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Equal(t, "worker is not ready (inactive)", err.Error()) +} + func Test_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -60,13 +105,13 @@ func Test_String(t *testing.T) { }() defer w.Stop() - assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "php php-src/tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") assert.Contains(t, w.String(), "numExecs: 0") } func Test_Echo_Slow(t *testing.T) { - cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10") + cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -85,7 +130,7 @@ func Test_Echo_Slow(t *testing.T) { } func Test_Broken(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -101,7 +146,7 @@ func Test_Broken(t *testing.T) { } func Test_OnStarted(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes") assert.Nil(t, cmd.Start()) w, err := newWorker(cmd) @@ -112,7 +157,7 @@ func Test_OnStarted(t *testing.T) { } func Test_Error(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "error", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "error", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -129,7 +174,7 @@ func Test_Error(t *testing.T) { } func Test_NumExecs(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -138,27 +183,11 @@ func Test_NumExecs(t *testing.T) { defer w.Stop() w.Exec(&Payload{Body: []byte("hello")}) - assert.Equal(t, uint64(1), w.State().NumExecs()) - - w.Exec(&Payload{Body: []byte("hello")}) - assert.Equal(t, uint64(2), w.State().NumExecs()) + assert.Equal(t, int64(1), w.State().NumExecs()) w.Exec(&Payload{Body: []byte("hello")}) - assert.Equal(t, uint64(3), w.State().NumExecs()) -} - -func Test_StateUpdated(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer w.Stop() - - tm := time.Now() - time.Sleep(time.Millisecond) + assert.Equal(t, int64(2), w.State().NumExecs()) w.Exec(&Payload{Body: []byte("hello")}) - assert.True(t, w.State().Updated().After(tm)) -} + assert.Equal(t, int64(3), w.State().NumExecs()) +}
\ No newline at end of file |