summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-13 23:46:13 +0300
committerGitHub <[email protected]>2018-06-13 23:46:13 +0300
commitc3ccb29fe412baa8c4b02a1630f3a4a040ab722c (patch)
tree7a61db95d8e4d02ac5740d593ed708358f34949a
parent50f820833eeef8518b3b978b33c6f20391225162 (diff)
parent8ab8c64413ded038e3c8816647209c3b961b3a35 (diff)
Merge pull request #9 from spiral/develop
HTTP
-rw-r--r--.travis.yml31
-rw-r--r--CHANGELOG.md9
-rw-r--r--LICENSE2
-rw-r--r--Makefile6
-rw-r--r--README.md2
-rw-r--r--cmd/rr/.rr.yaml56
-rw-r--r--cmd/rr/LICENSE21
-rw-r--r--cmd/rr/cmd/root.go124
-rw-r--r--cmd/rr/cmd/serve.go49
-rw-r--r--cmd/rr/debug/listener.go93
-rw-r--r--cmd/rr/http/reset.go61
-rw-r--r--cmd/rr/http/workers.go167
-rw-r--r--cmd/rr/main.go64
-rw-r--r--cmd/rr/utils/cprint.go28
-rw-r--r--composer.json11
-rw-r--r--config.go18
-rw-r--r--config_test.go6
-rw-r--r--error_buffer.go39
-rw-r--r--error_buffer_test.go14
-rw-r--r--errors.go24
-rw-r--r--errors_test.go (renamed from job_error_test.go)6
-rw-r--r--factory.go3
-rw-r--r--job_error.go10
-rw-r--r--payload.go4
-rw-r--r--php-src/Exceptions/RoadRunnerException.php (renamed from source/Exceptions/RoadRunnerException.php)0
-rw-r--r--php-src/PSR7Client.php131
-rw-r--r--php-src/Worker.php (renamed from source/Worker.php)3
-rw-r--r--php-src/tests/broken.php (renamed from tests/broken.php)0
-rw-r--r--php-src/tests/client.php (renamed from tests/client.php)2
-rw-r--r--php-src/tests/delay.php (renamed from tests/delay.php)0
-rw-r--r--php-src/tests/echo.php (renamed from tests/echo.php)0
-rw-r--r--php-src/tests/error.php (renamed from tests/error.php)0
-rw-r--r--php-src/tests/failboot.php (renamed from tests/failboot.php)0
-rw-r--r--php-src/tests/head.php (renamed from tests/head.php)0
-rw-r--r--php-src/tests/http/client.php45
-rw-r--r--php-src/tests/http/cookie.php334
-rw-r--r--php-src/tests/http/data.php18
-rw-r--r--php-src/tests/http/echo.php10
-rw-r--r--php-src/tests/http/error.php9
-rw-r--r--php-src/tests/http/error2.php9
-rw-r--r--php-src/tests/http/header.php11
-rw-r--r--php-src/tests/http/payload.php13
-rw-r--r--php-src/tests/http/pid.php11
-rw-r--r--php-src/tests/http/upload.php35
-rw-r--r--php-src/tests/pid.php (renamed from tests/pid.php)0
-rw-r--r--php-src/tests/sample.txt1
-rw-r--r--php-src/tests/slow-client.php (renamed from tests/slow-client.php)2
-rw-r--r--php-src/tests/stop.php (renamed from tests/stop.php)0
-rw-r--r--pipe_factory.go12
-rw-r--r--pipe_factory_test.go20
-rw-r--r--pool.go25
-rw-r--r--server.go207
-rw-r--r--server_config.go60
-rw-r--r--server_config_test.go92
-rw-r--r--server_test.go229
-rw-r--r--service/container.go171
-rw-r--r--service/container_test.go327
-rw-r--r--service/http/config.go42
-rw-r--r--service/http/config_test.go88
-rw-r--r--service/http/handler.go121
-rw-r--r--service/http/handler_test.go821
-rw-r--r--service/http/parse.go147
-rw-r--r--service/http/parse_test.go54
-rw-r--r--service/http/request.go155
-rw-r--r--service/http/response.go53
-rw-r--r--service/http/response_test.go92
-rw-r--r--service/http/rpc.go57
-rw-r--r--service/http/rpc_test.go115
-rw-r--r--service/http/service.go128
-rw-r--r--service/http/service_test.go400
-rw-r--r--service/http/uploads.go140
-rw-r--r--service/http/uploads_config.go39
-rw-r--r--service/http/uploads_config_test.go24
-rw-r--r--service/http/uploads_test.go311
-rw-r--r--service/rpc/config.go35
-rw-r--r--service/rpc/config_test.go109
-rw-r--r--service/rpc/service.go122
-rw-r--r--service/rpc/service_test.go97
-rw-r--r--service/service.go61
-rw-r--r--service/static/config.go52
-rw-r--r--service/static/config_test.go21
-rw-r--r--service/static/service.go89
-rw-r--r--service/static/service_test.go349
-rw-r--r--socket_factory.go13
-rw-r--r--socket_factory_test.go75
-rw-r--r--state.go63
-rw-r--r--state_test.go1
-rw-r--r--static_pool.go201
-rw-r--r--static_pool_test.go80
-rw-r--r--worker.go41
-rw-r--r--worker_test.go95
91 files changed, 6644 insertions, 272 deletions
diff --git a/.travis.yml b/.travis.yml
index 5ea8b20d..4f7741f6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,26 +1,37 @@
-language: php
+language: go
sudo: required
-php:
- - 7.0
- - 7.1
- - 7.2
-
go:
- - 1.8
+ - "1.10.x"
before_install:
+ - go version
+ - sudo add-apt-repository -y ppa:ondrej/php
+ - sudo apt-get update
+ - sudo apt-get install -y php7.0-cli
+ - sudo cp `which php7.0` `which php`
+ - php -v
- composer self-update
install:
- - composer install --no-interaction --prefer-source
+ - go get "github.com/spiral/roadrunner"
- go get -u "github.com/spiral/goridge"
+ - go get -u "github.com/sirupsen/logrus"
- go get -u "github.com/pkg/errors"
- go get -u "github.com/stretchr/testify/assert"
+ - composer install --no-interaction --prefer-source --ignore-platform-reqs
script:
- - go test -race -v -coverprofile=coverage.txt -covermode=atomic
+ - go test -race -v -coverprofile=lib.txt -covermode=atomic
+ - go test ./service -race -v -coverprofile=service.txt -covermode=atomic
+ - go test ./service/rpc -race -v -coverprofile=rpc.txt -covermode=atomic
+ - go test ./service/http -race -v -coverprofile=http.txt -covermode=atomic
+ - go test ./service/static -race -v -coverprofile=static.txt -covermode=atomic
after_success:
- - bash <(curl -s https://codecov.io/bash)
+ - bash <(curl -s https://codecov.io/bash) -f lib.txt
+ - bash <(curl -s https://codecov.io/bash) -f service.txt
+ - bash <(curl -s https://codecov.io/bash) -f rpc.txt
+ - bash <(curl -s https://codecov.io/bash) -f http.txt
+ - bash <(curl -s https://codecov.io/bash) -f static.txt \ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..e5e66db7
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,9 @@
+CHANGELOG
+=========
+
+v1.0.0 (???? date)
+------
+- worker.State.Updated() has been removed in order to improve overall performance
+- staticPool can automatically replace workers killed from outside
+- server would not attempt to rebuild static pool in case of reoccurring failure
+- MORE STUFF \ No newline at end of file
diff --git a/LICENSE b/LICENSE
index d78565f0..efb98c87 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
MIT License
-Copyright (c) 2017 SpiralScout
+Copyright (c) 2018 SpiralScout
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..9726ed69
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,6 @@
+test:
+ go test -v -race -cover
+ go test -v -race -cover ./service
+ go test -v -race -cover ./service/rpc
+ go test -v -race -cover ./service/http
+ go test -v -race -cover ./service/static \ No newline at end of file
diff --git a/README.md b/README.md
index e0d31e95..4bdb99a7 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ while ($body = $rr->receive($context)) {
}
}
```
-> Check how to init relay [here](./tests/client.php). More examples can be found in tests.
+> Check how to init relay [here](./php-src/tests/client.php). More examples can be found in tests.
License:
--------
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
new file mode 100644
index 00000000..ab0f3e7f
--- /dev/null
+++ b/cmd/rr/.rr.yaml
@@ -0,0 +1,56 @@
+# rpc bus allows php application and external clients to talk to rr services.
+rpc:
+ # enable rpc server
+ enable: true
+
+ # rpc connection DSN. Supported TCP and Unix sockets.
+ listen: tcp://127.0.0.1:6001
+
+# http service configuration.
+http:
+ # set to false to disable http server.
+ enable: true
+
+ # http host to listen.
+ address: 0.0.0.0:8080
+
+ # max POST request size, including file uploads in MB.
+ maxRequest: 200
+
+ # file upload configuration.
+ uploads:
+ # list of file extensions which are forbidden for uploading.
+ forbid: [".php", ".exe", ".bat"]
+
+ # http worker pool configuration.
+ workers:
+ # php worker command.
+ command: "php psr-worker.php pipes"
+
+ # connection method (pipes, tcp://:9000, unix://socket.unix).
+ relay: "pipes"
+
+ # worker pool configuration.
+ pool:
+ # number of workers to be serving.
+ numWorkers: 4
+
+ # maximum jobs per worker, 0 - unlimited.
+ maxJobs: 0
+
+ # for how long worker is allowed to be bootstrapped. In nanoseconds :(
+ allocateTimeout: 600000000
+
+ # amount of time given to worker to gracefully destruct itself. In nanoseconds :(
+ destroyTimeout: 600000000
+
+# static file serving.
+static:
+ # serve http static files
+ enable: false
+
+ # root directory for static file (http would not serve .php and .htaccess files).
+ dir: "public"
+
+ # list of extensions for forbid for serving.
+ forbid: [".php", ".htaccess"] \ No newline at end of file
diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE
new file mode 100644
index 00000000..efb98c87
--- /dev/null
+++ b/cmd/rr/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 SpiralScout
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
new file mode 100644
index 00000000..bea42747
--- /dev/null
+++ b/cmd/rr/cmd/root.go
@@ -0,0 +1,124 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package cmd
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/rr/utils"
+ "os"
+)
+
+// Service bus for all the commands.
+var (
+ cfgFile string
+ verbose bool
+
+ // Logger - shared logger.
+ Logger = logrus.New()
+
+ // Container - shared service bus.
+ Container = service.NewContainer(Logger)
+
+ // CLI is application endpoint.
+ CLI = &cobra.Command{
+ Use: "rr",
+ SilenceErrors: true,
+ SilenceUsage: true,
+ Short: utils.Sprintf("<green>RoadRunner, PHP Application Server.</reset>"),
+ }
+)
+
+// ViperWrapper provides interface bridge between v configs and service.Config.
+type ViperWrapper struct {
+ v *viper.Viper
+}
+
+// Get nested config section (sub-map), returns nil if section not found.
+func (w *ViperWrapper) Get(key string) service.Config {
+ sub := w.v.Sub(key)
+ if sub == nil {
+ return nil
+ }
+
+ return &ViperWrapper{sub}
+}
+
+// Unmarshal unmarshal config data into given struct.
+func (w *ViperWrapper) Unmarshal(out interface{}) error {
+ return w.v.Unmarshal(out)
+}
+
+// Execute adds all child commands to the CLI command and sets flags appropriately.
+// This is called by main.main(). It only needs to happen once to the CLI.
+func Execute() {
+ if err := CLI.Execute(); err != nil {
+ utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
+ os.Exit(1)
+ }
+}
+
+func init() {
+ CLI.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output")
+ CLI.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)")
+
+ cobra.OnInitialize(func() {
+ if verbose {
+ Logger.SetLevel(logrus.DebugLevel)
+ }
+
+ if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
+ if err := Container.Init(cfg); err != nil {
+ utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
+ os.Exit(1)
+ }
+ }
+ })
+}
+
+func initConfig(cfgFile string, path []string, name string) service.Config {
+ cfg := viper.New()
+
+ if cfgFile != "" {
+ // Use cfg file from the flag.
+ cfg.SetConfigFile(cfgFile)
+ } else {
+ // automatic location
+ for _, p := range path {
+ cfg.AddConfigPath(p)
+ }
+
+ cfg.SetConfigName(name)
+ }
+
+ // read in environment variables that match
+ cfg.AutomaticEnv()
+
+ // If a cfg file is found, read it in.
+ if err := cfg.ReadInConfig(); err != nil {
+ Logger.Warnf("config: %s", err)
+ return nil
+ }
+
+ return &ViperWrapper{cfg}
+}
diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go
new file mode 100644
index 00000000..c53f7ce9
--- /dev/null
+++ b/cmd/rr/cmd/serve.go
@@ -0,0 +1,49 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package cmd
+
+import (
+ "github.com/spf13/cobra"
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+var stopSignal = make(chan os.Signal, 1)
+
+func init() {
+ CLI.AddCommand(&cobra.Command{
+ Use: "serve",
+ Short: "Serve RoadRunner service(s)",
+ RunE: serveHandler,
+ })
+
+ signal.Notify(stopSignal, syscall.SIGTERM)
+ signal.Notify(stopSignal, syscall.SIGINT)
+}
+
+func serveHandler(cmd *cobra.Command, args []string) error {
+ go Container.Serve()
+ <-stopSignal
+ Container.Stop()
+
+ return nil
+}
diff --git a/cmd/rr/debug/listener.go b/cmd/rr/debug/listener.go
new file mode 100644
index 00000000..1b6fcf83
--- /dev/null
+++ b/cmd/rr/debug/listener.go
@@ -0,0 +1,93 @@
+package debug
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner/cmd/rr/utils"
+ "github.com/spiral/roadrunner"
+)
+
+// Listener provide debug callback for system events. With colors!
+type listener struct{ logger *logrus.Logger }
+
+// NewListener creates new debug listener.
+func NewListener(logger *logrus.Logger) *listener {
+ return &listener{logger}
+}
+
+// Listener listens to http events and generates nice looking output.
+func (s *listener) Listener(event int, ctx interface{}) {
+ // http events
+ switch event {
+ case http.EventResponse:
+ log := ctx.(*http.Event)
+ s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ case http.EventError:
+ log := ctx.(*http.Event)
+
+ if _, ok := log.Error.(roadrunner.JobError); ok {
+ s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ } else {
+ s.logger.Info(utils.Sprintf(
+ "%s <white+hb>%s</reset> %s <red>%s</reset>",
+ statusColor(log.Status),
+ log.Method,
+ log.Uri,
+ log.Error,
+ ))
+ }
+ }
+
+ switch event {
+ case roadrunner.EventWorkerKill:
+ w := ctx.(*roadrunner.Worker)
+ s.logger.Warning(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>killed</red>",
+ *w.Pid,
+ ))
+
+ case roadrunner.EventWorkerError:
+ err := ctx.(roadrunner.WorkerError)
+ s.logger.Error(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *err.Worker.Pid,
+ err.Caused,
+ ))
+ }
+
+ // rr server events
+ switch event {
+ case roadrunner.EventServerFailure:
+ s.logger.Error(utils.Sprintf("<red>server is dead</reset>"))
+ }
+
+ // pool events
+ switch event {
+ case roadrunner.EventPoolConstruct:
+ s.logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>"))
+ case roadrunner.EventPoolError:
+ s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
+ }
+}
+
+// Serve serves.
+func (s *listener) Serve() error { return nil }
+
+// Stop stops the Listener.
+func (s *listener) Stop() {}
+
+func statusColor(status int) string {
+ if status < 300 {
+ return utils.Sprintf("<green>%v</reset>", status)
+ }
+
+ if status < 400 {
+ return utils.Sprintf("<cyan>%v</reset>", status)
+ }
+
+ if status < 500 {
+ return utils.Sprintf("<yellow>%v</reset>", status)
+ }
+
+ return utils.Sprintf("<red>%v</reset>", status)
+}
diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go
new file mode 100644
index 00000000..d0a42b64
--- /dev/null
+++ b/cmd/rr/http/reset.go
@@ -0,0 +1,61 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package http
+
+import (
+ "errors"
+ "github.com/spf13/cobra"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/rr/utils"
+)
+
+func init() {
+ rr.CLI.AddCommand(&cobra.Command{
+ Use: "http:reset",
+ Short: "Reload RoadRunner worker pools for the HTTP service",
+ RunE: reloadHandler,
+ })
+}
+
+func reloadHandler(cmd *cobra.Command, args []string) error {
+ svc, st := rr.Container.Get(rpc.ID)
+ if st < service.StatusConfigured {
+ return errors.New("RPC service is not configured")
+ }
+
+ client, err := svc.(*rpc.Service).Client()
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ utils.Printf("<green>restarting http worker pool</reset>: ")
+
+ var r string
+ if err := client.Call("http.Reset", true, &r); err != nil {
+ return err
+ }
+
+ utils.Printf("<green+hb>done</reset>")
+ return nil
+}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
new file mode 100644
index 00000000..728c415e
--- /dev/null
+++ b/cmd/rr/http/workers.go
@@ -0,0 +1,167 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package http
+
+import (
+ "errors"
+ tm "github.com/buger/goterm"
+ "github.com/dustin/go-humanize"
+ "github.com/olekukonko/tablewriter"
+ "github.com/shirou/gopsutil/process"
+ "github.com/spf13/cobra"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service/http"
+ rrpc "github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/rr/utils"
+ "net/rpc"
+ "os"
+ "strconv"
+ "time"
+ "os/signal"
+ "syscall"
+)
+
+var (
+ interactive bool
+ stopSignal = make(chan os.Signal, 1)
+)
+
+func init() {
+ workersCommand := &cobra.Command{
+ Use: "http:workers",
+ Short: "List workers associated with RoadRunner HTTP service",
+ RunE: workersHandler,
+ }
+
+ workersCommand.Flags().BoolVarP(
+ &interactive,
+ "interactive",
+ "i",
+ false,
+ "render interactive workers table",
+ )
+
+ rr.CLI.AddCommand(workersCommand)
+
+ signal.Notify(stopSignal, syscall.SIGTERM)
+ signal.Notify(stopSignal, syscall.SIGINT)
+}
+
+func workersHandler(cmd *cobra.Command, args []string) (err error) {
+ defer func() {
+ if r, ok := recover().(error); ok {
+ err = r
+ }
+ }()
+
+ svc, st := rr.Container.Get(rrpc.ID)
+ if st < service.StatusConfigured {
+ return errors.New("RPC service is not configured")
+ }
+
+ client, err := svc.(*rrpc.Service).Client()
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ if !interactive {
+ showWorkers(client)
+ return nil
+ }
+
+ tm.Clear()
+ for {
+ select {
+ case <-stopSignal:
+ return nil
+ case <-time.NewTicker(time.Millisecond * 500).C:
+ tm.MoveCursor(1, 1)
+ showWorkers(client)
+ tm.Flush()
+ }
+ }
+
+ return nil
+}
+
+func showWorkers(client *rpc.Client) {
+ var r http.WorkerList
+ if err := client.Call("http.Workers", true, &r); err != nil {
+ panic(err)
+ }
+
+ tw := tablewriter.NewWriter(os.Stdout)
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 9)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 7)
+ tw.SetColMinWidth(4, 18)
+
+ for _, w := range r.Workers {
+ tw.Append([]string{
+ strconv.Itoa(w.Pid),
+ renderStatus(w.Status),
+ renderJobs(w.NumJobs),
+ renderMemory(w.Pid),
+ renderAlive(time.Unix(0, w.Created)),
+ })
+ }
+
+ tw.Render()
+}
+
+func renderStatus(status string) string {
+ switch status {
+ case "inactive":
+ return utils.Sprintf("<yellow>inactive</reset>")
+ case "ready":
+ return utils.Sprintf("<cyan>ready</reset>")
+ case "working":
+ return utils.Sprintf("<green>working</reset>")
+ case "stopped":
+ return utils.Sprintf("<red>stopped</reset>")
+ case "errored":
+ return utils.Sprintf("<red>errored</reset>")
+ }
+
+ return status
+}
+
+func renderJobs(number int64) string {
+ return humanize.Comma(int64(number))
+}
+
+func renderAlive(t time.Time) string {
+ return humanize.RelTime(t, time.Now(), "ago", "")
+}
+
+func renderMemory(pid int) string {
+ p, _ := process.NewProcess(int32(pid))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return err.Error()
+ }
+
+ return humanize.Bytes(i.VMS)
+}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
new file mode 100644
index 00000000..ee6b915b
--- /dev/null
+++ b/cmd/rr/main.go
@@ -0,0 +1,64 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package main
+
+import (
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+
+ // services (plugins)
+ "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/service/static"
+
+ // cli plugins
+ _ "github.com/spiral/roadrunner/cmd/rr/http"
+ "github.com/spiral/roadrunner/cmd/rr/debug"
+
+ "github.com/spf13/cobra"
+)
+
+var debugMode bool
+
+func main() {
+
+ // provides ability to make local connection to services
+ rr.Container.Register(rpc.ID, &rpc.Service{})
+
+ // http serving
+ rr.Container.Register(http.ID, &http.Service{})
+
+ // serving static files
+ rr.Container.Register(static.ID, &static.Service{})
+
+ // debug mode
+ rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode", )
+ cobra.OnInitialize(func() {
+ if debugMode {
+ service, _ := rr.Container.Get(http.ID)
+ service.(*http.Service).AddListener(debug.NewListener(rr.Logger).Listener)
+ }
+ })
+
+ // you can register additional commands using cmd.CLI
+ rr.Execute()
+}
diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go
new file mode 100644
index 00000000..e0fb6931
--- /dev/null
+++ b/cmd/rr/utils/cprint.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+ "github.com/mgutz/ansi"
+)
+
+var reg *regexp.Regexp
+
+func init() {
+ reg, _ = regexp.Compile(`<([^>]+)>`)
+}
+
+// Printf works identically to fmt.Print but adds `<white+hb>color formatting support for CLI</reset>`.
+func Printf(format string, args ...interface{}) {
+ fmt.Print(Sprintf(format, args...))
+}
+
+// Sprintf works identically to fmt.Sprintf but adds `<white+hb>color formatting support for CLI</reset>`.
+func Sprintf(format string, args ...interface{}) string {
+ format = reg.ReplaceAllStringFunc(format, func(s string) string {
+ return ansi.ColorCode(strings.Trim(s, "<>/"))
+ })
+
+ return fmt.Sprintf(format, args...)
+} \ No newline at end of file
diff --git a/composer.json b/composer.json
index b72f6de1..5d946f49 100644
--- a/composer.json
+++ b/composer.json
@@ -11,14 +11,13 @@
],
"require": {
"php": "^7.0",
- "spiral/goridge": "^2.0"
- },
- "require-dev": {
- "phpunit/phpunit": "~6.0"
+ "spiral/goridge": "^2.0",
+ "psr/http-message": "^1.0",
+ "zendframework/zend-diactoros": "^1.7"
},
"autoload": {
"psr-4": {
- "Spiral\\RoadRunner\\": "source/"
+ "Spiral\\RoadRunner\\": "php-src/"
}
}
-} \ No newline at end of file
+}
diff --git a/config.go b/config.go
index e48cefc2..02008181 100644
--- a/config.go
+++ b/config.go
@@ -9,34 +9,34 @@ import (
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap.
- NumWorkers uint64
+ NumWorkers int64
- // MaxExecutions defines how many executions is allowed for the worker until
+ // MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
- MaxExecutions uint64
+ MaxJobs int64
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task.
- AllocateTimeout time.Duration
+ AllocateTimeout time.Duration //todo: to milleseconds?
// DestroyTimeout defines for how long pool should be waiting for worker to
// properly stop, if timeout reached worker will be killed.
- DestroyTimeout time.Duration
+ DestroyTimeout time.Duration //todo: to milleseconds?
}
-// Valid returns error if config not valid
+// Reconfigure returns error if cfg not valid
func (cfg *Config) Valid() error {
if cfg.NumWorkers == 0 {
- return fmt.Errorf("config.NumWorkers must be set")
+ return fmt.Errorf("pool.NumWorkers must be set")
}
if cfg.AllocateTimeout == 0 {
- return fmt.Errorf("config.AllocateTimeout must be set")
+ return fmt.Errorf("pool.AllocateTimeout must be set")
}
if cfg.DestroyTimeout == 0 {
- return fmt.Errorf("config.DestroyTimeout must be set")
+ return fmt.Errorf("pool.DestroyTimeout must be set")
}
return nil
diff --git a/config_test.go b/config_test.go
index 64bad7cb..89cde8f1 100644
--- a/config_test.go
+++ b/config_test.go
@@ -14,7 +14,7 @@ func Test_NumWorkers(t *testing.T) {
err := cfg.Valid()
assert.NotNil(t, err)
- assert.Equal(t, "config.NumWorkers must be set", err.Error())
+ assert.Equal(t, "pool.NumWorkers must be set", err.Error())
}
func Test_AllocateTimeout(t *testing.T) {
@@ -25,7 +25,7 @@ func Test_AllocateTimeout(t *testing.T) {
err := cfg.Valid()
assert.NotNil(t, err)
- assert.Equal(t, "config.AllocateTimeout must be set", err.Error())
+ assert.Equal(t, "pool.AllocateTimeout must be set", err.Error())
}
func Test_DestroyTimeout(t *testing.T) {
@@ -36,5 +36,5 @@ func Test_DestroyTimeout(t *testing.T) {
err := cfg.Valid()
assert.NotNil(t, err)
- assert.Equal(t, "config.DestroyTimeout must be set", err.Error())
+ assert.Equal(t, "pool.DestroyTimeout must be set", err.Error())
}
diff --git a/error_buffer.go b/error_buffer.go
new file mode 100644
index 00000000..fcf566c8
--- /dev/null
+++ b/error_buffer.go
@@ -0,0 +1,39 @@
+package roadrunner
+
+import (
+ "bytes"
+ "sync"
+)
+
+// thread safe errBuffer
+type errBuffer struct {
+ mu sync.Mutex
+ buffer *bytes.Buffer
+}
+
+// Len returns the number of bytes of the unread portion of the errBuffer;
+// b.Len() == len(b.Bytes()).
+func (b *errBuffer) Len() int {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ return b.buffer.Len()
+}
+
+// Write appends the contents of p to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of p; err is always nil. If the
+// errBuffer becomes too large, Write will panic with ErrTooLarge.
+func (b *errBuffer) Write(p []byte) (n int, err error) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ return b.buffer.Write(p)
+}
+
+// Strings fetches all errBuffer data into string.
+func (b *errBuffer) String() string {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ return b.buffer.String()
+}
diff --git a/error_buffer_test.go b/error_buffer_test.go
new file mode 100644
index 00000000..afbc80e2
--- /dev/null
+++ b/error_buffer_test.go
@@ -0,0 +1,14 @@
+package roadrunner
+
+import (
+ "bytes"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestErrBuffer_Write_Len(t *testing.T) {
+ buf := &errBuffer{buffer: new(bytes.Buffer)}
+ buf.Write([]byte("hello"))
+ assert.Equal(t, 5, buf.Len())
+ assert.Equal(t, buf.String(), "hello")
+}
diff --git a/errors.go b/errors.go
new file mode 100644
index 00000000..db995721
--- /dev/null
+++ b/errors.go
@@ -0,0 +1,24 @@
+package roadrunner
+
+// JobError is job level error (no worker halt), wraps at top
+// of error context
+type JobError []byte
+
+// Error converts error context to string
+func (je JobError) Error() string {
+ return string(je)
+}
+
+// WorkerError is worker related error
+type WorkerError struct {
+ // Worker
+ Worker *Worker
+
+ // Caused error
+ Caused error
+}
+
+// Error converts error context to string
+func (e WorkerError) Error() string {
+ return e.Caused.Error()
+}
diff --git a/job_error_test.go b/errors_test.go
index 9b0fa53e..6bb650af 100644
--- a/job_error_test.go
+++ b/errors_test.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "errors"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -9,3 +10,8 @@ func Test_JobError_Error(t *testing.T) {
e := JobError([]byte("error"))
assert.Equal(t, "error", e.Error())
}
+
+func Test_WorkerError_Error(t *testing.T) {
+ e := WorkerError{Worker: nil, Caused: errors.New("error")}
+ assert.Equal(t, "error", e.Error())
+}
diff --git a/factory.go b/factory.go
index 97ea3a87..3c304824 100644
--- a/factory.go
+++ b/factory.go
@@ -7,4 +7,7 @@ type Factory interface {
// SpawnWorker creates new worker process based on given command.
// Process must not be started.
SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
+
+ // Close the factory and underlying connections.
+ Close() error
}
diff --git a/job_error.go b/job_error.go
deleted file mode 100644
index d024ad11..00000000
--- a/job_error.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package roadrunner
-
-// JobError is job level error (no worker halt), wraps at top
-// of error context
-type JobError []byte
-
-// Error converts error context to string
-func (je JobError) Error() string {
- return string(je)
-}
diff --git a/payload.go b/payload.go
index cc714eb3..154cec95 100644
--- a/payload.go
+++ b/payload.go
@@ -3,10 +3,10 @@ package roadrunner
// Payload carries binary header and body to workers and
// back to the server.
type Payload struct {
- // Context represent payload context, might be omitted
+ // Context represent payload context, might be omitted.
Context []byte
- // Body contains binary payload to be processed by worker
+ // body contains binary payload to be processed by worker.
Body []byte
}
diff --git a/source/Exceptions/RoadRunnerException.php b/php-src/Exceptions/RoadRunnerException.php
index fa7b8da3..fa7b8da3 100644
--- a/source/Exceptions/RoadRunnerException.php
+++ b/php-src/Exceptions/RoadRunnerException.php
diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php
new file mode 100644
index 00000000..f8913a8d
--- /dev/null
+++ b/php-src/PSR7Client.php
@@ -0,0 +1,131 @@
+<?php
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+
+namespace Spiral\RoadRunner;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Zend\Diactoros;
+
+/**
+ * Manages PSR-7 request and response.
+ */
+class PSR7Client
+{
+ /**
+ * @varWorker
+ */
+ private $worker;
+
+ /**
+ * @param Worker $worker
+ */
+ public function __construct(Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->worker;
+ }
+
+ /**
+ * @return ServerRequestInterface|null
+ */
+ public function acceptRequest()
+ {
+ $body = $this->worker->receive($ctx);
+ if (empty($body) && empty($ctx)) {
+ // termination request
+ return null;
+ }
+
+ if (empty($ctx = json_decode($ctx, true))) {
+ // invalid context
+ return null;
+ }
+
+ parse_str($ctx['rawQuery'], $query);
+
+ $bodyStream = 'php://input';
+ $parsedBody = null;
+ if ($ctx['parsed']) {
+ $parsedBody = json_decode($body, true);
+ } elseif ($body != null) {
+ $bodyStream = new Diactoros\Stream("php://memory", "rwb");
+ $bodyStream->write($body);
+ }
+
+ return new Diactoros\ServerRequest(
+ $_SERVER,
+ $this->wrapUploads($ctx['uploads']),
+ $ctx['uri'],
+ $ctx['method'],
+ $bodyStream,
+ $ctx['headers'],
+ $ctx['cookies'],
+ $query,
+ $parsedBody,
+ $ctx['protocol']
+ );
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param ResponseInterface $response
+ */
+ public function respond(ResponseInterface $response)
+ {
+ $headers = $response->getHeaders();
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->worker->send($response->getBody(), json_encode([
+ 'status' => $response->getStatusCode(),
+ 'headers' => $headers
+ ]));
+ }
+
+ /**
+ * Wraps all uploaded files with UploadedFile.
+ *
+ * @param array $files
+ *
+ * @return array
+ */
+ private function wrapUploads($files): array
+ {
+ if (empty($files)) {
+ return [];
+ }
+
+ $result = [];
+ foreach ($files as $index => $file) {
+ if (!isset($file['name'])) {
+ $result[$index] = $this->wrapUploads($file);
+ continue;
+ }
+
+ $result[$index] = new Diactoros\UploadedFile(
+ $file['tmpName'],
+ $file['size'],
+ $file['error'],
+ $file['name'],
+ $file['mime']
+ );
+ }
+
+ return $result;
+ }
+} \ No newline at end of file
diff --git a/source/Worker.php b/php-src/Worker.php
index 5835baf2..3e013090 100644
--- a/source/Worker.php
+++ b/php-src/Worker.php
@@ -56,6 +56,9 @@ class Worker
return $this->receive($header);
}
+ // no context for the termination.
+ $header = null;
+
// Expect process termination
return null;
}
diff --git a/tests/broken.php b/php-src/tests/broken.php
index b1a3839e..b1a3839e 100644
--- a/tests/broken.php
+++ b/php-src/tests/broken.php
diff --git a/tests/client.php b/php-src/tests/client.php
index 31caa410..fd5d60be 100644
--- a/tests/client.php
+++ b/php-src/tests/client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor/autoload.php";
+require dirname(__DIR__) . "/../vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/delay.php b/php-src/tests/delay.php
index bfde2fc4..bfde2fc4 100644
--- a/tests/delay.php
+++ b/php-src/tests/delay.php
diff --git a/tests/echo.php b/php-src/tests/echo.php
index ba58ff30..ba58ff30 100644
--- a/tests/echo.php
+++ b/php-src/tests/echo.php
diff --git a/tests/error.php b/php-src/tests/error.php
index ebd3418b..ebd3418b 100644
--- a/tests/error.php
+++ b/php-src/tests/error.php
diff --git a/tests/failboot.php b/php-src/tests/failboot.php
index fa8b96f6..fa8b96f6 100644
--- a/tests/failboot.php
+++ b/php-src/tests/failboot.php
diff --git a/tests/head.php b/php-src/tests/head.php
index 4f4e4061..4f4e4061 100644
--- a/tests/head.php
+++ b/php-src/tests/head.php
diff --git a/php-src/tests/http/client.php b/php-src/tests/http/client.php
new file mode 100644
index 00000000..3b6b5dc6
--- /dev/null
+++ b/php-src/tests/http/client.php
@@ -0,0 +1,45 @@
+<?php
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/../../vendor/autoload.php";
+
+if (count($argv) < 3) {
+ die("need 2 arguments");
+}
+
+list($test, $goridge) = [$argv[1], $argv[2]];
+
+switch ($goridge) {
+ case "pipes":
+ $relay = new Goridge\StreamRelay(STDIN, STDOUT);
+ break;
+
+ case "tcp":
+ $relay = new Goridge\SocketRelay("localhost", 9007);
+ break;
+
+ case "unix":
+ $relay = new Goridge\SocketRelay(
+ "sock.unix",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+ break;
+
+ default:
+ die("invalid protocol selection");
+}
+
+$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay));
+require_once sprintf("%s/%s.php", __DIR__, $test);
+
+while ($req = $psr7->acceptRequest()) {
+ try {
+ $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response()));
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/php-src/tests/http/cookie.php b/php-src/tests/http/cookie.php
new file mode 100644
index 00000000..196ceee2
--- /dev/null
+++ b/php-src/tests/http/cookie.php
@@ -0,0 +1,334 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $resp->getBody()->write(strtoupper($req->getCookieParams()['input']));
+
+ return $resp->withAddedHeader(
+ "Set-Cookie",
+ (new Cookie('output', 'cookie-output'))->createHeader()
+ );
+}
+
+final class Cookie
+{
+ /**
+ * The name of the cookie.
+ *
+ * @var string
+ */
+ private $name = '';
+ /**
+ * The value of the cookie. This value is stored on the clients computer; do not store sensitive
+ * information.
+ *
+ * @var string|null
+ */
+ private $value = null;
+ /**
+ * Cookie lifetime. This value specified in seconds and declares period of time in which cookie
+ * will expire relatively to current time() value.
+ *
+ * @var int|null
+ */
+ private $lifetime = null;
+ /**
+ * The path on the server in which the cookie will be available on.
+ *
+ * If set to '/', the cookie will be available within the entire domain. If set to '/foo/',
+ * the cookie will only be available within the /foo/ directory and all sub-directories such as
+ * /foo/bar/ of domain. The default value is the current directory that the cookie is being set
+ * in.
+ *
+ * @var string|null
+ */
+ private $path = null;
+ /**
+ * The domain that the cookie is available. To make the cookie available on all subdomains of
+ * example.com then you'd set it to '.example.com'. The . is not required but makes it
+ * compatible with more browsers. Setting it to www.example.com will make the cookie only
+ * available in the www subdomain. Refer to tail matching in the spec for details.
+ *
+ * @var string|null
+ */
+ private $domain = null;
+ /**
+ * Indicates that the cookie should only be transmitted over a secure HTTPS connection from the
+ * client. When set to true, the cookie will only be set if a secure connection exists.
+ * On the server-side, it's on the programmer to send this kind of cookie only on secure
+ * connection
+ * (e.g. with respect to $_SERVER["HTTPS"]).
+ *
+ * @var bool|null
+ */
+ private $secure = null;
+ /**
+ * When true the cookie will be made accessible only through the HTTP protocol. This means that
+ * the cookie won't be accessible by scripting languages, such as JavaScript. This setting can
+ * effectively help to reduce identity theft through XSS attacks (although it is not supported
+ * by all browsers).
+ *
+ * @var bool
+ */
+ private $httpOnly = true;
+
+ /**
+ * New Cookie instance, cookies used to schedule cookie set while dispatching Response.
+ *
+ * @link http://php.net/manual/en/function.setcookie.php
+ *
+ * @param string $name The name of the cookie.
+ * @param string $value The value of the cookie. This value is stored on the clients
+ * computer; do not store sensitive information.
+ * @param int $lifetime Cookie lifetime. This value specified in seconds and declares period
+ * of time in which cookie will expire relatively to current time()
+ * value.
+ * @param string $path The path on the server in which the cookie will be available on.
+ * If set to '/', the cookie will be available within the entire
+ * domain.
+ * If set to '/foo/', the cookie will only be available within the
+ * /foo/
+ * directory and all sub-directories such as /foo/bar/ of domain. The
+ * default value is the current directory that the cookie is being set
+ * in.
+ * @param string $domain The domain that the cookie is available. To make the cookie
+ * available
+ * on all subdomains of example.com then you'd set it to
+ * '.example.com'.
+ * The . is not required but makes it compatible with more browsers.
+ * Setting it to www.example.com will make the cookie only available in
+ * the www subdomain. Refer to tail matching in the spec for details.
+ * @param bool $secure Indicates that the cookie should only be transmitted over a secure
+ * HTTPS connection from the client. When set to true, the cookie will
+ * only be set if a secure connection exists. On the server-side, it's
+ * on the programmer to send this kind of cookie only on secure
+ * connection (e.g. with respect to $_SERVER["HTTPS"]).
+ * @param bool $httpOnly When true the cookie will be made accessible only through the HTTP
+ * protocol. This means that the cookie won't be accessible by
+ * scripting
+ * languages, such as JavaScript. This setting can effectively help to
+ * reduce identity theft through XSS attacks (although it is not
+ * supported by all browsers).
+ */
+ public function __construct(
+ string $name,
+ string $value = null,
+ int $lifetime = null,
+ string $path = null,
+ string $domain = null,
+ bool $secure = false,
+ bool $httpOnly = true
+ ) {
+ $this->name = $name;
+ $this->value = $value;
+ $this->lifetime = $lifetime;
+ $this->path = $path;
+ $this->domain = $domain;
+ $this->secure = $secure;
+ $this->httpOnly = $httpOnly;
+ }
+
+ /**
+ * The name of the cookie.
+ *
+ * @return string
+ */
+ public function getName(): string
+ {
+ return $this->name;
+ }
+
+ /**
+ * The value of the cookie. This value is stored on the clients computer; do not store sensitive
+ * information.
+ *
+ * @return string|null
+ */
+ public function getValue()
+ {
+ return $this->value;
+ }
+
+ /**
+ * The time the cookie expires. This is a Unix timestamp so is in number of seconds since the
+ * epoch. In other words, you'll most likely set this with the time function plus the number of
+ * seconds before you want it to expire. Or you might use mktime.
+ *
+ * Will return null if lifetime is not specified.
+ *
+ * @return int|null
+ */
+ public function getExpires()
+ {
+ if ($this->lifetime === null) {
+ return null;
+ }
+
+ return time() + $this->lifetime;
+ }
+
+ /**
+ * The path on the server in which the cookie will be available on.
+ *
+ * If set to '/', the cookie will be available within the entire domain. If set to '/foo/',
+ * the cookie will only be available within the /foo/ directory and all sub-directories such as
+ * /foo/bar/ of domain. The default value is the current directory that the cookie is being set
+ * in.
+ *
+ * @return string|null
+ */
+ public function getPath()
+ {
+ return $this->path;
+ }
+
+ /**
+ * The domain that the cookie is available. To make the cookie available on all subdomains of
+ * example.com then you'd set it to '.example.com'. The . is not required but makes it
+ * compatible with more browsers. Setting it to www.example.com will make the cookie only
+ * available in the www subdomain. Refer to tail matching in the spec for details.
+ *
+ * @return string|null
+ */
+ public function getDomain()
+ {
+ return $this->domain;
+ }
+
+ /**
+ * Indicates that the cookie should only be transmitted over a secure HTTPS connection from the
+ * client. When set to true, the cookie will only be set if a secure connection exists.
+ * On the server-side, it's on the programmer to send this kind of cookie only on secure
+ * connection
+ * (e.g. with respect to $_SERVER["HTTPS"]).
+ *
+ * @return bool
+ */
+ public function isSecure(): bool
+ {
+ return $this->secure;
+ }
+
+ /**
+ * When true the cookie will be made accessible only through the HTTP protocol. This means that
+ * the cookie won't be accessible by scripting languages, such as JavaScript. This setting can
+ * effectively help to reduce identity theft through XSS attacks (although it is not supported
+ * by all browsers).
+ *
+ * @return bool
+ */
+ public function isHttpOnly(): bool
+ {
+ return $this->httpOnly;
+ }
+
+ /**
+ * Get new cookie with altered value. Original cookie object should not be changed.
+ *
+ * @param string $value
+ *
+ * @return Cookie
+ */
+ public function withValue(string $value): self
+ {
+ $cookie = clone $this;
+ $cookie->value = $value;
+
+ return $cookie;
+ }
+
+ /**
+ * Convert cookie instance to string.
+ *
+ * @link http://www.w3.org/Protocols/rfc2109/rfc2109
+ * @return string
+ */
+ public function createHeader(): string
+ {
+ $header = [
+ rawurlencode($this->name) . '=' . rawurlencode($this->value)
+ ];
+ if ($this->lifetime !== null) {
+ $header[] = 'Expires=' . gmdate(\DateTime::COOKIE, $this->getExpires());
+ $header[] = 'Max-Age=' . $this->lifetime;
+ }
+ if (!empty($this->path)) {
+ $header[] = 'Path=' . $this->path;
+ }
+ if (!empty($this->domain)) {
+ $header[] = 'Domain=' . $this->domain;
+ }
+ if ($this->secure) {
+ $header[] = 'Secure';
+ }
+ if ($this->httpOnly) {
+ $header[] = 'HttpOnly';
+ }
+
+ return join('; ', $header);
+ }
+
+ /**
+ * New Cookie instance, cookies used to schedule cookie set while dispatching Response.
+ * Static constructor.
+ *
+ * @link http://php.net/manual/en/function.setcookie.php
+ *
+ * @param string $name The name of the cookie.
+ * @param string $value The value of the cookie. This value is stored on the clients
+ * computer; do not store sensitive information.
+ * @param int $lifetime Cookie lifetime. This value specified in seconds and declares period
+ * of time in which cookie will expire relatively to current time()
+ * value.
+ * @param string $path The path on the server in which the cookie will be available on.
+ * If set to '/', the cookie will be available within the entire
+ * domain.
+ * If set to '/foo/', the cookie will only be available within the
+ * /foo/
+ * directory and all sub-directories such as /foo/bar/ of domain. The
+ * default value is the current directory that the cookie is being set
+ * in.
+ * @param string $domain The domain that the cookie is available. To make the cookie
+ * available
+ * on all subdomains of example.com then you'd set it to
+ * '.example.com'.
+ * The . is not required but makes it compatible with more browsers.
+ * Setting it to www.example.com will make the cookie only available in
+ * the www subdomain. Refer to tail matching in the spec for details.
+ * @param bool $secure Indicates that the cookie should only be transmitted over a secure
+ * HTTPS connection from the client. When set to true, the cookie will
+ * only be set if a secure connection exists. On the server-side, it's
+ * on the programmer to send this kind of cookie only on secure
+ * connection (e.g. with respect to $_SERVER["HTTPS"]).
+ * @param bool $httpOnly When true the cookie will be made accessible only through the HTTP
+ * protocol. This means that the cookie won't be accessible by
+ * scripting
+ * languages, such as JavaScript. This setting can effectively help to
+ * reduce identity theft through XSS attacks (although it is not
+ * supported by all browsers).
+ *
+ * @return Cookie
+ */
+ public static function create(
+ string $name,
+ string $value = null,
+ int $lifetime = null,
+ string $path = null,
+ string $domain = null,
+ bool $secure = false,
+ bool $httpOnly = true
+ ): self {
+ return new self($name, $value, $lifetime, $path, $domain, $secure, $httpOnly);
+ }
+
+ /**
+ * @return string
+ */
+ public function __toString(): string
+ {
+ return $this->createHeader();
+ }
+} \ No newline at end of file
diff --git a/php-src/tests/http/data.php b/php-src/tests/http/data.php
new file mode 100644
index 00000000..c5e0bab0
--- /dev/null
+++ b/php-src/tests/http/data.php
@@ -0,0 +1,18 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+
+ $data = $req->getParsedBody();
+
+ ksort($data);
+ ksort($data['arr']);
+ ksort($data['arr']['x']['y']);
+
+ $resp->getBody()->write(json_encode($data));
+
+ return $resp;
+} \ No newline at end of file
diff --git a/php-src/tests/http/echo.php b/php-src/tests/http/echo.php
new file mode 100644
index 00000000..7004ada0
--- /dev/null
+++ b/php-src/tests/http/echo.php
@@ -0,0 +1,10 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $resp->getBody()->write(strtoupper($req->getQueryParams()['hello']));
+ return $resp->withStatus(201);
+} \ No newline at end of file
diff --git a/php-src/tests/http/error.php b/php-src/tests/http/error.php
new file mode 100644
index 00000000..6df0d4b5
--- /dev/null
+++ b/php-src/tests/http/error.php
@@ -0,0 +1,9 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ throw new Error("error");
+} \ No newline at end of file
diff --git a/php-src/tests/http/error2.php b/php-src/tests/http/error2.php
new file mode 100644
index 00000000..617b5a3f
--- /dev/null
+++ b/php-src/tests/http/error2.php
@@ -0,0 +1,9 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ exit();
+} \ No newline at end of file
diff --git a/php-src/tests/http/header.php b/php-src/tests/http/header.php
new file mode 100644
index 00000000..e5b295b6
--- /dev/null
+++ b/php-src/tests/http/header.php
@@ -0,0 +1,11 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $resp->getBody()->write(strtoupper($req->getHeaderLine('input')));
+
+ return $resp->withAddedHeader("Header", $req->getQueryParams()['hello']);
+} \ No newline at end of file
diff --git a/php-src/tests/http/payload.php b/php-src/tests/http/payload.php
new file mode 100644
index 00000000..a16984c5
--- /dev/null
+++ b/php-src/tests/http/payload.php
@@ -0,0 +1,13 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ // we expect json body
+ $p = json_decode($req->getBody(), true);
+ $resp->getBody()->write(json_encode(array_flip($p)));
+
+ return $resp;
+} \ No newline at end of file
diff --git a/php-src/tests/http/pid.php b/php-src/tests/http/pid.php
new file mode 100644
index 00000000..1cc322bf
--- /dev/null
+++ b/php-src/tests/http/pid.php
@@ -0,0 +1,11 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $resp->getBody()->write(getmypid());
+
+ return $resp;
+} \ No newline at end of file
diff --git a/php-src/tests/http/upload.php b/php-src/tests/http/upload.php
new file mode 100644
index 00000000..2f7c0b64
--- /dev/null
+++ b/php-src/tests/http/upload.php
@@ -0,0 +1,35 @@
+<?php
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $files = $req->getUploadedFiles();
+ array_walk_recursive($files, function (&$v) {
+ /**
+ * @var \Psr\Http\Message\UploadedFileInterface $v
+ */
+
+ if ($v->getError()) {
+ $v = [
+ 'name' => $v->getClientFilename(),
+ 'size' => $v->getSize(),
+ 'mime' => $v->getClientMediaType(),
+ 'error' => $v->getError(),
+ ];
+ } else {
+ $v = [
+ 'name' => $v->getClientFilename(),
+ 'size' => $v->getSize(),
+ 'mime' => $v->getClientMediaType(),
+ 'error' => $v->getError(),
+ 'md5' => md5($v->getStream()->__toString()),
+ ];
+ }
+ });
+
+ $resp->getBody()->write(json_encode($files, JSON_UNESCAPED_SLASHES));
+
+ return $resp;
+} \ No newline at end of file
diff --git a/tests/pid.php b/php-src/tests/pid.php
index a8cfa229..a8cfa229 100644
--- a/tests/pid.php
+++ b/php-src/tests/pid.php
diff --git a/php-src/tests/sample.txt b/php-src/tests/sample.txt
new file mode 100644
index 00000000..eed7e79a
--- /dev/null
+++ b/php-src/tests/sample.txt
@@ -0,0 +1 @@
+sample \ No newline at end of file
diff --git a/tests/slow-client.php b/php-src/tests/slow-client.php
index f09142b5..2722868c 100644
--- a/tests/slow-client.php
+++ b/php-src/tests/slow-client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor/autoload.php";
+require dirname(__DIR__) . "/../vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/stop.php b/php-src/tests/stop.php
index caa485d6..caa485d6 100644
--- a/tests/stop.php
+++ b/php-src/tests/stop.php
diff --git a/pipe_factory.go b/pipe_factory.go
index 3b2f2f88..d6fe0420 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -46,8 +46,13 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
if pid, err := fetchPID(w.rl); pid != *w.Pid {
go func(w *Worker) { w.Kill() }(w)
+
if wErr := w.Wait(); wErr != nil {
- err = errors.Wrap(wErr, err.Error())
+ if _, ok := wErr.(*exec.ExitError); ok {
+ err = errors.Wrap(wErr, err.Error())
+ } else {
+ err = wErr
+ }
}
return nil, errors.Wrap(err, "unable to connect to worker")
@@ -56,3 +61,8 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
w.state.set(StateReady)
return w, nil
}
+
+// Close the factory.
+func (f *PipeFactory) Close() error {
+ return nil
+}
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 9d50e47f..ae276ab6 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -7,7 +7,7 @@ import (
)
func Test_Pipe_Start(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.NoError(t, err)
@@ -21,7 +21,7 @@ func Test_Pipe_Start(t *testing.T) {
}
func Test_Pipe_StartError(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
cmd.Start()
w, err := NewPipeFactory().SpawnWorker(cmd)
@@ -30,7 +30,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
func Test_Pipe_PipeError(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
cmd.StdinPipe()
w, err := NewPipeFactory().SpawnWorker(cmd)
@@ -39,7 +39,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
func Test_Pipe_PipeError2(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
cmd.StdoutPipe()
w, err := NewPipeFactory().SpawnWorker(cmd)
@@ -48,7 +48,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
func Test_Pipe_Failboot(t *testing.T) {
- cmd := exec.Command("php", "tests/failboot.php")
+ cmd := exec.Command("php", "php-src/tests/failboot.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
@@ -57,7 +57,7 @@ func Test_Pipe_Failboot(t *testing.T) {
}
func Test_Pipe_Invalid(t *testing.T) {
- cmd := exec.Command("php", "tests/invalid.php")
+ cmd := exec.Command("php", "php-src/tests/invalid.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
@@ -65,7 +65,7 @@ func Test_Pipe_Invalid(t *testing.T) {
}
func Test_Pipe_Echo(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -84,7 +84,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
func Test_Pipe_Broken(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -104,7 +104,7 @@ func Test_Pipe_Broken(t *testing.T) {
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorker(cmd)
go func() {
if w.Wait() != nil {
@@ -117,7 +117,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
diff --git a/pool.go b/pool.go
index f8b5f471..30c30dfa 100644
--- a/pool.go
+++ b/pool.go
@@ -1,20 +1,29 @@
package roadrunner
const (
- // EventCreated thrown when new worker is spawned.
- EventCreated = iota
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct = iota + 100
- // EventDestruct thrown before worker destruction.
- EventDestruct
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
- // EventError thrown any worker related even happen (error passed as context)
- EventError
+ // EventWorkerKill thrown after worker is being forcefully killed.
+ EventWorkerKill
+
+ // EventWorkerError thrown any worker related even happen (passed with WorkerError)
+ EventWorkerError
+
+ // EventWorkerDead thrown when worker stops worker for any reason.
+ EventWorkerDead
+
+ // EventPoolError caused on pool wide errors
+ EventPoolError
)
// Pool managed set of inner worker processes.
type Pool interface {
- // Report all caused events to attached watcher.
- Report(o func(event int, w *Worker, ctx interface{}))
+ // AddListener all caused events to attached watcher.
+ Listen(l func(event int, ctx interface{}))
// Exec one task with given payload and context, returns result or error.
Exec(rqs *Payload) (rsp *Payload, err error)
diff --git a/server.go b/server.go
new file mode 100644
index 00000000..4ffdf367
--- /dev/null
+++ b/server.go
@@ -0,0 +1,207 @@
+package roadrunner
+
+import (
+ "fmt"
+ "sync"
+ "github.com/pkg/errors"
+)
+
+const (
+ // EventPoolConstruct triggered when server creates new pool.
+ EventServerStart = iota + 200
+
+ // EventPoolConstruct triggered when server creates new pool.
+ EventServerStop
+
+ // EventServerFailure triggered when server is unable to replace dead pool.
+ EventServerFailure
+
+ // EventPoolConstruct triggered when server creates new pool.
+ EventPoolConstruct
+
+ // EventPoolDestruct triggered when server destroys existed pool.
+ EventPoolDestruct
+)
+
+// Service manages pool creation and swapping.
+type Server struct {
+ // configures server, pool, cmd creation and factory.
+ cfg *ServerConfig
+
+ // protects pool while the re-configuration
+ mu sync.Mutex
+
+ // indicates that server was started
+ started bool
+
+ // creates and connects to workers
+ factory Factory
+
+ // currently active pool instance
+ pool Pool
+
+ // observes pool events (can be attached to multiple pools at the same time)
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
+}
+
+// NewServer creates new router. Make sure to call configure before the usage.
+func NewServer(cfg *ServerConfig) *Server {
+ return &Server{cfg: cfg}
+}
+
+// AddListener attaches server event watcher.
+func (s *Server) Listen(l func(event int, ctx interface{})) {
+ s.mul.Lock()
+ defer s.mul.Unlock()
+
+ s.lsn = l
+}
+
+// Start underlying worker pool, configure factory and command provider.
+func (s *Server) Start() (err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.factory, err = s.cfg.makeFactory(); err != nil {
+ return err
+ }
+
+ if s.pool, err = NewPool(s.cfg.makeCommand(), s.factory, *s.cfg.Pool); err != nil {
+ return err
+ }
+
+ s.pool.Listen(s.poolListener)
+ s.started = true
+ s.throw(EventServerStart, s)
+
+ return nil
+}
+
+// Stop underlying worker pool and close the factory.
+func (s *Server) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if !s.started {
+ return
+ }
+
+ s.throw(EventPoolDestruct, s.pool)
+ s.pool.Destroy()
+ s.factory.Close()
+
+ s.factory = nil
+ s.pool = nil
+ s.started = false
+ s.throw(EventServerStop, s)
+}
+
+// Exec one task with given payload and context, returns result or error.
+func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
+ pool := s.Pool()
+ if pool == nil {
+ return nil, fmt.Errorf("no associared pool")
+ }
+
+ return pool.Exec(rqs)
+}
+
+// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory
+// and relay settings.
+func (s *Server) Reconfigure(cfg *ServerConfig) error {
+ s.mu.Lock()
+ if !s.started {
+ s.cfg = cfg
+ s.mu.Unlock()
+ return nil
+ }
+ s.mu.Unlock()
+
+ if s.cfg.Differs(cfg) {
+ return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
+ }
+
+ s.mu.Lock()
+ previous := s.pool
+ s.mu.Unlock()
+
+ pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
+ if err != nil {
+ return err
+ }
+
+ s.pool.Listen(s.poolListener)
+
+ s.mu.Lock()
+ s.cfg.Pool, s.pool = cfg.Pool, pool
+ s.mu.Unlock()
+
+ s.throw(EventPoolConstruct, pool)
+
+ if previous != nil {
+ go func(previous Pool) {
+ s.throw(EventPoolDestruct, previous)
+ previous.Destroy()
+ }(previous)
+ }
+
+ return nil
+}
+
+// Reset resets the state of underlying pool and rebuilds all of it's workers.
+func (s *Server) Reset() error {
+ s.mu.Lock()
+ cfg := s.cfg
+ s.mu.Unlock()
+
+ return s.Reconfigure(cfg)
+}
+
+// Workers returns worker list associated with the server pool.
+func (s *Server) Workers() (workers []*Worker) {
+ p := s.Pool()
+ if p == nil {
+ return nil
+ }
+
+ return p.Workers()
+}
+
+// Pool returns active pool or error.
+func (s *Server) Pool() Pool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ return s.pool
+}
+
+// AddListener pool events.
+func (s *Server) poolListener(event int, ctx interface{}) {
+ if event == EventPoolError {
+ // pool failure, rebuilding
+ if err := s.Reset(); err != nil {
+ s.mu.Lock()
+ s.started = false
+ s.pool = nil
+ s.factory = nil
+ s.mu.Unlock()
+
+ // everything is dead, this is recoverable but heavy state
+ s.throw(EventServerFailure, err)
+ }
+ }
+
+ // bypassing to user specified lsn
+ s.throw(event, ctx)
+}
+
+// throw invokes event handler if any.
+func (s *Server) throw(event int, ctx interface{}) {
+ s.mul.Lock()
+ defer s.mul.Unlock()
+
+ if s.lsn != nil {
+ s.lsn(event, ctx)
+ }
+}
diff --git a/server_config.go b/server_config.go
new file mode 100644
index 00000000..8af0e0f8
--- /dev/null
+++ b/server_config.go
@@ -0,0 +1,60 @@
+package roadrunner
+
+import (
+ "errors"
+ "net"
+ "strings"
+ "time"
+ "os/exec"
+)
+
+// Server config combines factory, pool and cmd configurations.
+type ServerConfig struct {
+ // Command includes command strings with all the parameters, example: "php worker.php pipes".
+ Command string
+
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration.
+ RelayTimeout time.Duration
+
+ // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
+ // while server is running.
+ Pool *Config
+}
+
+// Differs returns true if configuration has changed but ignores pool or cmd changes.
+func (cfg *ServerConfig) Differs(new *ServerConfig) bool {
+ return cfg.Relay != new.Relay || cfg.RelayTimeout != new.RelayTimeout
+}
+
+// makeCommands returns new command provider based on configured options.
+func (cfg *ServerConfig) makeCommand() func() *exec.Cmd {
+ var cmd = strings.Split(cfg.Command, " ")
+ return func() *exec.Cmd {
+ return exec.Command(cmd[0], cmd[1:]...)
+ }
+}
+
+// makeFactory creates and connects new factory instance based on given parameters.
+func (cfg *ServerConfig) makeFactory() (Factory, error) {
+ if cfg.Relay == "pipes" || cfg.Relay == "pipe" {
+ return NewPipeFactory(), nil
+ }
+
+ dsn := strings.Split(cfg.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)")
+ }
+
+ ln, err := net.Listen(dsn[0], dsn[1])
+ if err != nil {
+ return nil, err
+ }
+
+ return NewSocketFactory(ln, cfg.RelayTimeout), nil
+}
diff --git a/server_config_test.go b/server_config_test.go
new file mode 100644
index 00000000..1831ae95
--- /dev/null
+++ b/server_config_test.go
@@ -0,0 +1,92 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+)
+
+func Test_ServerConfig_PipeFactory(t *testing.T) {
+ cfg := &ServerConfig{Relay: "pipes"}
+ f, err := cfg.makeFactory()
+
+ assert.NoError(t, err)
+ assert.IsType(t, &PipeFactory{}, f)
+
+ cfg = &ServerConfig{Relay: "pipe"}
+ f, err = cfg.makeFactory()
+ assert.NoError(t, err)
+ assert.NotNil(t, f)
+ defer f.Close()
+
+ assert.NoError(t, err)
+ assert.IsType(t, &PipeFactory{}, f)
+}
+
+func Test_ServerConfig_SocketFactory(t *testing.T) {
+ cfg := &ServerConfig{Relay: "tcp://:9111"}
+ f, err := cfg.makeFactory()
+ assert.NoError(t, err)
+ assert.NotNil(t, f)
+ defer f.Close()
+
+ assert.NoError(t, err)
+ assert.IsType(t, &SocketFactory{}, f)
+ assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network())
+ assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String())
+
+ cfg = &ServerConfig{Relay: "tcp://localhost:9112"}
+ f, err = cfg.makeFactory()
+ assert.NoError(t, err)
+ assert.NotNil(t, f)
+ defer f.Close()
+
+ assert.NoError(t, err)
+ assert.IsType(t, &SocketFactory{}, f)
+ assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network())
+ assert.Equal(t, "127.0.0.1:9112", f.(*SocketFactory).ls.Addr().String())
+}
+
+func Test_ServerConfig_UnixSocketFactory(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &ServerConfig{Relay: "unix://unix.sock"}
+ f, err := cfg.makeFactory()
+ defer f.Close()
+
+ assert.NoError(t, err)
+ assert.IsType(t, &SocketFactory{}, f)
+ assert.Equal(t, "unix", f.(*SocketFactory).ls.Addr().Network())
+ assert.Equal(t, "unix.sock", f.(*SocketFactory).ls.Addr().String())
+}
+
+func Test_ServerConfig_ErrorFactory(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &ServerConfig{Relay: "uni:unix.sock"}
+ f, err := cfg.makeFactory()
+ assert.Nil(t, f)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)", err.Error())
+}
+
+func Test_ServerConfig_ErrorMethod(t *testing.T) {
+ cfg := &ServerConfig{Relay: "xinu://unix.sock"}
+
+ f, err := cfg.makeFactory()
+ assert.Nil(t, f)
+ assert.Error(t, err)
+}
+
+func Test_ServerConfig_Cmd(t *testing.T) {
+ cfg := &ServerConfig{
+ Command: "php php-src/tests/client.php pipes",
+ }
+
+ cmd := cfg.makeCommand()
+ assert.NotNil(t, cmd)
+}
diff --git a/server_test.go b/server_test.go
new file mode 100644
index 00000000..13cacc2c
--- /dev/null
+++ b/server_test.go
@@ -0,0 +1,229 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+ "time"
+ "os/exec"
+)
+
+func TestServer_PipesEcho(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+
+ res, err := srv.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func TestServer_SocketEcho(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo tcp",
+ Relay: "tcp://:9007",
+ RelayTimeout: 10 * time.Second,
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+
+ res, err := srv.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func TestServer_Configure_BeforeStart(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ err := srv.Reconfigure(&ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 2,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ assert.NoError(t, err)
+
+ assert.NoError(t, srv.Start())
+
+ res, err := srv.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Len(t, srv.Workers(), 2)
+}
+
+func TestServer_Stop_NotStarted(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+
+ srv.Stop()
+ assert.Nil(t, srv.Workers())
+}
+
+func TestServer_Reconfigure(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+ assert.Len(t, srv.Workers(), 1)
+
+ err := srv.Reconfigure(&ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 2,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ assert.NoError(t, err)
+
+ assert.Len(t, srv.Workers(), 2)
+}
+
+func TestServer_Reset(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+ assert.Len(t, srv.Workers(), 1)
+
+ pid := *srv.Workers()[0].Pid
+ assert.NoError(t, srv.Reset())
+ assert.Len(t, srv.Workers(), 1)
+ assert.NotEqual(t, pid, srv.Workers()[0].Pid)
+}
+
+func TestServer_ReplacePool(t *testing.T) {
+ srv := NewServer(
+ &ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+
+ constructed := make(chan interface{})
+ srv.Listen(func(e int, ctx interface{}) {
+ if e == EventPoolConstruct {
+ close(constructed)
+ }
+ })
+
+ srv.Reset()
+ <-constructed
+
+ for _, w := range srv.Workers() {
+ assert.Equal(t, StateReady, w.state.Value())
+ }
+}
+
+func TestServer_ServerFailure(t *testing.T) {
+ srv := NewServer(&ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer srv.Stop()
+
+ assert.NoError(t, srv.Start())
+
+ failure := make(chan interface{})
+ srv.Listen(func(e int, ctx interface{}) {
+ if e == EventServerFailure {
+ failure <- nil
+ }
+ })
+
+ // emulating potential server failure
+ srv.cfg.Command = "php php-src/tests/client.php echo broken-connection"
+ srv.pool.(*StaticPool).cmd = func() *exec.Cmd {
+ return exec.Command("php", "php-src/tests/client.php", "echo", "broken-connection")
+ }
+
+ // killing random worker and expecting pool to replace it
+ srv.Workers()[0].cmd.Process.Kill()
+
+ <-failure
+ assert.True(t, true)
+}
diff --git a/service/container.go b/service/container.go
new file mode 100644
index 00000000..ce9146bc
--- /dev/null
+++ b/service/container.go
@@ -0,0 +1,171 @@
+package service
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+// Container controls all internal RR services and provides plugin based system.
+type Container interface {
+ // Register add new service to the container under given name.
+ Register(name string, service Service)
+
+ // Reconfigure configures all underlying services with given configuration.
+ Init(cfg Config) error
+
+ // Check if svc has been registered.
+ Has(service string) bool
+
+ // Get returns svc instance by it's name or nil if svc not found. Method returns current service status
+ // as second value.
+ Get(service string) (svc Service, status int)
+
+ // Serve all configured services. Non blocking.
+ Serve() error
+
+ // Close all active services.
+ Stop()
+}
+
+type container struct {
+ log logrus.FieldLogger
+ mu sync.Mutex
+ services []*entry
+}
+
+// NewContainer creates new service container.
+func NewContainer(log logrus.FieldLogger) Container {
+ return &container{
+ log: log,
+ services: make([]*entry, 0),
+ }
+}
+
+// Register add new service to the container under given name.
+func (c *container) Register(name string, service Service) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.services = append(c.services, &entry{
+ name: name,
+ svc: service,
+ status: StatusRegistered,
+ })
+
+ c.log.Debugf("[%s]: registered", name)
+}
+
+// Check hasStatus svc has been registered.
+func (c *container) Has(target string) bool {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, e := range c.services {
+ if e.name == target {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Get returns svc instance by it's name or nil if svc not found.
+func (c *container) Get(target string) (svc Service, status int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, e := range c.services {
+ if e.name == target {
+ return e.svc, e.getStatus()
+ }
+ }
+
+ return nil, StatusUndefined
+}
+
+// Init configures all underlying services with given configuration.
+func (c *container) Init(cfg Config) error {
+ for _, e := range c.services {
+ if e.getStatus() >= StatusConfigured {
+ return fmt.Errorf("service [%s] has already been configured", e.name)
+ }
+
+ segment := cfg.Get(e.name)
+ if segment == nil {
+ c.log.Debugf("[%s]: no config has been provided", e.name)
+ continue
+ }
+
+ ok, err := e.svc.Init(segment, c)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
+ } else if ok {
+ e.setStatus(StatusConfigured)
+ }
+ }
+
+ return nil
+}
+
+// Serve all configured services. Non blocking.
+func (c *container) Serve() error {
+ var (
+ numServing int
+ done = make(chan interface{}, len(c.services))
+ )
+ defer close(done)
+
+ for _, e := range c.services {
+ if e.hasStatus(StatusConfigured) {
+ numServing ++
+ } else {
+ continue
+ }
+
+ c.log.Debugf("[%s]: started", e.name)
+ go func(e *entry) {
+ e.setStatus(StatusServing)
+ defer e.setStatus(StatusStopped)
+
+ if err := e.svc.Serve(); err != nil {
+ c.log.Errorf("[%s]: %s", e.name, err)
+ done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
+ }
+ }(e)
+ }
+
+ for i := 0; i < numServing; i++ {
+ result := <-done
+
+ // found an error in one of the services, stopping the rest of running services.
+ if err, ok := result.(error); ok {
+ c.Stop()
+ return err
+ }
+ }
+
+ return nil
+}
+
+// Stop sends stop command to all running services.
+func (c *container) Stop() {
+ for _, e := range c.services {
+ if e.hasStatus(StatusServing) {
+ e.svc.Stop()
+ e.setStatus(StatusStopped)
+ c.log.Debugf("[%s]: stopped", e.name)
+ }
+ }
+}
diff --git a/service/container_test.go b/service/container_test.go
new file mode 100644
index 00000000..3c2f8761
--- /dev/null
+++ b/service/container_test.go
@@ -0,0 +1,327 @@
+package service
+
+import (
+ "testing"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/sirupsen/logrus"
+ "encoding/json"
+ "errors"
+ "time"
+ "sync"
+)
+
+type testService struct {
+ mu sync.Mutex
+ waitForServe chan interface{}
+ delay time.Duration
+ ok bool
+ cfg Config
+ c Container
+ cfgE, serveE error
+ done chan interface{}
+}
+
+func (t *testService) Init(cfg Config, c Container) (enabled bool, err error) {
+ t.cfg = cfg
+ t.c = c
+ t.done = make(chan interface{})
+ return t.ok, t.cfgE
+}
+
+func (t *testService) Serve() error {
+ time.Sleep(t.delay)
+
+ if t.serveE != nil {
+ return t.serveE
+ }
+
+ if c := t.waitChan(); c != nil {
+ close(c)
+ t.setChan(nil)
+ }
+
+ <-t.done
+ return nil
+}
+
+func (t *testService) Stop() {
+ close(t.done)
+}
+
+func (t *testService) waitChan() chan interface{} {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ return t.waitForServe
+}
+
+func (t *testService) setChan(c chan interface{}) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ t.waitForServe = c
+}
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) Config {
+ vars := make(map[string]string)
+ json.Unmarshal([]byte(cfg.cfg), &vars)
+
+ v, ok := vars[name]
+ if !ok {
+ return nil
+ }
+
+ return &testCfg{cfg: v}
+}
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func TestContainer_Register(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+}
+
+func TestContainer_Has(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.True(t, c.Has("test"))
+ assert.False(t, c.Has("another"))
+}
+
+func TestContainer_Get(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+
+ s, st = c.Get("another")
+ assert.Nil(t, s)
+ assert.Equal(t, StatusUndefined, st)
+}
+
+func TestContainer_Stop_NotStarted(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+ assert.Equal(t, 1, len(hook.Entries))
+
+ c.Stop()
+}
+
+func TestContainer_Configure(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusConfigured, st)
+}
+
+func TestContainer_ConfigureNull(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Init(&testCfg{`{"another":"something"}`}))
+ assert.Equal(t, 2, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureDisabled(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: false}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
+ assert.Equal(t, 1, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureError(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: false,
+ cfgE: errors.New("configure error"),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ err := c.Init(&testCfg{`{"test":"something"}`})
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "configure error")
+ assert.Contains(t, err.Error(), "test")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureTwice(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
+ assert.Error(t, c.Init(&testCfg{`{"test":"something"}`}))
+}
+
+func TestContainer_ServeEmptyContainer(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Serve())
+ c.Stop()
+}
+
+func TestContainer_Serve(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
+
+ go func() {
+ assert.NoError(t, c.Serve())
+ }()
+
+ <-svc.waitChan()
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusServing, st)
+
+ c.Stop()
+
+ s, st = c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}
+
+func TestContainer_ServeError(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ serveE: errors.New("serve error"),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
+
+ err := c.Serve()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "serve error")
+ assert.Contains(t, err.Error(), "test")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}
+
+func TestContainer_ServeErrorMultiple(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ delay: time.Millisecond * 10,
+ waitForServe: make(chan interface{}),
+ serveE: errors.New("serve error"),
+ }
+
+ svc2 := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test2", svc2)
+ c.Register("test", svc)
+ assert.Equal(t, 2, len(hook.Entries))
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`}))
+
+ err := c.Serve()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "serve error")
+ assert.Contains(t, err.Error(), "test")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+
+ s, st = c.Get("test2")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}
diff --git a/service/http/config.go b/service/http/config.go
new file mode 100644
index 00000000..2bc5f845
--- /dev/null
+++ b/service/http/config.go
@@ -0,0 +1,42 @@
+package http
+
+import (
+ "github.com/spiral/roadrunner"
+ "errors"
+ "strings"
+)
+
+// Configures RoadRunner HTTP server.
+type Config struct {
+ // Enable enables http svc.
+ Enable bool
+
+ // Address and port to handle as http server.
+ Address string
+
+ // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited.
+ MaxRequest int64
+
+ // Uploads configures uploads configuration.
+ Uploads *UploadsConfig
+
+ // Workers configures roadrunner server and worker pool.
+ Workers *roadrunner.ServerConfig
+}
+
+// Valid validates the configuration.
+func (cfg *Config) Valid() error {
+ if cfg.Uploads == nil {
+ return errors.New("mailformed uploads config")
+ }
+
+ if cfg.Workers == nil {
+ return errors.New("mailformed workers config")
+ }
+
+ if !strings.Contains(cfg.Address, ":") {
+ return errors.New("mailformed server address")
+ }
+
+ return nil
+}
diff --git a/service/http/config_test.go b/service/http/config_test.go
new file mode 100644
index 00000000..b806b79b
--- /dev/null
+++ b/service/http/config_test.go
@@ -0,0 +1,88 @@
+package http
+
+import (
+ "testing"
+ "os"
+ "github.com/stretchr/testify/assert"
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+func Test_Config_Valid(t *testing.T) {
+ cfg := &Config{
+ Enable: true,
+ Address: ":8080",
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.NoError(t, cfg.Valid())
+}
+
+func Test_Config_NoUploads(t *testing.T) {
+ cfg := &Config{
+ Enable: true,
+ Address: ":8080",
+ MaxRequest: 1024,
+ Workers: &roadrunner.ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_NoWorkers(t *testing.T) {
+ cfg := &Config{
+ Enable: true,
+ Address: ":8080",
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_InvalidAddress(t *testing.T) {
+ cfg := &Config{
+ Enable: true,
+ Address: "",
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php php-src/tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
diff --git a/service/http/handler.go b/service/http/handler.go
new file mode 100644
index 00000000..a4cb6406
--- /dev/null
+++ b/service/http/handler.go
@@ -0,0 +1,121 @@
+package http
+
+import (
+ "net/http"
+ "strconv"
+ "github.com/spiral/roadrunner"
+ "github.com/pkg/errors"
+ "sync"
+)
+
+const (
+ // EventResponse thrown after the request been processed. See Event as payload.
+ EventResponse = iota + 500
+
+ // EventError thrown on any non job error provided by road runner server.
+ EventError
+)
+
+// Event represents singular http response event.
+type Event struct {
+ // Method of the request.
+ Method string
+
+ // Uri requested by the client.
+ Uri string
+
+ // Status is response status.
+ Status int
+
+ // Associated error, if any.
+ Error error
+}
+
+// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Handler struct {
+ cfg *Config
+ rr *roadrunner.Server
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
+}
+
+// AddListener attaches pool event watcher.
+func (h *Handler) Listen(l func(event int, ctx interface{})) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ h.lsn = l
+}
+
+// middleware serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ // validating request size
+ if h.cfg.MaxRequest != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ h.handleError(w, r, err)
+ return
+ } else if size > h.cfg.MaxRequest*1024*1024 {
+ h.handleError(w, r, errors.New("request body max size is exceeded"))
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, h.cfg.Uploads)
+ if err != nil {
+ h.handleError(w, r, err)
+ return
+ }
+
+ if err = req.Open(); err != nil {
+ h.handleError(w, r, err)
+ return
+ }
+ defer req.Close()
+
+ p, err := req.Payload()
+ if err != nil {
+ h.handleError(w, r, err)
+ return
+ }
+
+ rsp, err := h.rr.Exec(p)
+ if err != nil {
+ h.handleError(w, r, err)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ h.handleError(w, r, err)
+ return
+ }
+
+ h.handleResponse(req, resp)
+ resp.Write(w)
+}
+
+// handleError sends error.
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) {
+ h.throw(EventError, &Event{Method: r.Method, Uri: uri(r), Status: 500, Error: err})
+
+ w.WriteHeader(500)
+ w.Write([]byte(err.Error()))
+}
+
+// handleResponse triggers response event.
+func (h *Handler) handleResponse(req *Request, resp *Response) {
+ h.throw(EventResponse, &Event{Method: req.Method, Uri: req.Uri, Status: resp.Status})
+}
+
+// throw invokes event srv if any.
+func (h *Handler) throw(event int, ctx interface{}) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ if h.lsn != nil {
+ h.lsn(event, ctx)
+ }
+}
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
new file mode 100644
index 00000000..d599b1d8
--- /dev/null
+++ b/service/http/handler_test.go
@@ -0,0 +1,821 @@
+package http
+
+import (
+ "net/http"
+ "io/ioutil"
+ "github.com/spiral/roadrunner"
+ "testing"
+ "os"
+ "github.com/stretchr/testify/assert"
+ "net/url"
+ "strings"
+ "context"
+ "bytes"
+ "mime/multipart"
+ "time"
+ "runtime"
+)
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ return string(b), r, err
+}
+
+func TestServer_Echo(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := get("http://localhost:8077/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestServer_Headers(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php header pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8078", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil)
+ assert.NoError(t, err)
+
+ req.Header.Add("input", "sample")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "world", r.Header.Get("Header"))
+ assert.Equal(t, "SAMPLE", string(b))
+}
+
+func TestServer_Cookies(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php cookie pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8079", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("GET", "http://localhost:8079", nil)
+ assert.NoError(t, err)
+
+ req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"})
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "INPUT-VALUE", string(b))
+
+ for _, c := range r.Cookies() {
+ assert.Equal(t, "output", c.Name)
+ assert.Equal(t, "cookie-output", c.Value)
+ }
+}
+
+func TestServer_JsonPayload_POST(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8090", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest(
+ "POST",
+ "http://localhost"+hs.Addr,
+ bytes.NewBufferString(`{"key":"value"}`),
+ )
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestServer_JsonPayload_PUT(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8081", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestServer_JsonPayload_PATCH(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8082", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestServer_FormData_POST(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8083", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_FormData_PUT(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8084", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_FormData_PATCH(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8085", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_Multipart_POST(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8019", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ w.WriteField("key", "value")
+
+ w.WriteField("key", "value")
+ w.WriteField("name[]", "name1")
+ w.WriteField("name[]", "name2")
+ w.WriteField("name[]", "name3")
+ w.WriteField("arr[x][y][z]", "y")
+ w.WriteField("arr[x][y][e]", "f")
+ w.WriteField("arr[c]p", "l")
+ w.WriteField("arr[c]z", "")
+
+ w.Close()
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_Multipart_PUT(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8020", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ w.WriteField("key", "value")
+
+ w.WriteField("key", "value")
+ w.WriteField("name[]", "name1")
+ w.WriteField("name[]", "name2")
+ w.WriteField("name[]", "name3")
+ w.WriteField("arr[x][y][z]", "y")
+ w.WriteField("arr[x][y][e]", "f")
+ w.WriteField("arr[c]p", "l")
+ w.WriteField("arr[c]z", "")
+
+ w.Close()
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_Multipart_PATCH(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ w.WriteField("key", "value")
+
+ w.WriteField("key", "value")
+ w.WriteField("name[]", "name1")
+ w.WriteField("name[]", "name2")
+ w.WriteField("name[]", "name3")
+ w.WriteField("arr[x][y][z]", "y")
+ w.WriteField("arr[x][y][e]", "f")
+ w.WriteField("arr[c]p", "l")
+ w.WriteField("arr[c]z", "")
+
+ w.Close()
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestServer_Error(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php error pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ _, r, err := get("http://localhost:8077/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestServer_Error2(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php error2 pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ _, r, err := get("http://localhost:8077/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestServer_Error3(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ b2 := &bytes.Buffer{}
+ for i := 0; i < 1024*1024; i++ {
+ b2.Write([]byte(" "))
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func BenchmarkHandler_Listen_Echo(b *testing.B) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ st.rr.Start()
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ bb := "WORLD"
+ for n := 0; n < b.N; n++ {
+ r, err := http.Get("http://localhost:8077/?hello=world")
+ if err != nil {
+ b.Fail()
+ }
+ defer r.Body.Close()
+
+ br, _ := ioutil.ReadAll(r.Body)
+ if string(br) != bb {
+ b.Fail()
+ }
+ }
+}
diff --git a/service/http/parse.go b/service/http/parse.go
new file mode 100644
index 00000000..1f90930a
--- /dev/null
+++ b/service/http/parse.go
@@ -0,0 +1,147 @@
+package http
+
+import (
+ "net/http"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) dataTree {
+ data := make(dataTree)
+ if r.PostForm != nil {
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+ }
+
+ if r.MultipartForm != nil {
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+ }
+
+ return data
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
+func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// fetchIndexes parses input name and splits it into separate indexes list.
+func fetchIndexes(s string) []string {
+ var (
+ pos int
+ ch string
+ keys = make([]string, 1)
+ )
+
+ for _, c := range s {
+ ch = string(c)
+ switch ch {
+ case " ":
+ // ignore all spaces
+ continue
+ case "[":
+ pos = 1
+ continue
+ case "]":
+ if pos == 1 {
+ keys = append(keys, "")
+ }
+ pos = 2
+ default:
+ if pos == 1 || pos == 2 {
+ keys = append(keys, "")
+ }
+
+ keys[len(keys)-1] += ch
+ pos = 0
+ }
+ }
+
+ return keys
+}
diff --git a/service/http/parse_test.go b/service/http/parse_test.go
new file mode 100644
index 00000000..34c0dc0d
--- /dev/null
+++ b/service/http/parse_test.go
@@ -0,0 +1,54 @@
+package http
+
+import "testing"
+
+var samples = []struct {
+ in string
+ out []string
+}{
+ {"key", []string{"key"}},
+ {"key[subkey]", []string{"key", "subkey"}},
+ {"key[subkey]value", []string{"key", "subkey", "value"}},
+ {"key[subkey][value]", []string{"key", "subkey", "value"}},
+ {"key[subkey][value][]", []string{"key", "subkey", "value", ""}},
+ {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}},
+ {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}},
+}
+
+func Test_FetchIndexes(t *testing.T) {
+ for _, tt := range samples {
+ t.Run(tt.in, func(t *testing.T) {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ t.Errorf("got %q, want %q", r, tt.out)
+ }
+ })
+ }
+}
+
+func BenchmarkConfig_FetchIndexes(b *testing.B) {
+ for _, tt := range samples {
+ for n := 0; n < b.N; n++ {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ b.Fail()
+ }
+ }
+ }
+}
+
+func same(in, out []string) bool {
+ if len(in) != len(out) {
+ return false
+ }
+
+ for i, v := range in {
+ if v != out[i] {
+ return false
+ }
+ }
+
+ return true
+}
+
+// bench
diff --git a/service/http/request.go b/service/http/request.go
new file mode 100644
index 00000000..e02d3cdb
--- /dev/null
+++ b/service/http/request.go
@@ -0,0 +1,155 @@
+package http
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "io/ioutil"
+ "net/http"
+ "strings"
+)
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+ contentNone = iota + 900
+ contentStream
+ contentMultipart
+ contentFormData
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // Uri contains full request Uri with scheme and query.
+ Uri string `json:"uri"`
+
+ // Headers contains list of request headers.
+ Headers http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
+ req = &Request{
+ Protocol: r.Proto,
+ Method: r.Method,
+ Uri: uri(r),
+ Headers: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ }
+
+ for _, c := range r.Cookies() {
+ req.Cookies[c.Name] = c.Value
+ }
+
+ switch req.contentType() {
+ case contentNone:
+ return req, nil
+
+ case contentStream:
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+
+ case contentMultipart:
+ if err = r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ req.Uploads = parseUploads(r, cfg)
+ fallthrough
+ case contentFormData:
+ if err = r.ParseForm(); err != nil {
+ return nil, err
+ }
+
+ req.body = parseData(r)
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open() error {
+ if r.Uploads == nil {
+ return nil
+ }
+
+ return r.Uploads.Open()
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close() {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear()
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. Default encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (p *roadrunner.Payload, err error) {
+ p = &roadrunner.Payload{}
+
+ if p.Context, err = json.Marshal(r); err != nil {
+ return nil, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return nil, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// contentType returns the payload content type.
+func (r *Request) contentType() int {
+ if r.Method != "POST" && r.Method != "PUT" && r.Method != "PATCH" {
+ return contentNone
+ }
+
+ ct := r.Headers.Get("content-type")
+ if ct == "application/x-www-form-urlencoded" {
+ return contentFormData
+ }
+
+ if strings.Contains(ct, "multipart/form-data") {
+ return contentMultipart
+ }
+
+ return contentStream
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/service/http/response.go b/service/http/response.go
new file mode 100644
index 00000000..69bcf3e1
--- /dev/null
+++ b/service/http/response.go
@@ -0,0 +1,53 @@
+package http
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner"
+ "net/http"
+ "io"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Headers contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated body payload.
+ body interface{}
+}
+
+// NewResponse creates new response based on given roadrunner payload.
+func NewResponse(p *roadrunner.Payload) (*Response, error) {
+ r := &Response{body: p.Body}
+ if err := json.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ for k, v := range r.Headers {
+ for _, h := range v {
+ w.Header().Add(k, h)
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.body.([]byte); ok {
+ w.Write(data)
+ }
+
+ if rc, ok := r.body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/service/http/response_test.go b/service/http/response_test.go
new file mode 100644
index 00000000..e45f5349
--- /dev/null
+++ b/service/http/response_test.go
@@ -0,0 +1,92 @@
+package http
+
+import (
+ "net/http"
+ "bytes"
+ "testing"
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+ "errors"
+)
+
+type testWriter struct {
+ h http.Header
+ buf bytes.Buffer
+ wroteHeader bool
+ code int
+ err error
+}
+
+func (tw *testWriter) Header() http.Header { return tw.h }
+
+func (tw *testWriter) Write(p []byte) (int, error) {
+ if !tw.wroteHeader {
+ tw.WriteHeader(http.StatusOK)
+ }
+
+ n, e := tw.buf.Write(p)
+ if e == nil {
+ e = tw.err
+ }
+
+ return n, e
+}
+
+func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code }
+
+func TestNewResponse_Error(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)})
+ assert.Error(t, err)
+ assert.Nil(t, r)
+}
+
+func TestNewResponse_Write(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ Body: []byte(`sample body`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "sample body", w.buf.String())
+}
+
+func TestNewResponse_Stream(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "hello world", w.buf.String())
+}
+
+func TestNewResponse_StreamError(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")}
+ assert.Error(t, r.Write(w))
+}
diff --git a/service/http/rpc.go b/service/http/rpc.go
new file mode 100644
index 00000000..aebc5903
--- /dev/null
+++ b/service/http/rpc.go
@@ -0,0 +1,57 @@
+package http
+
+import (
+ "github.com/pkg/errors"
+)
+
+type rpcServer struct{ svc *Service }
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []Worker `json:"workers"`
+}
+
+// Worker provides information about specific worker.
+type Worker struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumJobs int64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, r *string) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ *r = "OK"
+ return rpc.svc.rr.Reset()
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
+ if rpc.svc.srv == nil {
+ return errors.New("http server is not running")
+ }
+
+ for _, w := range rpc.svc.rr.Workers() {
+ state := w.State()
+ r.Workers = append(r.Workers, Worker{
+ Pid: *w.Pid,
+ Status: state.String(),
+ NumJobs: state.NumExecs(),
+ Created: w.Created.UnixNano(),
+ })
+ }
+
+ return nil
+}
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
new file mode 100644
index 00000000..f1c5786c
--- /dev/null
+++ b/service/http/rpc_test.go
@@ -0,0 +1,115 @@
+package http
+
+import (
+ "testing"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "time"
+ "github.com/spiral/roadrunner/service/rpc"
+ "strconv"
+)
+
+func Test_RPC(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ res, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := ""
+ assert.NoError(t, cl.Call("http.Reset", true, &r))
+ assert.Equal(t, "OK", r)
+
+ res2, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
+ assert.NotEqual(t, res, res2)
+}
+
+func Test_Workers(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := &WorkerList{}
+ assert.NoError(t, cl.Call("http.Workers", true, &r))
+ assert.Len(t, r.Workers, 1)
+
+ assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid)
+} \ No newline at end of file
diff --git a/service/http/service.go b/service/http/service.go
new file mode 100644
index 00000000..8485cba6
--- /dev/null
+++ b/service/http/service.go
@@ -0,0 +1,128 @@
+package http
+
+import (
+ "net/http"
+ "github.com/spiral/roadrunner/service"
+ "context"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/rpc"
+ "sync"
+)
+
+// ID contains default svc name.
+const ID = "http"
+
+// must return true if request/response pair is handled withing the middleware.
+type middleware func(w http.ResponseWriter, r *http.Request) bool
+
+// Service manages rr, http servers.
+type Service struct {
+ cfg *Config
+ lsns []func(event int, ctx interface{})
+ mdws []middleware
+
+ mu sync.Mutex
+ rr *roadrunner.Server
+ srv *Handler
+ http *http.Server
+}
+
+func (s *Service) AddMiddleware(m middleware) {
+ s.mdws = append(s.mdws, m)
+}
+
+// AddListener attaches server event watcher.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Init(cfg service.Config, c service.Container) (bool, error) {
+ config := &Config{}
+
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+
+ // registering http RPC interface
+ if r, ok := c.Get(rpc.ID); ok >= service.StatusConfigured {
+ if h, ok := r.(*rpc.Service); ok {
+ h.Register(ID, &rpcServer{s})
+ }
+ }
+
+ return true, nil
+}
+
+// Serve serves the svc.
+func (s *Service) Serve() error {
+ s.mu.Lock()
+ rr := roadrunner.NewServer(s.cfg.Workers)
+
+ s.rr = rr
+ s.srv = &Handler{cfg: s.cfg, rr: s.rr}
+ s.http = &http.Server{Addr: s.cfg.Address}
+
+ s.rr.Listen(s.listener)
+ s.srv.Listen(s.listener)
+
+ if len(s.mdws) == 0 {
+ s.http.Handler = s.srv
+ } else {
+ s.http.Handler = s
+ }
+ s.mu.Unlock()
+
+ if err := rr.Start(); err != nil {
+ return err
+ }
+ defer s.rr.Stop()
+
+ return s.http.ListenAndServe()
+}
+
+// Stop stops the svc.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.http == nil {
+ return
+ }
+
+ s.http.Shutdown(context.Background())
+}
+
+// middleware handles connection using set of mdws and rr PSR-7 server.
+func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ for _, m := range s.mdws {
+ if m(w, r) {
+ return
+ }
+ }
+
+ s.srv.ServeHTTP(w, r)
+}
+
+func (s *Service) listener(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+
+ if event == roadrunner.EventServerFailure {
+ // attempting rr server restart
+ if err := s.rr.Start(); err != nil {
+ s.Stop()
+ }
+ }
+}
diff --git a/service/http/service_test.go b/service/http/service_test.go
new file mode 100644
index 00000000..ebcd0f5a
--- /dev/null
+++ b/service/http/service_test.go
@@ -0,0 +1,400 @@
+package http
+
+import (
+ "testing"
+ "github.com/spiral/roadrunner/service"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/sirupsen/logrus"
+ "encoding/json"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "time"
+ "net/http"
+ "io/ioutil"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/rpc"
+)
+
+type testCfg struct {
+ httpCfg string
+ rpcCfg string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.httpCfg}
+ }
+
+ if name == rpc.ID {
+ return &testCfg{target: cfg.rpcCfg}
+ }
+ return nil
+}
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ return json.Unmarshal([]byte(cfg.target), out)
+}
+
+func Test_Service_NoConfig(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{}`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusRegistered, st)
+}
+
+func Test_Service_Configure_Disable(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": false,
+ "address": ":8070",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusRegistered, st)
+}
+
+func Test_Service_Configure_Enable(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":8070",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+}
+
+func Test_Service_Echo(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ // should do nothing
+ s.Stop()
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+}
+
+func Test_Service_Middleware(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ s.(*Service).AddMiddleware(func(w http.ResponseWriter, r *http.Request) bool {
+ if r.URL.Path == "/halt" {
+ w.WriteHeader(500)
+ w.Write([]byte("halted"))
+ return true
+ }
+
+ return false
+ })
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ req, err = http.NewRequest("GET", "http://localhost:6029/halt", nil)
+ assert.NoError(t, err)
+
+ r, err = http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err = ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+ assert.Equal(t, "halted", string(b))
+}
+
+func Test_Service_Listener(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ stop := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventServerStart {
+ stop <- nil
+ }
+ })
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+
+ c.Stop()
+ assert.True(t, true)
+}
+
+func Test_Service_Error(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "---",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ assert.Error(t, c.Serve())
+}
+
+func Test_Service_Error2(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ assert.Error(t, c.Serve())
+}
+
+func Test_Service_Error3(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.Error(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers"
+ "command": "php ../../php-src/tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+}
+
+func Test_Service_Error4(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.Error(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": "----",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+}
+
+func tmpDir() string {
+ p := os.TempDir()
+ r, _ := json.Marshal(p)
+
+ return string(r)
+}
diff --git a/service/http/uploads.go b/service/http/uploads.go
new file mode 100644
index 00000000..f8334c30
--- /dev/null
+++ b/service/http/uploads.go
@@ -0,0 +1,140 @@
+package http
+
+import (
+ "encoding/json"
+ "os"
+ "sync"
+ "mime/multipart"
+ "io/ioutil"
+ "io"
+)
+
+const (
+ // There is no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // No file was uploaded.
+ UploadErrorNoFile = 4
+
+ // Missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // Failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // Forbid file extension.
+ UploadErrorExtension = 7
+)
+
+// tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg *UploadsConfig
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ return json.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open() error {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ f.Open(u.cfg)
+ }(f)
+ }
+
+ wg.Wait()
+ return nil
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear() {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ os.Remove(f.TempFilename)
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // ID contains filename specified by the client.
+ Name string `json:"name"`
+
+ // Mime contains mime-type provided by the client.
+ Mime string `json:"mime"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ Mime: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+func (f *FileUpload) Open(cfg *UploadsConfig) error {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+ defer file.Close()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer tmp.Close()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true
+ }
+
+ return false
+}
diff --git a/service/http/uploads_config.go b/service/http/uploads_config.go
new file mode 100644
index 00000000..715de69a
--- /dev/null
+++ b/service/http/uploads_config.go
@@ -0,0 +1,39 @@
+package http
+
+import (
+ "strings"
+ "path"
+ "os"
+)
+
+// UploadsConfig describes file location and controls access to them.
+type UploadsConfig struct {
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// TmpDir returns temporary directory.
+func (cfg *UploadsConfig) TmpDir() string {
+ if cfg.Dir != "" {
+ return cfg.Dir
+ }
+
+ return os.TempDir()
+}
+
+// Forbid must return true if file extension is not allowed for the upload.
+func (cfg *UploadsConfig) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/service/http/uploads_config_test.go b/service/http/uploads_config_test.go
new file mode 100644
index 00000000..7704a486
--- /dev/null
+++ b/service/http/uploads_config_test.go
@@ -0,0 +1,24 @@
+package http
+
+import (
+ "testing"
+ "github.com/stretchr/testify/assert"
+ "os"
+)
+
+func TestFsConfig_Forbids(t *testing.T) {
+ cfg := UploadsConfig{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestFsConfig_TmpFallback(t *testing.T) {
+ cfg := UploadsConfig{Dir: "test"}
+ assert.Equal(t, "test", cfg.TmpDir())
+
+ cfg = UploadsConfig{Dir: ""}
+ assert.Equal(t, os.TempDir(), cfg.TmpDir())
+}
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
new file mode 100644
index 00000000..af351067
--- /dev/null
+++ b/service/http/uploads_test.go
@@ -0,0 +1,311 @@
+package http
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+ "net/http"
+ "time"
+ "bytes"
+ "mime/multipart"
+ "io/ioutil"
+ "testing"
+ "os"
+ "context"
+ "io"
+ "encoding/json"
+ "crypto/md5"
+ "encoding/hex"
+)
+
+func TestServer_Upload_File(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer f.Close()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ io.Copy(fw, f)
+
+ w.Close()
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 0, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+func TestServer_Upload_NestedFile(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer f.Close()
+ fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ io.Copy(fw, f)
+
+ w.Close()
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 0, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
+}
+
+
+func TestServer_Upload_File_NoTmpDir(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: "-----",
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer f.Close()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ io.Copy(fw, f)
+
+ w.Close()
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 5, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+
+func TestServer_Upload_File_Forbids(t *testing.T) {
+ st := &Handler{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer f.Close()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ io.Copy(fw, f)
+
+ w.Close()
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 7, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+func Test_FileExists(t *testing.T) {
+ assert.True(t, exists("uploads_test.go"))
+ assert.False(t, exists("uploads_test."))
+}
+
+func mustOpen(f string) *os.File {
+ r, err := os.Open(f)
+ if err != nil {
+ panic(err)
+ }
+ return r
+}
+
+type fInfo struct {
+ Name string `json:"name"`
+ Size int64 `json:"size"`
+ Mime string `json:"mime"`
+ Error int `json:"error"`
+ MD5 string `json:"md5,omitempty"`
+}
+
+func fileString(f string, err int, mime string) string {
+ s, _ := os.Stat(f)
+
+ ff, _ := os.Open(f)
+ defer ff.Close()
+ h := md5.New()
+ io.Copy(h, ff)
+
+ v := &fInfo{
+ Name: s.Name(),
+ Size: s.Size(),
+ Error: err,
+ Mime: mime,
+ MD5: hex.EncodeToString(h.Sum(nil)),
+ }
+
+ if err != 0 {
+ v.MD5 = ""
+ v.Size = 0
+ }
+
+ r, _ := json.Marshal(v)
+ return string(r)
+
+}
diff --git a/service/rpc/config.go b/service/rpc/config.go
new file mode 100644
index 00000000..06d63d65
--- /dev/null
+++ b/service/rpc/config.go
@@ -0,0 +1,35 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+)
+
+type config struct {
+ // Indicates if RPC connection is enabled.
+ Enable bool
+
+ // AddListener string
+ Listen string
+}
+
+// listener creates new rpc socket listener.
+func (cfg *config) listener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+// dialer creates rpc socket dialer.
+func (cfg *config) dialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
new file mode 100644
index 00000000..a953e30e
--- /dev/null
+++ b/service/rpc/config_test.go
@@ -0,0 +1,109 @@
+package rpc
+
+import (
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+)
+
+func TestConfig_Listener(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "tcp", ln.Addr().Network())
+ assert.Equal(t, "[::]:18001", ln.Addr().String())
+}
+
+func TestConfig_ListenerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer ln.Close()
+
+ assert.Equal(t, "unix", ln.Addr().Network())
+ assert.Equal(t, "rpc.sock", ln.Addr().String())
+}
+
+func Test_Config_Error(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_ErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func TestConfig_Dialer(t *testing.T) {
+ cfg := &config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "tcp", conn.RemoteAddr().Network())
+ assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
+}
+
+func TestConfig_DialerUnix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "unix://rpc.sock"}
+
+ ln, err := cfg.listener()
+ defer ln.Close()
+
+ conn, err := cfg.dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer conn.Close()
+
+ assert.Equal(t, "unix", conn.RemoteAddr().Network())
+ assert.Equal(t, "rpc.sock", conn.RemoteAddr().String())
+}
+
+func Test_Config_DialerError(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ cfg := &config{Listen: "uni:unix.sock"}
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
+}
+
+func Test_Config_DialerErrorMethod(t *testing.T) {
+ cfg := &config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
diff --git a/service/rpc/service.go b/service/rpc/service.go
new file mode 100644
index 00000000..e1147754
--- /dev/null
+++ b/service/rpc/service.go
@@ -0,0 +1,122 @@
+package rpc
+
+import (
+ "errors"
+ "github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
+ "net/rpc"
+ "sync"
+)
+
+// ID contains default service name.
+const ID = "rpc"
+
+// Service is RPC service.
+type Service struct {
+ cfg *config
+ stop chan interface{}
+ rpc *rpc.Server
+
+ mu sync.Mutex
+ serving bool
+}
+
+// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) {
+ config := &config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ s.cfg = config
+ s.rpc = rpc.NewServer()
+
+ return true, nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ s.mu.Lock()
+ s.serving = true
+ s.stop = make(chan interface{})
+ s.mu.Unlock()
+
+ ln, err := s.cfg.listener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ go func() {
+ for {
+ select {
+ case <-s.stop:
+ break
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ <-s.stop
+
+ s.mu.Lock()
+ s.serving = false
+ s.mu.Unlock()
+
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.serving {
+ close(s.stop)
+ }
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, rcvr interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, rcvr)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.cfg == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.cfg.dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
new file mode 100644
index 00000000..ce85d52f
--- /dev/null
+++ b/service/rpc/service_test.go
@@ -0,0 +1,97 @@
+package rpc
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+type testService struct{}
+
+func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) service.Config { return nil }
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_ConfigError(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":false`}, nil)
+
+ assert.Error(t, err)
+ assert.False(t, ok)
+}
+
+func Test_Disabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":false}`}, nil)
+
+ assert.NoError(t, err)
+ assert.False(t, ok)
+}
+
+func Test_RegisterNotConfigured(t *testing.T) {
+ s := &Service{}
+ assert.Error(t, s.Register("test", &testService{}))
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+ assert.Error(t, s.Serve())
+}
+
+func Test_Enabled(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+}
+
+func Test_StopNonServing(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+
+ assert.NoError(t, err)
+ assert.True(t, ok)
+ s.Stop()
+}
+
+func Test_Serve_Errors(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ assert.Error(t, s.Serve())
+
+ client, err := s.Client()
+ assert.Nil(t, client)
+ assert.Error(t, err)
+}
+
+func Test_Serve_Client(t *testing.T) {
+ s := &Service{}
+ ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil)
+ assert.NoError(t, err)
+ assert.True(t, ok)
+
+ defer s.Stop()
+
+ assert.NoError(t, s.Register("test", &testService{}))
+
+ go func() { assert.NoError(t, s.Serve()) }()
+
+ time.Sleep(time.Millisecond)
+ client, err := s.Client()
+ assert.NotNil(t, client)
+ assert.NoError(t, err)
+ defer client.Close()
+
+ var resp string
+ assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
+ assert.Equal(t, "hello world", resp)
+}
diff --git a/service/service.go b/service/service.go
new file mode 100644
index 00000000..6ddcda41
--- /dev/null
+++ b/service/service.go
@@ -0,0 +1,61 @@
+package service
+
+import "sync"
+
+// svc provides high level functionality for road runner svc.
+type Service interface {
+ // Init must return configure service and return true if service hasStatus enabled. Must return error in case of
+ // misconfiguration. Services must not be used without proper configuration pushed first.
+ Init(cfg Config, c Container) (enabled bool, err error)
+
+ // Serve serves.
+ Serve() error
+
+ // Stop stops the service.
+ Stop()
+}
+
+const (
+ // StatusUndefined when service bus can not find the service.
+ StatusUndefined = iota
+
+ // StatusRegistered hasStatus setStatus when service has been registered in container.
+ StatusRegistered
+
+ // StatusConfigured hasStatus setStatus when service has been properly configured.
+ StatusConfigured
+
+ // StatusServing hasStatus setStatus when service hasStatus currently done.
+ StatusServing
+
+ // StatusStopped hasStatus setStatus when service hasStatus stopped.
+ StatusStopped
+)
+
+// entry creates association between service instance and given name.
+type entry struct {
+ name string
+ svc Service
+ mu sync.Mutex
+ status int
+}
+
+// status returns service status
+func (e *entry) getStatus() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ return e.status
+}
+
+// setStarted indicates that service hasStatus status.
+func (e *entry) setStatus(status int) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.status = status
+}
+
+// hasStatus checks if entry in specific status
+func (e *entry) hasStatus(status int) bool {
+ return e.getStatus() == status
+}
diff --git a/service/static/config.go b/service/static/config.go
new file mode 100644
index 00000000..2a1f6c13
--- /dev/null
+++ b/service/static/config.go
@@ -0,0 +1,52 @@
+package static
+
+import (
+ "strings"
+ "path"
+ "os"
+ "github.com/pkg/errors"
+)
+
+// Config describes file location and controls access to them.
+type Config struct {
+ // Enables StaticFile service.
+ Enable bool
+
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// Forbid must return true if file extension is not allowed for the upload.
+func (cfg *Config) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Valid validates existence of directory.
+func (cfg *Config) Valid() error {
+ st, err := os.Stat(cfg.Dir)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return errors.New("root directory does not exists")
+ }
+
+ return err
+ }
+
+ if !st.IsDir() {
+ return errors.New("invalid root directory")
+ }
+
+ return nil
+}
diff --git a/service/static/config_test.go b/service/static/config_test.go
new file mode 100644
index 00000000..ce31348a
--- /dev/null
+++ b/service/static/config_test.go
@@ -0,0 +1,21 @@
+package static
+
+import (
+ "testing"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestConfig_Forbids(t *testing.T) {
+ cfg := Config{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestConfig_Valid(t *testing.T) {
+ assert.NoError(t, (&Config{Dir: "./"}).Valid())
+ assert.Error(t, (&Config{Dir: "./config.go"}).Valid())
+ assert.Error(t, (&Config{Dir: "./dir/"}).Valid())
+}
diff --git a/service/static/service.go b/service/static/service.go
new file mode 100644
index 00000000..43891aa8
--- /dev/null
+++ b/service/static/service.go
@@ -0,0 +1,89 @@
+package static
+
+import (
+ "net/http"
+ "path"
+ "strings"
+ rrttp "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner/service"
+)
+
+// ID contains default service name.
+const ID = "static"
+
+// Service serves static files. Potentially convert into middleware?
+type Service struct {
+ // server configuration (location, forbidden files and etc)
+ cfg *Config
+
+ // root is initiated http directory
+ root http.Dir
+}
+
+// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Init(cfg service.Config, c service.Container) (enabled bool, err error) {
+ config := &Config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enable {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+ s.root = http.Dir(s.cfg.Dir)
+
+ // registering as middleware
+ if h, ok := c.Get(rrttp.ID); ok >= service.StatusConfigured {
+ if h, ok := h.(*rrttp.Service); ok {
+ h.AddMiddleware(s.middleware)
+ }
+ }
+
+ return true, nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() error { return nil }
+
+// Stop stops the service.
+func (s *Service) Stop() {}
+
+// middleware must return true if request/response pair is handled withing the middleware.
+func (s *Service) middleware(w http.ResponseWriter, r *http.Request) bool {
+ fPath := r.URL.Path
+
+ if !strings.HasPrefix(fPath, "/") {
+ fPath = "/" + fPath
+ }
+ fPath = path.Clean(fPath)
+
+ if s.cfg.Forbids(fPath) {
+ return false
+ }
+
+ f, err := s.root.Open(fPath)
+ if err != nil {
+ return false
+ }
+ defer f.Close()
+
+ d, err := f.Stat()
+ if err != nil {
+ return false
+ }
+
+ // do not serve directories
+ if d.IsDir() {
+ return false
+ }
+
+ http.ServeContent(w, r, d.Name(), d.ModTime(), f)
+ return true
+}
diff --git a/service/static/service_test.go b/service/static/service_test.go
new file mode 100644
index 00000000..f0cb1bea
--- /dev/null
+++ b/service/static/service_test.go
@@ -0,0 +1,349 @@
+package static
+
+import (
+ "testing"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "time"
+ rrhttp "github.com/spiral/roadrunner/service/http"
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "bytes"
+ "io"
+)
+
+type testCfg struct {
+ httpCfg string
+ static string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == rrhttp.ID {
+ return &testCfg{target: cfg.httpCfg}
+ }
+
+ if name == ID {
+ return &testCfg{target: cfg.static}
+ }
+ return nil
+}
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ return json.Unmarshal([]byte(cfg.target), out)
+}
+
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ return string(b), r, err
+}
+
+func Test_Files(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/sample.txt")
+ assert.Equal(t, "sample", b)
+}
+
+
+func Test_Files_Disable(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":false, "dir":"../../php-src/tests", "forbid":[".php"]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/client.php?hello=world")
+ assert.Equal(t, "WORLD", b)
+}
+
+func Test_Files_Error(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.Error(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"dir/invalid", "forbid":[".php"]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+}
+
+func Test_Files_Error2(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.Error(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"dir/invalid", "forbid":[".php"]`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+}
+
+func Test_Files_Forbid(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/client.php?hello=world")
+ assert.Equal(t, "WORLD", b)
+}
+
+func Test_Files_NotFound(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/client.XXX?hello=world")
+ assert.Equal(t, "WORLD", b)
+}
+
+
+func Test_Files_Dir(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[".php"]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/http?hello=world")
+ assert.Equal(t, "WORLD", b)
+}
+
+func Test_Files_NotForbid(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ static: `{"enable":true, "dir":"../../php-src/tests", "forbid":[]}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ b, _, _ := get("http://localhost:6029/client.php")
+ assert.Equal(t, all("../../php-src/tests/client.php"), b)
+}
+
+func tmpDir() string {
+ p := os.TempDir()
+ r, _ := json.Marshal(p)
+
+ return string(r)
+}
+
+func all(fn string) string {
+ f, _ := os.Open(fn)
+ defer f.Close()
+
+ b := &bytes.Buffer{}
+ io.Copy(b, f)
+
+ return b.String()
+}
diff --git a/socket_factory.go b/socket_factory.go
index acdc91b1..43059e8a 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -25,7 +25,7 @@ type SocketFactory struct {
relays map[int]chan *goridge.SocketRelay
}
-// NewSocketFactory returns SocketFactory attached to a given socket listener.
+// NewSocketFactory returns SocketFactory attached to a given socket lsn.
// tout specifies for how long factory should serve for incoming relay connection
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
f := &SocketFactory{
@@ -54,7 +54,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
go func(w *Worker) { w.Kill() }(w)
if wErr := w.Wait(); wErr != nil {
- err = errors.Wrap(wErr, err.Error())
+ if _, ok := wErr.(*exec.ExitError); ok {
+ err = errors.Wrap(wErr, err.Error())
+ } else {
+ err = wErr
+ }
}
return nil, errors.Wrap(err, "unable to connect to worker")
@@ -66,6 +70,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
return w, nil
}
+// Close socket factory and underlying socket connection.
+func (f *SocketFactory) Close() error {
+ return f.ls.Close()
+}
+
// listens for incoming socket connections
func (f *SocketFactory) listen() {
for {
diff --git a/socket_factory_test.go b/socket_factory_test.go
index d0d643af..214a4851 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -32,6 +32,31 @@ func Test_Tcp_Start(t *testing.T) {
w.Stop()
}
+func Test_Tcp_StartCloseFactory(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
+
+ f := NewSocketFactory(ls, time.Minute)
+ defer f.Close()
+
+ w, err := f.SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ w.Stop()
+}
+
func Test_Tcp_StartError(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
@@ -42,7 +67,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
cmd.Start()
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
@@ -60,7 +85,7 @@ func Test_Tcp_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/failboot.php")
+ cmd := exec.Command("php", "php-src/tests/failboot.php")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -78,7 +103,7 @@ func Test_Tcp_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0")
+ cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "tcp", "200", "0")
w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -96,7 +121,7 @@ func Test_Tcp_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/invalid.php")
+ cmd := exec.Command("php", "php-src/tests/invalid.php")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.Error(t, err)
@@ -113,7 +138,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "php-src/tests/client.php", "broken", "tcp")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -140,7 +165,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -159,7 +184,7 @@ func Test_Tcp_Echo(t *testing.T) {
}
func Test_Unix_Start(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -170,7 +195,7 @@ func Test_Unix_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -184,7 +209,7 @@ func Test_Unix_Start(t *testing.T) {
}
func Test_Unix_Failboot(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -195,7 +220,7 @@ func Test_Unix_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/failboot.php")
+ cmd := exec.Command("php", "php-src/tests/failboot.php")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -204,7 +229,7 @@ func Test_Unix_Failboot(t *testing.T) {
}
func Test_Unix_Timeout(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -215,7 +240,7 @@ func Test_Unix_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -224,7 +249,7 @@ func Test_Unix_Timeout(t *testing.T) {
}
func Test_Unix_Invalid(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -235,7 +260,7 @@ func Test_Unix_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/invalid.php")
+ cmd := exec.Command("php", "php-src/tests/invalid.php")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.Error(t, err)
@@ -243,7 +268,7 @@ func Test_Unix_Invalid(t *testing.T) {
}
func Test_Unix_Broken(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -254,7 +279,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "php-src/tests/client.php", "broken", "unix")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -272,7 +297,7 @@ func Test_Unix_Broken(t *testing.T) {
}
func Test_Unix_Echo(t *testing.T) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
t.Skip("not supported on " + runtime.GOOS)
}
@@ -283,7 +308,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix")
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -311,7 +336,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
f := NewSocketFactory(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
w, _ := f.SpawnWorker(cmd)
go func() {
@@ -332,7 +357,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -348,7 +373,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
b.Skip("not supported on " + runtime.GOOS)
}
@@ -361,7 +386,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
f := NewSocketFactory(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix")
w, _ := f.SpawnWorker(cmd)
go func() {
@@ -375,7 +400,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
}
func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
- if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ if runtime.GOOS == "windows" {
b.Skip("not supported on " + runtime.GOOS)
}
@@ -386,7 +411,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "unix")
w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
diff --git a/state.go b/state.go
index d1068ab3..4d8b1eaa 100644
--- a/state.go
+++ b/state.go
@@ -2,9 +2,7 @@ package roadrunner
import (
"fmt"
- "sync"
"sync/atomic"
- "time"
)
// State represents worker status and updated time.
@@ -14,46 +12,53 @@ type State interface {
// Value returns state value
Value() int64
- // NumExecs shows how many times worker was invoked
- NumExecs() uint64
-
- // Updated indicates a moment updated last state change
- Updated() time.Time
+ // NumJobs shows how many times worker was invoked
+ NumExecs() int64
}
const (
// StateInactive - no associated process
StateInactive int64 = iota
+
// StateReady - ready for job.
StateReady
+
// StateWorking - working on given payload.
StateWorking
- // StateStopped - process has been terminated
+
+ // StateStreaming - indicates that worker is streaming the data at the moment.
+ StateStreaming
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateStopped - process has been terminated.
StateStopped
- // StateErrored - error state (can't be used)
+
+ // StateErrored - error state (can't be used).
StateErrored
)
type state struct {
- mu sync.RWMutex
value int64
- numExecs uint64
- updated time.Time
+ numExecs int64
}
func newState(value int64) *state {
- return &state{value: value, updated: time.Now()}
+ return &state{value: value}
}
// String returns current state as string.
func (s *state) String() string {
- switch s.value {
+ switch s.Value() {
case StateInactive:
return "inactive"
case StateReady:
return "ready"
case StateWorking:
return "working"
+ case StateStreaming:
+ return "streaming"
case StateStopped:
return "stopped"
case StateErrored:
@@ -63,12 +68,14 @@ func (s *state) String() string {
return "undefined"
}
+// NumExecs returns number of registered worker execs.
+func (s *state) NumExecs() int64 {
+ return atomic.LoadInt64(&s.numExecs)
+}
+
// Value state returns state value
func (s *state) Value() int64 {
- s.mu.RLock()
- defer s.mu.RUnlock()
-
- return s.value
+ return atomic.LoadInt64(&s.value)
}
// IsActive returns true if worker not Inactive or Stopped
@@ -77,28 +84,12 @@ func (s *state) IsActive() bool {
return state == StateWorking || state == StateReady
}
-// Updated indicates a moment updated last state change
-func (s *state) Updated() time.Time {
- s.mu.RLock()
- defer s.mu.RUnlock()
-
- return s.updated
-}
-
-func (s *state) NumExecs() uint64 {
- return atomic.LoadUint64(&s.numExecs)
-}
-
// change state value (status)
func (s *state) set(value int64) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- s.value = value
- s.updated = time.Now()
+ atomic.StoreInt64(&s.value, value)
}
// register new execution atomically
func (s *state) registerExec() {
- atomic.AddUint64(&s.numExecs, 1)
+ atomic.AddInt64(&s.numExecs, 1)
}
diff --git a/state_test.go b/state_test.go
index be63230e..c13c5a88 100644
--- a/state_test.go
+++ b/state_test.go
@@ -9,7 +9,6 @@ func Test_NewState(t *testing.T) {
st := newState(StateErrored)
assert.Equal(t, "errored", st.String())
- assert.NotEqual(t, 0, st.Updated().Unix())
assert.Equal(t, "inactive", newState(StateInactive).String())
assert.Equal(t, "ready", newState(StateReady).String())
diff --git a/static_pool.go b/static_pool.go
index c4895bf0..9449ea0c 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,6 +6,7 @@ import (
"os/exec"
"sync"
"time"
+ "sync/atomic"
)
const (
@@ -21,9 +22,6 @@ type StaticPool struct {
// worker command creator
cmd func() *exec.Cmd
- // observer is optional callback to handle worker create/destruct/error events.
- observer func(event int, w *Worker, ctx interface{})
-
// creates and connects to workers
factory Factory
@@ -33,17 +31,27 @@ type StaticPool struct {
// workers circular allocation buffer
free chan *Worker
+ // number of workers expected to be dead in a buffer.
+ numDead int64
+
// protects state of worker list, does not affect allocation
muw sync.RWMutex
// all registered workers
workers []*Worker
+
+ // pool is being destroying
+ inDestroy int32
+
+ // lsn is optional callback to handle worker create/destruct/error events.
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config error")
+ return nil, errors.Wrap(err, "config")
}
p := &StaticPool{
@@ -55,10 +63,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
}
// constant number of workers simplify logic
- for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ for i := int64(0); i < p.cfg.NumWorkers; i++ {
// to test if worker ready
w, err := p.createWorker()
-
if err != nil {
p.Destroy()
return nil, err
@@ -70,9 +77,12 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Report attaches pool event watcher.
-func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) {
- p.observer = o
+// AddListener attaches pool event watcher.
+func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
+ p.mul.Lock()
+ defer p.mul.Unlock()
+
+ p.lsn = l
}
// Config returns associated pool configuration. Immutable.
@@ -107,40 +117,38 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
if err != nil {
// soft job errors are allowed
if _, jobError := err.(JobError); jobError {
- p.free <- w
+ p.release(w)
return nil, err
}
- go p.replaceWorker(w, err)
+ go p.destroyWorker(w, err)
return nil, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.replaceWorker(w, err)
+ go p.destroyWorker(w, err)
return p.Exec(rqs)
}
- if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
- go p.replaceWorker(w, p.cfg.MaxExecutions)
- } else {
- p.free <- w
- }
-
+ p.release(w)
return rsp, nil
}
// Destroy all underlying workers (but let them to complete the task).
func (p *StaticPool) Destroy() {
+ atomic.AddInt32(&p.inDestroy, 1)
+ defer atomic.AddInt32(&p.inDestroy, -1)
+
p.tasks.Wait()
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
+ go w.Stop()
go func(w *Worker) {
defer wg.Done()
-
- p.destroyWorker(w)
+ p.destroyWorker(w, nil)
}(w)
}
@@ -149,93 +157,134 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
+ for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
+ // this loop is required to skip issues with dead workers still being in a ring.
+ select {
+ case w = <-p.free:
+ if w.State().Value() != StateReady {
+ atomic.AddInt64(&p.numDead, ^int64(0))
+ continue
+ }
+
+ return w, nil
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w = <-p.free:
+ timeout.Stop()
+
+ if w.State().Value() != StateReady {
+ atomic.AddInt64(&p.numDead, ^int64(0))
+ continue
+ }
+ return w, nil
+ }
}
+
+ return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
}
-// replaceWorker replaces dead or expired worker with new instance.
-func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
- go p.destroyWorker(w)
+// release releases or replaces the worker.
+func (p *StaticPool) release(w *Worker) {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ go p.destroyWorker(w, p.cfg.MaxJobs)
+ return
+ }
- if nw, err := p.createWorker(); err != nil {
- p.throw(EventError, w, err)
+ p.free <- w
+}
- if len(p.Workers()) == 0 {
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- p.throw(EventError, nil, fmt.Errorf("all workers dead"))
- }
- } else {
- p.free <- nw
+// creates new worker using associated factory. automatically
+// adds worker to the worker list (background)
+func (p *StaticPool) createWorker() (*Worker, error) {
+ w, err := p.factory.SpawnWorker(p.cmd())
+ if err != nil {
+ return nil, err
}
-}
-// destroyWorker destroys workers and removes it from the pool.
-func (p *StaticPool) destroyWorker(w *Worker) {
- p.throw(EventDestruct, w, nil)
+ p.throw(EventWorkerConstruct, w)
- // detaching
p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = p.workers[:i+1]
- break
- }
- }
+ p.workers = append(p.workers, w)
p.muw.Unlock()
+ go p.watchWorker(w)
+ return w, nil
+}
+
+// destroyWorker destroys workers and removes it from the pool.
+func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
go w.Stop()
select {
case <-w.waitDone:
// worker is dead
+ p.throw(EventWorkerDestruct, w)
+
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process
+ // failed to stop process in given time
if err := w.Kill(); err != nil {
- p.throw(EventError, w, err)
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
}
+
+ p.throw(EventWorkerKill, w)
}
}
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *StaticPool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
- if err != nil {
- return nil, err
+// watchWorker watches worker state and replaces it if worker fails.
+func (p *StaticPool) watchWorker(w *Worker) {
+ err := w.Wait()
+ p.throw(EventWorkerDead, w)
+
+ // detaching
+ p.muw.Lock()
+ for i, wc := range p.workers {
+ if wc == w {
+ p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ break
+ }
}
+ p.muw.Unlock()
- p.throw(EventCreated, w, nil)
+ // registering a dead worker
+ atomic.AddInt64(&p.numDead, 1)
- go func(w *Worker) {
- if err := w.Wait(); err != nil {
- p.throw(EventError, w, err)
- }
- }(w)
+ // worker have died unexpectedly, pool should attempt to replace it with alive version safely
+ if err != nil {
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
- p.muw.Lock()
- defer p.muw.Unlock()
+ if !p.destroying() {
+ nw, err := p.createWorker()
+ if err == nil {
+ p.free <- nw
+ return
+ }
- p.workers = append(p.workers, w)
+ // possible situation when major error causes all PHP scripts to die (for example dead DB)
+ if len(p.Workers()) == 0 {
+ p.throw(EventPoolError, err)
+ } else {
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
+ }
+}
- return w, nil
+func (p *StaticPool) destroying() bool {
+ return atomic.LoadInt32(&p.inDestroy) != 0
}
// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) {
- if p.observer != nil {
- p.observer(event, w, ctx)
+func (p *StaticPool) throw(event int, ctx interface{}) {
+ p.mul.Lock()
+ defer p.mul.Unlock()
+
+ if p.lsn != nil {
+ p.lsn(event, ctx)
}
}
diff --git a/static_pool_test.go b/static_pool_test.go
index b4069b98..231bc71b 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -2,24 +2,24 @@ package roadrunner
import (
"github.com/stretchr/testify/assert"
- "log"
"os/exec"
"runtime"
"strconv"
- "sync"
"testing"
"time"
+ "sync"
+ "log"
)
var cfg = Config{
- NumWorkers: uint64(runtime.NumCPU()),
+ NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
}
func Test_NewPool(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -33,7 +33,7 @@ func Test_NewPool(t *testing.T) {
func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/invalid.php") },
NewPipeFactory(),
cfg,
)
@@ -44,7 +44,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
func Test_ConfigError(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
Config{
AllocateTimeout: time.Second,
@@ -58,7 +58,7 @@ func Test_ConfigError(t *testing.T) {
func Test_StaticPool_Echo(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -79,7 +79,7 @@ func Test_StaticPool_Echo(t *testing.T) {
func Test_StaticPool_Echo_NilContext(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -100,7 +100,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
func Test_StaticPool_Echo_Context(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "head", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -121,7 +121,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
func Test_StaticPool_JobError(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "error", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -141,7 +141,7 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "broken", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- p.Report(func(e int, w *Worker, ctx interface{}) {
+ p.Listen(func(e int, ctx interface{}) {
if err, ok := ctx.(error); ok {
assert.Contains(t, err.Error(), "undefined_function()")
}
@@ -162,9 +162,46 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Nil(t, res)
}
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
+
+ destructed := make(chan interface{})
+ p.Listen(func(e int, ctx interface{}) {
+ if e == EventWorkerConstruct {
+ destructed <- nil
+ }
+ })
+
+ // killing random worker and expecting pool to replace it
+ p.workers[0].cmd.Process.Kill()
+ <-destructed
+
+ for _, w := range p.Workers() {
+ assert.Equal(t, StateReady, w.state.Value())
+ }
+}
+
func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") },
NewPipeFactory(),
Config{
NumWorkers: 1,
@@ -196,11 +233,11 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
func Test_StaticPool_Replace_Worker(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") },
NewPipeFactory(),
Config{
NumWorkers: 1,
- MaxExecutions: 1,
+ MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -229,10 +266,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
}
}
+
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "stop", "pipes") },
NewPipeFactory(),
Config{
NumWorkers: 1,
@@ -266,7 +304,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
func Benchmark_Pool_Allocate(b *testing.B) {
p, _ := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -285,7 +323,7 @@ func Benchmark_Pool_Allocate(b *testing.B) {
func Benchmark_Pool_Echo(b *testing.B) {
p, _ := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -300,7 +338,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
func Benchmark_Pool_Echo_Batched(b *testing.B) {
p, _ := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
@@ -323,11 +361,11 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
p, _ := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
Config{
NumWorkers: 1,
- MaxExecutions: 1,
+ MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
diff --git a/worker.go b/worker.go
index cc37b69d..811bda5f 100644
--- a/worker.go
+++ b/worker.go
@@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
)
// Worker - supervised process with api over goridge.Relay.
@@ -19,6 +20,9 @@ type Worker struct {
// can be nil while process is not started.
Pid *int
+ // Created indicates at what time worker has been created.
+ Created time.Time
+
// state holds information about current worker state,
// number of worker executions, last status change time.
// publicly this object is receive-only and protected using Mutex
@@ -26,13 +30,13 @@ type Worker struct {
state *state
// underlying command with associated process, command must be
- // provided to worker from outside in non-started form. Cmd
+ // provided to worker from outside in non-started form. CmdSource
// stdErr direction will be handled by worker to aggregate error message.
cmd *exec.Cmd
// err aggregates stderr output from underlying process. Value can be
// receive only once command is completed and all pipes are closed.
- err *bytes.Buffer
+ err *errBuffer
// channel is being closed once command is complete.
waitDone chan interface{}
@@ -54,13 +58,14 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) {
}
w := &Worker{
+ Created: time.Now(),
cmd: cmd,
- err: new(bytes.Buffer),
+ err: &errBuffer{buffer: new(bytes.Buffer)},
waitDone: make(chan interface{}),
state: newState(StateInactive),
}
- // piping all stderr to command buffer
+ // piping all stderr to command errBuffer
w.cmd.Stderr = w.err
return w, nil
@@ -105,9 +110,16 @@ func (w *Worker) Wait() error {
}
if w.endState.Success() {
+ w.state.set(StateStopped)
return nil
}
+ if w.state.Value() != StateStopping {
+ w.state.set(StateErrored)
+ } else {
+ w.state.set(StateStopped)
+ }
+
if w.err.Len() != 0 {
return errors.New(w.err.String())
}
@@ -125,7 +137,7 @@ func (w *Worker) Stop() error {
w.mu.Lock()
defer w.mu.Unlock()
- w.state.set(StateInactive)
+ w.state.set(StateStopping)
err := sendPayload(w.rl, &stopCommand{Stop: true})
<-w.waitDone
@@ -134,16 +146,13 @@ func (w *Worker) Stop() error {
}
// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Waits for process completion.
+// error log from the stderr. Does not waits for process completion!
func (w *Worker) Kill() error {
select {
case <-w.waitDone:
return nil
default:
- w.mu.Lock()
- defer w.mu.Unlock()
-
- w.state.set(StateInactive)
+ w.state.set(StateStopping)
err := w.cmd.Process.Signal(os.Kill)
<-w.waitDone
@@ -163,14 +172,13 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
}
if w.state.Value() != StateReady {
- return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value())
+ return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
}
w.state.set(StateWorking)
defer w.state.registerExec()
rsp, err = w.execPayload(rqs)
-
if err != nil {
if _, ok := err.(JobError); !ok {
w.state.set(StateErrored)
@@ -178,6 +186,9 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
}
}
+ // todo: attach when payload is complete
+ // todo: new status
+
w.state.set(StateReady)
return rsp, err
}
@@ -194,12 +205,11 @@ func (w *Worker) start() error {
go func() {
w.endState, _ = w.cmd.Process.Wait()
if w.waitDone != nil {
- w.state.set(StateStopped)
close(w.waitDone)
+ w.mu.Lock()
+ defer w.mu.Unlock()
if w.rl != nil {
- w.mu.Lock()
- defer w.mu.Unlock()
w.rl.Close()
}
}
@@ -230,6 +240,7 @@ func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
return nil, JobError(rsp.Context)
}
+ // add streaming support :)
if rsp.Body, pr, err = w.rl.Receive(); err != nil {
return nil, errors.Wrap(err, "worker error")
}
diff --git a/worker_test.go b/worker_test.go
index de11226a..2e3bc111 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -4,15 +4,15 @@ import (
"github.com/stretchr/testify/assert"
"os/exec"
"testing"
- "time"
)
func Test_GetState(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
+ assert.Equal(t, StateStopped, w.State().Value())
}()
assert.NoError(t, err)
@@ -20,11 +20,26 @@ func Test_GetState(t *testing.T) {
assert.Equal(t, StateReady, w.State().Value())
w.Stop()
- assert.Equal(t, StateStopped, w.State().Value())
+}
+
+func Test_Kill(t *testing.T) {
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.Error(t, w.Wait())
+ assert.Equal(t, StateStopped, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, StateReady, w.State().Value())
+ w.Kill()
}
func Test_Echo(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -42,17 +57,47 @@ func Test_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_BadPayload(t *testing.T) {
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(nil)
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ assert.Equal(t, "payload can not be empty", err.Error())
+}
+
func Test_NotStarted_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := newWorker(cmd)
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php php-src/tests/client.php echo pipes")
assert.Contains(t, w.String(), "inactive")
assert.Contains(t, w.String(), "numExecs: 0")
}
+func Test_NotStarted_Exec(t *testing.T) {
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
+
+ w, _ := newWorker(cmd)
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ assert.Equal(t, "worker is not ready (inactive)", err.Error())
+}
+
func Test_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -60,13 +105,13 @@ func Test_String(t *testing.T) {
}()
defer w.Stop()
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php php-src/tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
assert.Contains(t, w.String(), "numExecs: 0")
}
func Test_Echo_Slow(t *testing.T) {
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
+ cmd := exec.Command("php", "php-src/tests/slow-client.php", "echo", "pipes", "10", "10")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -85,7 +130,7 @@ func Test_Echo_Slow(t *testing.T) {
}
func Test_Broken(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -101,7 +146,7 @@ func Test_Broken(t *testing.T) {
}
func Test_OnStarted(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "broken", "pipes")
assert.Nil(t, cmd.Start())
w, err := newWorker(cmd)
@@ -112,7 +157,7 @@ func Test_OnStarted(t *testing.T) {
}
func Test_Error(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "error", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "error", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -129,7 +174,7 @@ func Test_Error(t *testing.T) {
}
func Test_NumExecs(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -138,27 +183,11 @@ func Test_NumExecs(t *testing.T) {
defer w.Stop()
w.Exec(&Payload{Body: []byte("hello")})
- assert.Equal(t, uint64(1), w.State().NumExecs())
-
- w.Exec(&Payload{Body: []byte("hello")})
- assert.Equal(t, uint64(2), w.State().NumExecs())
+ assert.Equal(t, int64(1), w.State().NumExecs())
w.Exec(&Payload{Body: []byte("hello")})
- assert.Equal(t, uint64(3), w.State().NumExecs())
-}
-
-func Test_StateUpdated(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer w.Stop()
-
- tm := time.Now()
- time.Sleep(time.Millisecond)
+ assert.Equal(t, int64(2), w.State().NumExecs())
w.Exec(&Payload{Body: []byte("hello")})
- assert.True(t, w.State().Updated().After(tm))
-}
+ assert.Equal(t, int64(3), w.State().NumExecs())
+} \ No newline at end of file