summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml2
-rw-r--r--Makefile2
-rwxr-xr-xbuild.sh2
-rw-r--r--cmd/rr/.rr.yaml23
-rw-r--r--cmd/rr/cmd/root.go13
-rw-r--r--cmd/rr/cmd/stop.go52
-rw-r--r--cmd/rr/http/reset.go2
-rw-r--r--cmd/rr/main.go2
-rw-r--r--cmd/util/config.go24
-rw-r--r--cmd/util/debug.go42
-rw-r--r--cmd/util/table.go2
-rw-r--r--composer.json16
-rw-r--r--error_buffer.go10
-rw-r--r--pool.go3
-rw-r--r--protocol.go4
-rw-r--r--protocol_test.go2
-rw-r--r--qbuild/.build.json17
-rw-r--r--qbuild/docker/Dockerfile8
-rw-r--r--qbuild/docker/compile.sh9
-rw-r--r--qbuild/main.go14
-rw-r--r--qbuild/rr-build40
-rw-r--r--qbuild/src/Builder.php237
-rw-r--r--server.go52
-rw-r--r--server_test.go107
-rw-r--r--service/container.go6
-rw-r--r--service/env/service.go2
-rw-r--r--service/http/config.go4
-rw-r--r--service/http/config_test.go30
-rw-r--r--service/http/handler.go4
-rw-r--r--service/http/handler_test.go50
-rw-r--r--service/http/rpc.go4
-rw-r--r--service/http/rpc_test.go6
-rw-r--r--service/http/service.go23
-rw-r--r--service/http/service_test.go20
-rw-r--r--service/http/uploads_test.go8
-rw-r--r--service/rpc/service.go11
-rw-r--r--service/rpc/service_test.go13
-rw-r--r--service/rpc/system.go18
-rw-r--r--service/static/service.go2
-rw-r--r--service/static/service_test.go18
-rw-r--r--service/watcher/config.go48
-rw-r--r--service/watcher/service.go46
-rw-r--r--service/watcher/state_watch.go58
-rw-r--r--service/watcher/watcher.go153
-rw-r--r--src/Diactoros/ServerRequestFactory.php3
-rw-r--r--src/Diactoros/StreamFactory.php3
-rw-r--r--src/Diactoros/UploadedFileFactory.php3
-rw-r--r--src/Exception/RoadRunnerException.php3
-rw-r--r--src/Exceptions/RoadRunnerException.php1
-rw-r--r--src/HttpClient.php3
-rw-r--r--src/PSR7Client.php3
-rw-r--r--src/Worker.php3
-rwxr-xr-xsrc/bin/roadrunner207
-rw-r--r--state.go8
-rw-r--r--static_pool.go60
-rw-r--r--static_pool_test.go11
-rw-r--r--tests/pid.php26
-rw-r--r--tests/slow-pid.php18
-rw-r--r--util/state_test.go10
-rw-r--r--watcher.go10
-rw-r--r--watcher_test.go216
-rw-r--r--worker.go22
62 files changed, 1287 insertions, 532 deletions
diff --git a/.travis.yml b/.travis.yml
index 53c3b379..58138aa7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ language: go
sudo: required
go:
- - "1.11.x"
+ - "1.12.x"
install:
- export GO111MODULE=on
diff --git a/Makefile b/Makefile
index b1a78e02..4b92ed5b 100644
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,9 @@ test:
go test -v -race -cover
go test -v -race -cover ./util
go test -v -race -cover ./service
+ go test -v -race -cover ./service/util
go test -v -race -cover ./service/env
go test -v -race -cover ./service/rpc
go test -v -race -cover ./service/http
go test -v -race -cover ./service/static
+ go test -v -race -cover ./service/watcher
diff --git a/build.sh b/build.sh
index da45fd1c..fc577356 100755
--- a/build.sh
+++ b/build.sh
@@ -3,7 +3,7 @@ cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
# Pushes application version into the build information.
-RR_VERSION=1.3.7
+RR_VERSION=1.4.0
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index f50ff0e9..9cdde098 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -29,7 +29,7 @@ http:
key: server.key
# max POST request size, including file uploads in MB.
- maxRequest: 200
+ maxRequestSize: 200
# file upload configuration.
uploads:
@@ -58,6 +58,27 @@ http:
# amount of time given to worker to gracefully destruct itself.
destroyTimeout: 60
+# monitors roadrunner server(s)
+watch:
+ # check worker state each second
+ interval: 1
+
+ # custom watch configuration for each service
+ services:
+ # monitor http workers
+ http:
+ # maximum allowed memory consumption
+ maxMemory: 100
+
+ # maximum time to live for the worker
+ maxTTL: 0
+
+ # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections)
+ maxIdleTTL: 0
+
+ # max_execution_time, worker will be killed if that amount of time spend for task execution
+ maxExecTTL: 60
+
# static file serving. remove this section to disable static file serving.
static:
# root directory for static file (http would not serve .php and .htaccess files).
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index 74506004..1222177a 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -25,10 +25,11 @@ import (
"github.com/spf13/cobra"
"github.com/spiral/roadrunner/cmd/util"
"github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/watcher"
"os"
)
-// Service bus for all the commands.
+// Services bus for all the commands.
var (
cfgFile, workDir, logFormat string
override []string
@@ -106,6 +107,16 @@ func init() {
util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
os.Exit(1)
}
+
+ // global watcher config
+ if Verbose {
+ wcv, _ := Container.Get(watcher.ID)
+ if wcv, ok := wcv.(*watcher.Service); ok {
+ wcv.AddListener(func(event int, ctx interface{}) {
+ util.LogEvent(Logger, event, ctx)
+ })
+ }
+ }
})
}
diff --git a/cmd/rr/cmd/stop.go b/cmd/rr/cmd/stop.go
new file mode 100644
index 00000000..2f48615b
--- /dev/null
+++ b/cmd/rr/cmd/stop.go
@@ -0,0 +1,52 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package cmd
+
+import (
+ "github.com/spf13/cobra"
+ "github.com/spiral/roadrunner/cmd/util"
+)
+
+func init() {
+ CLI.AddCommand(&cobra.Command{
+ Use: "stop",
+ Short: "Detach RoadRunner server",
+ RunE: stopHandler,
+ })
+}
+
+func stopHandler(cmd *cobra.Command, args []string) error {
+ client, err := util.RPCClient(Container)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ util.Printf("<green>Stopping RoadRunner</reset>: ")
+
+ var r string
+ if err := client.Call("system.Detach", true, &r); err != nil {
+ return err
+ }
+
+ util.Printf("<green+hb>done</reset>\n")
+ return nil
+}
diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go
index 42fd966d..3008848a 100644
--- a/cmd/rr/http/reset.go
+++ b/cmd/rr/http/reset.go
@@ -41,7 +41,7 @@ func reloadHandler(cmd *cobra.Command, args []string) error {
}
defer client.Close()
- util.Printf("<green>restarting http worker pool</reset>: ")
+ util.Printf("<green>Restarting http worker pool</reset>: ")
var r string
if err := client.Call("http.Reset", true, &r); err != nil {
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 54915957..dc2fbc20 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -24,6 +24,7 @@ package main
import (
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service/watcher"
// services (plugins)
"github.com/spiral/roadrunner/service/env"
@@ -40,6 +41,7 @@ func main() {
rr.Container.Register(rpc.ID, &rpc.Service{})
rr.Container.Register(http.ID, &http.Service{})
rr.Container.Register(static.ID, &static.Service{})
+ rr.Container.Register(watcher.ID, &watcher.Service{})
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/cmd/util/config.go b/cmd/util/config.go
index a354b132..d5b1020c 100644
--- a/cmd/util/config.go
+++ b/cmd/util/config.go
@@ -63,6 +63,8 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*co
// read in environment variables that match
cfg.AutomaticEnv()
+ cfg.SetEnvPrefix("rr")
+ cfg.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
// If a cfg file is found, read it in.
if err := cfg.ReadInConfig(); err != nil {
@@ -71,6 +73,13 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*co
}
}
+ // automatically inject ENV variables using ${ENV} pattern
+ for _, key := range cfg.AllKeys() {
+ val := cfg.Get(key)
+ cfg.Set(key, parseEnv(val))
+ }
+
+ // merge with console flags
if len(flags) != 0 {
for _, f := range flags {
k, v, err := parseFlag(f)
@@ -114,3 +123,18 @@ func parseValue(value string) string {
return value
}
+
+func parseEnv(value interface{}) interface{} {
+ str, ok := value.(string)
+ if !ok || len(str) <= 3 {
+ return value
+ }
+
+ if str[0:2] == "${" && str[len(str)-1:] == "}" {
+ if v, ok := os.LookupEnv(str[2 : len(str)-1]); ok {
+ return v
+ }
+ }
+
+ return str
+}
diff --git a/cmd/util/debug.go b/cmd/util/debug.go
index f64b9bc4..c120eba2 100644
--- a/cmd/util/debug.go
+++ b/cmd/util/debug.go
@@ -3,6 +3,7 @@ package util
import (
"github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/watcher"
"strings"
)
@@ -12,7 +13,7 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
case roadrunner.EventWorkerKill:
w := ctx.(*roadrunner.Worker)
logger.Warning(Sprintf(
- "<white+hb>worker.%v</reset> <yellow>killed</red>",
+ "<white+hb>worker.%v</reset> <yellow>killed</reset>",
*w.Pid,
))
return true
@@ -57,5 +58,44 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
return true
}
+ // watchers
+ switch event {
+ case watcher.EventMaxTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Debug(Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+
+ case watcher.EventMaxIdleTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Debug(Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+
+ case watcher.EventMaxMemory:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Error(Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+
+ case watcher.EventMaxExecTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Error(Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+ }
+
return false
}
diff --git a/cmd/util/table.go b/cmd/util/table.go
index 565c0679..c0e20837 100644
--- a/cmd/util/table.go
+++ b/cmd/util/table.go
@@ -40,6 +40,8 @@ func renderStatus(status string) string {
return Sprintf("<cyan>ready</reset>")
case "working":
return Sprintf("<green>working</reset>")
+ case "invalid":
+ return Sprintf("<yellow>invalid</reset>")
case "stopped":
return Sprintf("<red>stopped</reset>")
case "errored":
diff --git a/composer.json b/composer.json
index cb08ee97..9e2e78fa 100644
--- a/composer.json
+++ b/composer.json
@@ -7,22 +7,26 @@
{
"name": "Anton Titov / Wolfy-J",
"email": "[email protected]"
+ },
+ {
+ "name": "RoadRunner Community",
+ "homepage": "https://github.com/spiral/roadrunner/graphs/contributors"
}
],
"require": {
"php": "^7.0",
- "ext-json": "*",
"spiral/goridge": "^2.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^1.0",
- "zendframework/zend-diactoros": "^1.3|^2.0"
+ "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0",
+ "http-interop/http-factory-diactoros": "^1.0"
},
- "bin": [
- "qbuild/rr-build"
- ],
"autoload": {
"psr-4": {
"Spiral\\RoadRunner\\": "src/"
}
- }
+ },
+ "bin": [
+ "src/bin/roadrunner"
+ ]
}
diff --git a/error_buffer.go b/error_buffer.go
index fec789a9..0eaf03b6 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -73,9 +73,8 @@ func newErrBuffer() *errBuffer {
// Listen attaches error stream even listener.
func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
eb.mu.Lock()
- defer eb.mu.Unlock()
-
eb.lsn = l
+ eb.mu.Unlock()
}
// Len returns the number of buf of the unread portion of the errBuffer;
@@ -88,14 +87,13 @@ func (eb *errBuffer) Len() int {
return len(eb.buf)
}
-// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil.
+// Write appends the contents of pool to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of pool; err is always nil.
func (eb *errBuffer) Write(p []byte) (int, error) {
eb.mu.Lock()
- defer eb.mu.Unlock()
-
eb.buf = append(eb.buf, p...)
eb.update <- nil
+ eb.mu.Unlock()
return len(p), nil
}
diff --git a/pool.go b/pool.go
index 7dfea26c..23857604 100644
--- a/pool.go
+++ b/pool.go
@@ -31,6 +31,9 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []*Worker)
+ // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker.
+ Remove(w *Worker, err error) bool
+
// Destroy all underlying workers (but let them to complete the task).
Destroy()
}
diff --git a/protocol.go b/protocol.go
index 564df9b7..5523a3e5 100644
--- a/protocol.go
+++ b/protocol.go
@@ -15,7 +15,7 @@ type pidCommand struct {
Pid int `json:"pid"`
}
-func sendPayload(rl goridge.Relay, v interface{}) error {
+func sendControl(rl goridge.Relay, v interface{}) error {
if data, ok := v.([]byte); ok {
return rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw)
}
@@ -29,7 +29,7 @@ func sendPayload(rl goridge.Relay, v interface{}) error {
}
func fetchPID(rl goridge.Relay) (pid int, err error) {
- if err := sendPayload(rl, pidCommand{Pid: os.Getpid()}); err != nil {
+ if err := sendControl(rl, pidCommand{Pid: os.Getpid()}); err != nil {
return 0, err
}
diff --git a/protocol_test.go b/protocol_test.go
index ed3fe461..526945cb 100644
--- a/protocol_test.go
+++ b/protocol_test.go
@@ -29,7 +29,7 @@ func (r *relayMock) Close() error {
}
func Test_Protocol_Errors(t *testing.T) {
- err := sendPayload(&relayMock{}, make(chan int))
+ err := sendControl(&relayMock{}, make(chan int))
assert.Error(t, err)
}
diff --git a/qbuild/.build.json b/qbuild/.build.json
deleted file mode 100644
index 74b83cea..00000000
--- a/qbuild/.build.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "packages": [
- "github.com/spiral/roadrunner/service/env",
- "github.com/spiral/roadrunner/service/http",
- "github.com/spiral/roadrunner/service/rpc",
- "github.com/spiral/roadrunner/service/static"
- ],
- "commands": [
- "github.com/spiral/roadrunner/cmd/rr/http"
- ],
- "register": [
- "rr.Container.Register(env.ID, &env.Service{})",
- "rr.Container.Register(rpc.ID, &rpc.Service{})",
- "rr.Container.Register(http.ID, &http.Service{})",
- "rr.Container.Register(static.ID, &static.Service{})"
- ]
-} \ No newline at end of file
diff --git a/qbuild/docker/Dockerfile b/qbuild/docker/Dockerfile
deleted file mode 100644
index 11fb5abb..00000000
--- a/qbuild/docker/Dockerfile
+++ /dev/null
@@ -1,8 +0,0 @@
-FROM golang:latest
-
-ENV CGO_ENABLED=0
-ENV GO111MODULE=on
-
-WORKDIR /go/src/rr
-
-COPY compile.sh /go/src/rr/ \ No newline at end of file
diff --git a/qbuild/docker/compile.sh b/qbuild/docker/compile.sh
deleted file mode 100644
index 0c85124f..00000000
--- a/qbuild/docker/compile.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
-LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.BuildTime=$(date +%FT%T%z)"
-
-# Verify all external modules
-go mod init
-
-# Build the binary
-CGO_ENABLED=0 go build -v -ldflags "$LDFLAGS -extldflags '-static'" -o "rr" \ No newline at end of file
diff --git a/qbuild/main.go b/qbuild/main.go
deleted file mode 100644
index a065fe27..00000000
--- a/qbuild/main.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package main
-
-import (
- "github.com/sirupsen/logrus"
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- // -packages- //
- // -commands- //
-)
-
-func main() {
- // -register- //
- rr.Logger.Formatter = &logrus.TextFormatter{ForceColors: true}
- rr.Execute()
-}
diff --git a/qbuild/rr-build b/qbuild/rr-build
deleted file mode 100644
index 7918f4f8..00000000
--- a/qbuild/rr-build
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env php
-<?php
-/**
- * Automatic roadrunner builds.
- */
-
-use Spiral\RoadRunner\QuickBuild\Builder;
-
-require_once "src/Builder.php";
-
-// load build config
-$version = $argv[1] ?? "1.3.1";
-
-// load build config
-$config = $argv[2] ?? __DIR__ . "/.build.json";
-
-// Greeting!
-Builder::cprintf(
- "Building <green>RoadRunner</reset> specifically for you (version: <white>%s</reset>)...\n",
- $version
-);
-
-$builder = Builder::loadConfig($config);
-if ($builder == null) {
- Builder::cprintf("<red>Unable to load config:</reset> %s\n", $config);
- return;
-}
-
-$errors = $builder->configErrors();
-if (!empty($errors)) {
- Builder::cprintf("<yellow>Found configuration errors:</reset>\n");
- foreach ($errors as $error) {
- Builder::cprintf("- <red>%s</reset>\n", $error);
- }
-
- return;
-}
-
-// Start build
-$builder->build(getcwd(), __DIR__ . '/main.go', 'rr', $version); \ No newline at end of file
diff --git a/qbuild/src/Builder.php b/qbuild/src/Builder.php
deleted file mode 100644
index 4b441094..00000000
--- a/qbuild/src/Builder.php
+++ /dev/null
@@ -1,237 +0,0 @@
-<?php
-declare(strict_types=1);
-/**
- * RoadRunner.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-
-namespace Spiral\RoadRunner\QuickBuild;
-
-final class Builder
-{
- const DOCKER = 'spiralscout/rr-build';
-
- /**
- * Coloring.
- *
- * @var array
- */
- protected static $colors = [
- "reset" => "\e[0m",
- "white" => "\033[1;38m",
- "red" => "\033[0;31m",
- "green" => "\033[0;32m",
- "yellow" => "\033[1;93m",
- "gray" => "\033[0;90m"
- ];
-
- /** @var array */
- private $config;
-
- /**
- * @param array $config
- */
- protected function __construct(array $config)
- {
- $this->config = $config;
- }
-
- /**
- * Validate the build configuration.
- *
- * @return array
- */
- public function configErrors(): array
- {
- $errors = [];
- if (!isset($this->config["commands"])) {
- $errors[] = "Directive 'commands' missing";
- }
-
- if (!isset($this->config["packages"])) {
- $errors[] = "Directive 'packages' missing";
- }
-
- if (!isset($this->config["register"])) {
- $errors[] = "Directive 'register' missing";
- }
-
- return $errors;
- }
-
- /**
- * Build the application.
- *
- * @param string $directory
- * @param string $template
- * @param string $output
- * @param string $version
- */
- public function build(string $directory, string $template, string $output, string $version)
- {
- $filename = $directory . "/main.go";
- $output = $output . ($this->getOS() == 'windows' ? '.exe' : '');
-
- // step 1, generate template
- $this->generate($template, $filename);
-
- $command = sprintf(
- 'docker run --rm -v "%s":/mnt -e RR_VERSION=%s -e GOARCH=amd64 -e GOOS=%s %s /bin/bash -c "mv /mnt/main.go main.go; bash compile.sh; cp rr /mnt/%s;"',
- $directory,
- $version,
- $this->getOS(),
- self::DOCKER,
- $output
- );
-
- self::cprintf("<yellow>%s</reset>\n", $command);
-
- // run the build
- $this->run($command, true);
-
- if (!file_exists($directory . '/' . $output)) {
- self::cprintf("<red>Build has failed!</reset>");
- return;
- }
-
- self::cprintf("<green>Build complete!</reset>\n");
- $this->run($directory . '/' . $output, false);
- }
-
- /**
- * @param string $command
- * @param bool $shadow
- */
- protected function run(string $command, bool $shadow = false)
- {
- $shadow && self::cprintf("<gray>");
- passthru($command);
- $shadow && self::cprintf("</reset>");
- }
-
- /**
- * @param string $template
- * @param string $filename
- */
- protected function generate(string $template, string $filename)
- {
- $body = file_get_contents($template);
-
- $replace = [
- '// -packages- //' => '"' . join("\"\n\"", $this->config['packages']) . '"',
- '// -commands- //' => '_ "' . join("\"\n_ \"", $this->config['commands']) . '"',
- '// -register- //' => join("\n", $this->config['register'])
- ];
-
- // compile the template
- $result = str_replace(array_keys($replace), array_values($replace), $body);
- file_put_contents($filename, $result);
- }
-
- /**
- * @return string
- */
- protected function getOS(): string
- {
- $os = strtolower(PHP_OS);
-
- if (strpos($os, 'darwin') !== false) {
- return 'darwin';
- }
-
- if (strpos($os, 'win') !== false) {
- return 'windows';
- }
-
- return "linux";
- }
-
- /**
- * Create new builder using given config.
- *
- * @param string $config
- * @return Builder|null
- */
- public static function loadConfig(string $config): ?Builder
- {
- if (!file_exists($config)) {
- return null;
- }
-
- $configData = json_decode(file_get_contents($config), true);
- if (!is_array($configData)) {
- return null;
- }
-
- return new Builder($configData);
- }
-
- /**
- * Make colored output.
- *
- * @param string $format
- * @param mixed ...$args
- */
- public static function cprintf(string $format, ...$args)
- {
- if (self::isColorsSupported()) {
- $format = preg_replace_callback("/<\/?([^>]+)>/", function ($value) {
- return self::$colors[$value[1]];
- }, $format);
- } else {
- $format = preg_replace("/<[^>]+>/", "", $format);
- }
-
- echo sprintf($format, ...$args);
- }
-
- /**
- * @return bool
- */
- public static function isWindows(): bool
- {
- return \DIRECTORY_SEPARATOR === '\\';
- }
-
- /**
- * Returns true if the STDOUT supports colorization.
- *
- * @codeCoverageIgnore
- * @link https://github.com/symfony/Console/blob/master/Output/StreamOutput.php#L94
- * @param mixed $stream
- * @return bool
- */
- public static function isColorsSupported($stream = STDOUT): bool
- {
- if ('Hyper' === getenv('TERM_PROGRAM')) {
- return true;
- }
-
- try {
- if (\DIRECTORY_SEPARATOR === '\\') {
- return (
- function_exists('sapi_windows_vt100_support')
- && @sapi_windows_vt100_support($stream)
- ) || getenv('ANSICON') !== false
- || getenv('ConEmuANSI') == 'ON'
- || getenv('TERM') == 'xterm';
- }
-
- if (\function_exists('stream_isatty')) {
- return (bool)@stream_isatty($stream);
- }
-
- if (\function_exists('posix_isatty')) {
- return (bool)@posix_isatty($stream);
- }
-
- $stat = @fstat($stream);
- // Check if formatted mode is S_IFCHR
- return $stat ? 0020000 === ($stat['mode'] & 0170000) : false;
- } catch (\Throwable $e) {
- return false;
- }
- }
-}
diff --git a/server.go b/server.go
index 26f58172..397898f2 100644
--- a/server.go
+++ b/server.go
@@ -37,9 +37,13 @@ type Server struct {
// creates and connects to workers
factory Factory
+ // associated pool watcher
+ watcher Watcher
+
// currently active pool instance
- mup sync.Mutex
- pool Pool
+ mup sync.Mutex
+ pool Pool
+ pWatcher Watcher
// observes pool events (can be attached to multiple pools at the same time)
mul sync.Mutex
@@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) {
s.lsn = l
}
+// Listen attaches server event watcher.
+func (s *Server) Watch(w Watcher) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.watcher = w
+
+ s.mul.Lock()
+ if s.pWatcher != nil && s.pool != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+ s.mul.Unlock()
+}
+
// Start underlying worker pool, configure factory and command provider.
func (s *Server) Start() (err error) {
s.mu.Lock()
@@ -72,6 +91,10 @@ func (s *Server) Start() (err error) {
return err
}
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+
s.pool.Listen(s.poolListener)
s.started = true
s.throw(EventServerStart, s)
@@ -79,7 +102,7 @@ func (s *Server) Start() (err error) {
return nil
}
-// Stop underlying worker pool and close the factory.
+// Detach underlying worker pool and close the factory.
func (s *Server) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -89,6 +112,12 @@ func (s *Server) Stop() {
}
s.throw(EventPoolDestruct, s.pool)
+
+ if s.pWatcher != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = nil
+ }
+
s.pool.Destroy()
s.factory.Close()
@@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
previous := s.pool
+ pWatcher := s.pWatcher
s.mu.Unlock()
pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
@@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
s.cfg.Pool, s.pool = cfg.Pool, pool
+
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(pool)
+ }
+
s.mu.Unlock()
s.throw(EventPoolConstruct, pool)
if previous != nil {
- go func(previous Pool) {
+ go func(previous Pool, pWatcher Watcher) {
s.throw(EventPoolDestruct, previous)
+ if pWatcher != nil {
+ pWatcher.Detach()
+ }
+
previous.Destroy()
- }(previous)
+ }(previous, pWatcher)
}
return nil
@@ -203,9 +242,8 @@ func (s *Server) poolListener(event int, ctx interface{}) {
// throw invokes event handler if any.
func (s *Server) throw(event int, ctx interface{}) {
s.mul.Lock()
- defer s.mul.Unlock()
-
if s.lsn != nil {
s.lsn(event, ctx)
}
+ s.mul.Unlock()
}
diff --git a/server_test.go b/server_test.go
index 4646ebc9..c973d634 100644
--- a/server_test.go
+++ b/server_test.go
@@ -9,7 +9,7 @@ import (
)
func TestServer_PipesEcho(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -19,11 +19,11 @@ func TestServer_PipesEcho(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
+ assert.NoError(t, rr.Start())
- res, err := srv.Exec(&Payload{Body: []byte("hello")})
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -33,8 +33,27 @@ func TestServer_PipesEcho(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func TestServer_NoPool(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+}
+
func TestServer_SocketEcho(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo tcp",
Relay: "tcp://:9007",
@@ -45,11 +64,11 @@ func TestServer_SocketEcho(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
+ assert.NoError(t, rr.Start())
- res, err := srv.Exec(&Payload{Body: []byte("hello")})
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -60,7 +79,7 @@ func TestServer_SocketEcho(t *testing.T) {
}
func TestServer_Configure_BeforeStart(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -70,9 +89,9 @@ func TestServer_Configure_BeforeStart(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- err := srv.Reconfigure(&ServerConfig{
+ err := rr.Reconfigure(&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
Pool: &Config{
@@ -83,9 +102,9 @@ func TestServer_Configure_BeforeStart(t *testing.T) {
})
assert.NoError(t, err)
- assert.NoError(t, srv.Start())
+ assert.NoError(t, rr.Start())
- res, err := srv.Exec(&Payload{Body: []byte("hello")})
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -93,11 +112,11 @@ func TestServer_Configure_BeforeStart(t *testing.T) {
assert.Nil(t, res.Context)
assert.Equal(t, "hello", res.String())
- assert.Len(t, srv.Workers(), 2)
+ assert.Len(t, rr.Workers(), 2)
}
func TestServer_Stop_NotStarted(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -108,12 +127,12 @@ func TestServer_Stop_NotStarted(t *testing.T) {
},
})
- srv.Stop()
- assert.Nil(t, srv.Workers())
+ rr.Stop()
+ assert.Nil(t, rr.Workers())
}
func TestServer_Reconfigure(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -123,12 +142,12 @@ func TestServer_Reconfigure(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
- assert.Len(t, srv.Workers(), 1)
+ assert.NoError(t, rr.Start())
+ assert.Len(t, rr.Workers(), 1)
- err := srv.Reconfigure(&ServerConfig{
+ err := rr.Reconfigure(&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
Pool: &Config{
@@ -139,11 +158,11 @@ func TestServer_Reconfigure(t *testing.T) {
})
assert.NoError(t, err)
- assert.Len(t, srv.Workers(), 2)
+ assert.Len(t, rr.Workers(), 2)
}
func TestServer_Reset(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -153,19 +172,19 @@ func TestServer_Reset(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
- assert.Len(t, srv.Workers(), 1)
+ assert.NoError(t, rr.Start())
+ assert.Len(t, rr.Workers(), 1)
- pid := *srv.Workers()[0].Pid
- assert.NoError(t, srv.Reset())
- assert.Len(t, srv.Workers(), 1)
- assert.NotEqual(t, pid, srv.Workers()[0].Pid)
+ pid := *rr.Workers()[0].Pid
+ assert.NoError(t, rr.Reset())
+ assert.Len(t, rr.Workers(), 1)
+ assert.NotEqual(t, pid, rr.Workers()[0].Pid)
}
func TestServer_ReplacePool(t *testing.T) {
- srv := NewServer(
+ rr := NewServer(
&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -175,27 +194,27 @@ func TestServer_ReplacePool(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
+ assert.NoError(t, rr.Start())
constructed := make(chan interface{})
- srv.Listen(func(e int, ctx interface{}) {
+ rr.Listen(func(e int, ctx interface{}) {
if e == EventPoolConstruct {
close(constructed)
}
})
- srv.Reset()
+ rr.Reset()
<-constructed
- for _, w := range srv.Workers() {
+ for _, w := range rr.Workers() {
assert.Equal(t, StateReady, w.state.Value())
}
}
func TestServer_ServerFailure(t *testing.T) {
- srv := NewServer(&ServerConfig{
+ rr := NewServer(&ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
Pool: &Config{
@@ -204,25 +223,25 @@ func TestServer_ServerFailure(t *testing.T) {
DestroyTimeout: time.Second,
},
})
- defer srv.Stop()
+ defer rr.Stop()
- assert.NoError(t, srv.Start())
+ assert.NoError(t, rr.Start())
failure := make(chan interface{})
- srv.Listen(func(e int, ctx interface{}) {
+ rr.Listen(func(e int, ctx interface{}) {
if e == EventServerFailure {
failure <- nil
}
})
// emulating potential server failure
- srv.cfg.Command = "php tests/client.php echo broken-connection"
- srv.pool.(*StaticPool).cmd = func() *exec.Cmd {
+ rr.cfg.Command = "php tests/client.php echo broken-connection"
+ rr.pool.(*StaticPool).cmd = func() *exec.Cmd {
return exec.Command("php", "tests/client.php", "echo", "broken-connection")
}
// killing random worker and expecting pool to replace it
- srv.Workers()[0].cmd.Process.Kill()
+ rr.Workers()[0].cmd.Process.Kill()
<-failure
assert.True(t, true)
diff --git a/service/container.go b/service/container.go
index 275cfffd..abeaf369 100644
--- a/service/container.go
+++ b/service/container.go
@@ -16,13 +16,13 @@ var errNoConfig = fmt.Errorf("no config has been provided")
// implement service.HydrateConfig.
const InitMethod = "Init"
-// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept
+// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept
// other services and/or configs as dependency.
type Service interface {
// Serve serves.
Serve() error
- // Stop stops the service.
+ // Detach stops the service.
Stop()
}
@@ -198,7 +198,7 @@ func (c *container) Serve() error {
return nil
}
-// Stop sends stop command to all running services.
+// Detach sends stop command to all running services.
func (c *container) Stop() {
for _, e := range c.services {
if e.hasStatus(StatusServing) {
diff --git a/service/env/service.go b/service/env/service.go
index 83175b36..00bc41ef 100644
--- a/service/env/service.go
+++ b/service/env/service.go
@@ -3,7 +3,7 @@ package env
// ID contains default service name.
const ID = "env"
-// Service provides ability to map _ENV values from config file.
+// Services provides ability to map _ENV values from config file.
type Service struct {
// values is default set of values.
values map[string]string
diff --git a/service/http/config.go b/service/http/config.go
index 5a2c8768..899a5083 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -17,8 +17,8 @@ type Config struct {
// SSL defines https server options.
SSL SSLConfig
- // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited.
- MaxRequest int64
+ // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
+ MaxRequestSize int64
// Uploads configures uploads configuration.
Uploads *UploadsConfig
diff --git a/service/http/config_test.go b/service/http/config_test.go
index 07901cb6..4cd2783f 100644
--- a/service/http/config_test.go
+++ b/service/http/config_test.go
@@ -31,8 +31,8 @@ func Test_Config_Hydrate_Error2(t *testing.T) {
func Test_Config_Valid(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -58,7 +58,7 @@ func Test_Config_Valid_SSL(t *testing.T) {
Cert: "fixtures/server.crt",
Key: "fixtures/server.key",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -87,7 +87,7 @@ func Test_Config_SSL_No_key(t *testing.T) {
SSL: SSLConfig{
Cert: "fixtures/server.crt",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -112,7 +112,7 @@ func Test_Config_SSL_No_Cert(t *testing.T) {
SSL: SSLConfig{
Key: "fixtures/server.key",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -133,8 +133,8 @@ func Test_Config_SSL_No_Cert(t *testing.T) {
func Test_Config_NoUploads(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Workers: &roadrunner.ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -151,8 +151,8 @@ func Test_Config_NoUploads(t *testing.T) {
func Test_Config_NoWorkers(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -164,8 +164,8 @@ func Test_Config_NoWorkers(t *testing.T) {
func Test_Config_NoPool(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -186,8 +186,8 @@ func Test_Config_NoPool(t *testing.T) {
func Test_Config_DeadPool(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -203,8 +203,8 @@ func Test_Config_DeadPool(t *testing.T) {
func Test_Config_InvalidAddress(t *testing.T) {
cfg := &Config{
- Address: "",
- MaxRequest: 1024,
+ Address: "",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
diff --git a/service/http/handler.go b/service/http/handler.go
index 8cebc42a..a7a6d4d0 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -75,12 +75,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// validating request size
- if h.cfg.MaxRequest != 0 {
+ if h.cfg.MaxRequestSize != 0 {
if length := r.Header.Get("content-length"); length != "" {
if size, err := strconv.ParseInt(length, 10, 64); err != nil {
h.handleError(w, r, err, start)
return
- } else if size > h.cfg.MaxRequest*1024*1024 {
+ } else if size > h.cfg.MaxRequestSize*1024*1024 {
h.handleError(w, r, errors.New("request body max size is exceeded"), start)
return
}
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index d876ef8e..5d4f7659 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -32,7 +32,7 @@ func get(url string) (string, *http.Response, error) {
func TestHandler_Echo(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -67,7 +67,7 @@ func TestHandler_Echo(t *testing.T) {
func Test_HandlerErrors(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -94,7 +94,7 @@ func Test_HandlerErrors(t *testing.T) {
func Test_Handler_JSON_error(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -123,7 +123,7 @@ func Test_Handler_JSON_error(t *testing.T) {
func TestHandler_Headers(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -170,7 +170,7 @@ func TestHandler_Headers(t *testing.T) {
func TestHandler_Empty_User_Agent(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -216,7 +216,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
func TestHandler_User_Agent(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -262,7 +262,7 @@ func TestHandler_User_Agent(t *testing.T) {
func TestHandler_Cookies(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -313,7 +313,7 @@ func TestHandler_Cookies(t *testing.T) {
func TestHandler_JsonPayload_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -363,7 +363,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
func TestHandler_JsonPayload_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -409,7 +409,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
func TestHandler_JsonPayload_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -455,7 +455,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
func TestHandler_FormData_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -513,7 +513,7 @@ func TestHandler_FormData_POST(t *testing.T) {
func TestHandler_FormData_POST_Overwrite(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -565,7 +565,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -623,7 +623,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
func TestHandler_FormData_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -681,7 +681,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
func TestHandler_FormData_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -739,7 +739,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
func TestHandler_Multipart_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -801,7 +801,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
func TestHandler_Multipart_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -863,7 +863,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
func TestHandler_Multipart_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -925,7 +925,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
func TestHandler_Error(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -959,7 +959,7 @@ func TestHandler_Error(t *testing.T) {
func TestHandler_Error2(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -993,7 +993,7 @@ func TestHandler_Error2(t *testing.T) {
func TestHandler_Error3(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1,
+ MaxRequestSize: 1,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1038,7 +1038,7 @@ func TestHandler_Error3(t *testing.T) {
func TestHandler_ResponseDuration(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1087,7 +1087,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
func TestHandler_ResponseDurationDelayed(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1136,7 +1136,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
func TestHandler_ErrorDuration(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1184,7 +1184,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
func BenchmarkHandler_Listen_Echo(b *testing.B) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
diff --git a/service/http/rpc.go b/service/http/rpc.go
index 3390a93d..7b38dece 100644
--- a/service/http/rpc.go
+++ b/service/http/rpc.go
@@ -20,7 +20,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error {
}
*r = "OK"
- return rpc.svc.rr.Reset()
+ return rpc.svc.Server().Reset()
}
// Workers returns list of active workers and their stats.
@@ -29,6 +29,6 @@ func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
return errors.New("http server is not running")
}
- r.Workers, err = util.ServerState(rpc.svc.rr)
+ r.Workers, err = util.ServerState(rpc.svc.Server())
return err
}
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
index ba3efd2e..669b201c 100644
--- a/service/http/rpc_test.go
+++ b/service/http/rpc_test.go
@@ -27,7 +27,7 @@ func Test_RPC(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -88,7 +88,7 @@ func Test_RPC_Unix(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -142,7 +142,7 @@ func Test_Workers(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/http/service.go b/service/http/service.go
index ad59f887..651284b4 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -25,7 +25,7 @@ const (
// http middleware type.
type middleware func(f http.HandlerFunc) http.HandlerFunc
-// Service manages rr, http servers.
+// Services manages rr, http servers.
type Service struct {
cfg *Config
env env.Environment
@@ -33,11 +33,17 @@ type Service struct {
mdwr []middleware
mu sync.Mutex
rr *roadrunner.Server
+ watcher roadrunner.Watcher
handler *Handler
http *http.Server
https *http.Server
}
+// Watch attaches watcher.
+func (s *Service) Watch(w roadrunner.Watcher) {
+ s.watcher = w
+}
+
// AddMiddleware adds new net/http mdwr.
func (s *Service) AddMiddleware(m middleware) {
s.mdwr = append(s.mdwr, m)
@@ -53,6 +59,7 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) {
func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) {
s.cfg = cfg
s.env = e
+
if r != nil {
if err := r.Register(ID, &rpcServer{s}); err != nil {
return false, err
@@ -77,6 +84,10 @@ func (s *Service) Serve() error {
s.rr = roadrunner.NewServer(s.cfg.Workers)
s.rr.Listen(s.throw)
+ if s.watcher != nil {
+ s.rr.Watch(s.watcher)
+ }
+
s.handler = &Handler{cfg: s.cfg, rr: s.rr}
s.handler.Listen(s.throw)
@@ -102,7 +113,7 @@ func (s *Service) Serve() error {
return <-err
}
-// Stop stops the svc.
+// Detach stops the svc.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -117,6 +128,14 @@ func (s *Service) Stop() {
go s.http.Shutdown(context.Background())
}
+// Server returns associated roadrunner server (if any).
+func (s *Service) Server() *roadrunner.Server {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ return s.rr
+}
+
// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
diff --git a/service/http/service_test.go b/service/http/service_test.go
index d1d601dc..5b6d60d8 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -84,7 +84,7 @@ func Test_Service_Configure_Enable(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":8070",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -115,7 +115,7 @@ func Test_Service_Echo(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -168,7 +168,7 @@ func Test_Service_Env(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -220,7 +220,7 @@ func Test_Service_ErrorEcho(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -280,7 +280,7 @@ func Test_Service_Middleware(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -354,7 +354,7 @@ func Test_Service_Listener(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -398,7 +398,7 @@ func Test_Service_Error(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -427,7 +427,7 @@ func Test_Service_Error2(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -456,7 +456,7 @@ func Test_Service_Error3(t *testing.T) {
assert.Error(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -483,7 +483,7 @@ func Test_Service_Error4(t *testing.T) {
assert.Error(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": "----",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
index d452f834..0fbf0e14 100644
--- a/service/http/uploads_test.go
+++ b/service/http/uploads_test.go
@@ -20,7 +20,7 @@ import (
func TestHandler_Upload_File(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -81,7 +81,7 @@ func TestHandler_Upload_File(t *testing.T) {
func TestHandler_Upload_NestedFile(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -142,7 +142,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: "-----",
Forbid: []string{},
@@ -203,7 +203,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
func TestHandler_Upload_File_Forbids(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
diff --git a/service/rpc/service.go b/service/rpc/service.go
index 0b957976..ea262615 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -3,6 +3,7 @@ package rpc
import (
"errors"
"github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/env"
"net/rpc"
"sync"
@@ -11,7 +12,7 @@ import (
// ID contains default service name.
const ID = "rpc"
-// Service is RPC service.
+// Services is RPC service.
type Service struct {
cfg *Config
stop chan interface{}
@@ -21,7 +22,7 @@ type Service struct {
}
// Init rpc service. Must return true if service is enabled.
-func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) {
+func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) {
if !cfg.Enable {
return false, nil
}
@@ -33,6 +34,10 @@ func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) {
env.SetEnv("RR_RPC", cfg.Listen)
}
+ if err := s.Register("system", &systemService{c}); err != nil {
+ return false, err
+ }
+
return true, nil
}
@@ -78,7 +83,7 @@ func (s *Service) Serve() error {
return nil
}
-// Stop stops the service.
+// Detach stops the service.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
index 0278d287..ee87509a 100644
--- a/service/rpc/service_test.go
+++ b/service/rpc/service_test.go
@@ -1,6 +1,7 @@
package rpc
import (
+ "github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/env"
"github.com/stretchr/testify/assert"
"testing"
@@ -13,7 +14,7 @@ func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil
func Test_Disabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: false}, nil)
+ ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.False(t, ok)
@@ -31,7 +32,7 @@ func Test_RegisterNotConfigured(t *testing.T) {
func Test_Enabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -39,7 +40,7 @@ func Test_Enabled(t *testing.T) {
func Test_StopNonServing(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -48,7 +49,7 @@ func Test_StopNonServing(t *testing.T) {
func Test_Serve_Errors(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -61,7 +62,7 @@ func Test_Serve_Errors(t *testing.T) {
func Test_Serve_Client(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -85,7 +86,7 @@ func Test_Serve_Client(t *testing.T) {
func TestSetEnv(t *testing.T) {
s := &Service{}
e := env.NewService(map[string]string{})
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, e)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e)
assert.NoError(t, err)
assert.True(t, ok)
diff --git a/service/rpc/system.go b/service/rpc/system.go
new file mode 100644
index 00000000..d1368a05
--- /dev/null
+++ b/service/rpc/system.go
@@ -0,0 +1,18 @@
+package rpc
+
+import "github.com/spiral/roadrunner/service"
+
+// systemService service controls roadrunner server.
+type systemService struct {
+ c service.Container
+}
+
+// Detach the underlying c.
+func (s *systemService) Stop(stop bool, r *string) error {
+ if stop {
+ s.c.Stop()
+ }
+ *r = "OK"
+
+ return nil
+}
diff --git a/service/static/service.go b/service/static/service.go
index b824e787..679033f2 100644
--- a/service/static/service.go
+++ b/service/static/service.go
@@ -9,7 +9,7 @@ import (
// ID contains default service name.
const ID = "static"
-// Service serves static files. Potentially convert into middleware?
+// Services serves static files. Potentially convert into middleware?
type Service struct {
// server configuration (location, forbidden files and etc)
cfg *Config
diff --git a/service/static/service_test.go b/service/static/service_test.go
index af616418..d69b2fdd 100644
--- a/service/static/service_test.go
+++ b/service/static/service_test.go
@@ -60,7 +60,7 @@ func Test_Files(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -113,7 +113,7 @@ func Test_Files_Disable(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -150,7 +150,7 @@ func Test_Files_Error(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -180,7 +180,7 @@ func Test_Files_Error2(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -210,7 +210,7 @@ func Test_Files_Forbid(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -247,7 +247,7 @@ func Test_Files_Always(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -284,7 +284,7 @@ func Test_Files_NotFound(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -321,7 +321,7 @@ func Test_Files_Dir(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -358,7 +358,7 @@ func Test_Files_NotForbid(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/watcher/config.go b/service/watcher/config.go
new file mode 100644
index 00000000..74be517a
--- /dev/null
+++ b/service/watcher/config.go
@@ -0,0 +1,48 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Configures set of Services.
+type Config struct {
+ // Interval defines the update duration for underlying watchers, default 1s.
+ Interval time.Duration
+
+ // Services declares list of services to be watched.
+ Services map[string]*watcherConfig
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ // Always use second based definition for time durations
+ if c.Interval < time.Microsecond {
+ c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
+ }
+
+ return nil
+}
+
+// InitDefaults sets missing values to their default values.
+func (c *Config) InitDefaults() error {
+ c.Interval = time.Second
+
+ return nil
+}
+
+// Watchers returns list of defined Services
+func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
+ watchers = make(map[string]roadrunner.Watcher)
+
+ for name, cfg := range c.Services {
+ watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg}
+ }
+
+ return watchers
+}
diff --git a/service/watcher/service.go b/service/watcher/service.go
new file mode 100644
index 00000000..c81ff3f5
--- /dev/null
+++ b/service/watcher/service.go
@@ -0,0 +1,46 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+)
+
+// ID defines watcher service name.
+const ID = "watch"
+
+// Watchable defines the ability to attach roadrunner watcher.
+type Watchable interface {
+ // Watch attaches watcher to the service.
+ Watch(w roadrunner.Watcher)
+}
+
+// Services to watch the state of roadrunner service inside other services.
+type Service struct {
+ cfg *Config
+ lsns []func(event int, ctx interface{})
+}
+
+// Init watcher service
+func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
+ // mount Services to designated services
+ for id, watcher := range cfg.Watchers(s.throw) {
+ svc, _ := c.Get(id)
+ if watchable, ok := svc.(Watchable); ok {
+ watchable.Watch(watcher)
+ }
+ }
+
+ return true, nil
+}
+
+// AddListener attaches server event watcher.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+}
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateWatcher struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+ return &stateWatcher{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
new file mode 100644
index 00000000..08d477fa
--- /dev/null
+++ b/service/watcher/watcher.go
@@ -0,0 +1,153 @@
+package watcher
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
+ "time"
+)
+
+const (
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory = iota + 8000
+
+ // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError
+ EventMaxTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time at rest.
+ EventMaxIdleTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventMaxExecTTL
+)
+
+// handles watcher events
+type listener func(event int, ctx interface{})
+
+// defines the watcher behaviour
+type watcherConfig struct {
+ // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
+ MaxMemory uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // MaxIdleTTL defines maximum duration worker can spend in idle mode.
+ MaxIdleTTL int64
+
+ // MaxExecTTL defines maximum lifetime per job.
+ MaxExecTTL int64
+}
+
+type watcher struct {
+ lsn listener
+ tick time.Duration
+ cfg *watcherConfig
+
+ // list of workers which are currently working
+ sw *stateWatcher
+
+ stop chan interface{}
+}
+
+// watch the pool state
+func (wch *watcher) watch(p roadrunner.Pool) {
+ now := time.Now()
+
+ for _, w := range p.Workers() {
+ if w.State().Value() == roadrunner.StateInvalid {
+ // skip duplicate assessment
+ continue
+ }
+
+ s, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxTTL, w, err)
+ }
+ continue
+ }
+
+ if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ wch.report(EventMaxMemory, w, err)
+ }
+ continue
+ }
+
+ // watch the worker state changes
+ wch.sw.push(w)
+ }
+
+ wch.sw.sync(now)
+
+ if wch.cfg.MaxExecTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ ) {
+ err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+ if p.Remove(w, err) {
+ // brutally
+ go w.Kill()
+ wch.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if wch.cfg.MaxIdleTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxIdleTTL, w, err)
+ }
+ }
+ }
+}
+
+// throw watcher event
+func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+ if wch.lsn != nil {
+ wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+ }
+}
+
+// Attach watcher to the pool
+func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+ wp := &watcher{
+ tick: wch.tick,
+ lsn: wch.lsn,
+ cfg: wch.cfg,
+ sw: newStateWatcher(),
+ stop: make(chan interface{}),
+ }
+
+ go func(wp *watcher, pool roadrunner.Pool) {
+ ticker := time.NewTicker(wp.tick)
+ for {
+ select {
+ case <-ticker.C:
+ wp.watch(pool)
+ case <-wp.stop:
+ return
+ }
+ }
+ }(wp, pool)
+
+ return wp
+}
+
+// Detach watcher from the pool.
+func (wch *watcher) Detach() {
+ close(wch.stop)
+}
diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php
index 4d427121..cb534577 100644
--- a/src/Diactoros/ServerRequestFactory.php
+++ b/src/Diactoros/ServerRequestFactory.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner\Diactoros;
diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php
index 6004ef11..1c51d911 100644
--- a/src/Diactoros/StreamFactory.php
+++ b/src/Diactoros/StreamFactory.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner\Diactoros;
diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php
index 1543a826..7de9a30f 100644
--- a/src/Diactoros/UploadedFileFactory.php
+++ b/src/Diactoros/UploadedFileFactory.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner\Diactoros;
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php
index 7c5c5929..348cb106 100644
--- a/src/Exception/RoadRunnerException.php
+++ b/src/Exception/RoadRunnerException.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner\Exception;
diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php
index 1a5da18c..803e4e31 100644
--- a/src/Exceptions/RoadRunnerException.php
+++ b/src/Exceptions/RoadRunnerException.php
@@ -5,6 +5,7 @@
* @license MIT
* @author Anton Titov (Wolfy-J)
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner\Exceptions;
diff --git a/src/HttpClient.php b/src/HttpClient.php
index e469dd30..f31a9b50 100644
--- a/src/HttpClient.php
+++ b/src/HttpClient.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Alex Bond
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner;
diff --git a/src/PSR7Client.php b/src/PSR7Client.php
index 8229b7d5..5b9425d6 100644
--- a/src/PSR7Client.php
+++ b/src/PSR7Client.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner;
diff --git a/src/Worker.php b/src/Worker.php
index da80e461..b67ebd3b 100644
--- a/src/Worker.php
+++ b/src/Worker.php
@@ -1,11 +1,10 @@
<?php
-declare(strict_types=1);
-
/**
* High-performance PHP process supervisor and load balancer written in Go
*
* @author Wolfy-J
*/
+declare(strict_types=1);
namespace Spiral\RoadRunner;
diff --git a/src/bin/roadrunner b/src/bin/roadrunner
new file mode 100755
index 00000000..cf4317d7
--- /dev/null
+++ b/src/bin/roadrunner
@@ -0,0 +1,207 @@
+#!/usr/bin/env php
+<?php
+/**
+ * RoadRunner
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * This file responsive for cli commands
+ */
+declare(strict_types=1);
+
+foreach ([
+ __DIR__ . '/../../../../autoload.php',
+ __DIR__ . '/../../vendor/autoload.php',
+ __DIR__ . '/vendor/autoload.php'
+ ] as $file) {
+ if (file_exists($file)) {
+ define('RR_COMPOSER_INSTALL', $file);
+
+ break;
+ }
+}
+
+unset($file);
+
+if (!defined('RR_COMPOSER_INSTALL')) {
+ fwrite(
+ STDERR,
+ 'You need to set up the project dependencies using Composer:' . PHP_EOL . PHP_EOL .
+ ' composer install' . PHP_EOL . PHP_EOL .
+ 'You can learn all about Composer on https://getcomposer.org/.' . PHP_EOL
+ );
+
+ die(1);
+}
+
+if (!class_exists('ZipArchive')) {
+ fwrite(STDERR, 'Extension `php-zip` is required.' . PHP_EOL);
+ die(1);
+}
+
+if (!function_exists('curl_init')) {
+ fwrite(STDERR, 'Extension `php-curl` is required.' . PHP_EOL);
+ die(1);
+}
+
+require RR_COMPOSER_INSTALL;
+
+use Symfony\Component\Console\Application;
+use Symfony\Component\Console\Helper\ProgressBar;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ConfirmationQuestion;
+
+class RoadRunnerCLIHelper
+{
+ /**
+ * Returns version of RoadRunner based on build.sh file
+ *
+ * @return string Version of RoadRunner
+ * @throws Exception
+ */
+ public static function getVersion(): string
+ {
+ $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh';
+ $fileResource = fopen($file, 'r') or die(1);
+ while (!feof($fileResource)) {
+ $line = fgets($fileResource, 4096);
+ $matches = [];
+ if (preg_match("/^RR_VERSION=(.*)/", $line, $matches)) {
+ return $matches[1];
+ }
+ }
+ fclose($fileResource);
+ throw new Exception("Can't find version of RoadRunner");
+ }
+
+ /**
+ * Returns OS Type for filename
+ *
+ * @return string OS Type
+ */
+ public static function getOSType(): string
+ {
+ switch (PHP_OS) {
+ case 'Darwin':
+ return 'darwin';
+ case 'Linux':
+ return 'linux';
+ case 'FreeBSD':
+ return 'freebsd';
+ case 'WIN32':
+ case 'WINNT':
+ case 'Windows':
+ return 'windows';
+ default:
+ return 'linux';
+ }
+ }
+
+ /**
+ * Returns generated URL to zip file on GitHub with binary file
+ *
+ * @return string URL
+ * @throws Exception
+ */
+ public static function getBinaryDownloadUrl()
+ {
+ return 'https://github.com/spiral/roadrunner/releases/download/v' . static::getVersion() . '/roadrunner-' . static::getVersion() . '-' . static::getOSType() . '-amd64.zip';
+ }
+}
+
+(new Application('RoadRunner', RoadRunnerCLIHelper::getVersion()))
+ ->register('update-binaries')
+ ->setDescription("Install or update RoadRunner binaries in specified folder (current folder by default)")
+ ->addOption('location', 'l', InputArgument::OPTIONAL, 'destination folder', '.')
+ ->setCode(function (InputInterface $input, OutputInterface $output) {
+ $output->writeln('<info>Updating binary file of RoadRunner</info>');
+
+ $finalFile = $input->getOption('location') . DIRECTORY_SEPARATOR . 'rr';
+
+ if (is_file($finalFile)) {
+ $output->writeln('<error>RoadRunner binary file already exists!</error>');
+ $helper = $this->getHelper('question');
+ $question = new ConfirmationQuestion('Do you want overwrite it? [Y/n]');
+
+ if (!$helper->ask($input, $output, $question)) {
+ return;
+ }
+ }
+
+ $output->writeln('<info>Downloading RoadRunner archive for ' . RoadRunnerCLIHelper::getOSType() . '</info>');
+
+ $progressBar = new ProgressBar($output);
+ $progressBar->setFormat('verbose');
+ $progressBar->start();
+
+ $zipFileName = tempnam('.', "rr_zip");
+ $zipFile = fopen($zipFileName, "w+");
+ $curlResource = curl_init();
+ curl_setopt($curlResource, CURLOPT_URL, RoadRunnerCLIHelper::getBinaryDownloadUrl());
+ curl_setopt($curlResource, CURLOPT_RETURNTRANSFER, true);
+ curl_setopt($curlResource, CURLOPT_BINARYTRANSFER, true);
+ curl_setopt($curlResource, CURLOPT_FOLLOWLOCATION, true);
+ curl_setopt($curlResource, CURLOPT_FILE, $zipFile);
+ curl_setopt($curlResource, CURLOPT_PROGRESSFUNCTION,
+ function ($resource, $download_size, $downloaded, $upload_size, $uploaded) use ($progressBar) {
+ if ($download_size == 0) {
+ return;
+ }
+ $progressBar->setFormat('[%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% ' . intval($download_size / 1024) . 'KB');
+ $progressBar->setMaxSteps($download_size);
+ $progressBar->setProgress($downloaded);
+ });
+ curl_setopt($curlResource, CURLOPT_NOPROGRESS, false); // needed to make progress function work
+ curl_setopt($curlResource, CURLOPT_HEADER, 0);
+ curl_exec($curlResource);
+ curl_close($curlResource);
+ fclose($zipFile);
+
+ $progressBar->finish();
+ $output->writeln("");
+
+ $output->writeln('<info>Unpacking ' . basename(RoadRunnerCLIHelper::getBinaryDownloadUrl()) . '</info>');
+
+ $zipArchive = new ZipArchive();
+ $zipArchive->open($zipFileName);
+ $fileStreamFromZip = $zipArchive->getStream('roadrunner-' . RoadRunnerCLIHelper::getVersion() . '-' . RoadRunnerCLIHelper::getOSType() . '-amd64/rr');
+ $finalFileResource = fopen($finalFile, 'w');
+
+ if (!$fileStreamFromZip) {
+ throw new Exception('Unable to extract the file.');
+ }
+
+ while (!feof($fileStreamFromZip)) {
+ fwrite($finalFileResource, fread($fileStreamFromZip, 8192));
+ }
+
+ fclose($fileStreamFromZip);
+ fclose($finalFileResource);
+ $zipArchive->extractTo('.', []);
+ $zipArchive->close();
+ unlink($zipFileName);
+
+ chmod($finalFile, 0755);
+ $output->writeln('<info>Binary file updated!</info>');
+ })
+ ->getApplication()
+ ->register("init-config")
+ ->setDescription("Inits default .rr.yaml config in specified folder (current folder by default)")
+ ->addOption('location', 'l', InputArgument::OPTIONAL, 'destination folder', '.')
+ ->setCode(function (InputInterface $input, OutputInterface $output) {
+ if (is_file($input->getOption('location') . DIRECTORY_SEPARATOR . '.rr.yaml')) {
+ $output->writeln('<error>Config file already exists!</error>');
+ $helper = $this->getHelper('question');
+ $question = new ConfirmationQuestion('Do you want overwrite it? [Y/n]');
+
+ if (!$helper->ask($input, $output, $question)) {
+ return;
+ }
+ }
+ copy(__DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '.rr.yaml',
+ $input->getOption('location') . DIRECTORY_SEPARATOR . '.rr.yaml');
+ $output->writeln('<info>Config file created!</info>');
+ })
+ ->getApplication()
+ ->run();
diff --git a/state.go b/state.go
index 4d8b1eaa..bf88f012 100644
--- a/state.go
+++ b/state.go
@@ -26,8 +26,8 @@ const (
// StateWorking - working on given payload.
StateWorking
- // StateStreaming - indicates that worker is streaming the data at the moment.
- StateStreaming
+ // StateInvalid - indicates that worker is being disabled and will be removed.
+ StateInvalid
// StateStopping - process is being softly stopped.
StateStopping
@@ -57,8 +57,8 @@ func (s *state) String() string {
return "ready"
case StateWorking:
return "working"
- case StateStreaming:
- return "streaming"
+ case StateInvalid:
+ return "invalid"
case StateStopped:
return "stopped"
case StateErrored:
diff --git a/static_pool.go b/static_pool.go
index c9473699..02960825 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -41,6 +41,10 @@ type StaticPool struct {
// all registered workers
workers []*Worker
+ // invalid declares set of workers to be removed from the pool.
+ mur sync.Mutex
+ remove sync.Map
+
// pool is being destroyed
inDestroy int32
destroy chan interface{}
@@ -111,6 +115,21 @@ func (p *StaticPool) Workers() (workers []*Worker) {
return workers
}
+// Remove forces pool to remove specific worker.
+func (p *StaticPool) Remove(w *Worker, err error) bool {
+ if w.State().Value() != StateReady && w.State().Value() != StateWorking {
+ // unable to remove inactive worker
+ return false
+ }
+
+ if _, ok := p.remove.Load(w); ok {
+ return false
+ }
+
+ p.remove.Store(w, err)
+ return true
+}
+
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tmu.Lock()
@@ -133,13 +152,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return nil, err
}
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return nil, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return p.Exec(rqs)
}
@@ -159,6 +178,7 @@ func (p *StaticPool) Destroy() {
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
+ w.markInvalid()
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w, nil)
@@ -181,6 +201,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
+
+ // get next worker
+ i++
+ continue
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")
@@ -199,8 +227,19 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
+
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
+
+ // get next worker
+ i++
+ continue
+ }
+
return w, nil
case <-p.destroy:
+ timeout.Stop()
+
return nil, fmt.Errorf("pool has been stopped")
}
}
@@ -211,7 +250,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// release releases or replaces the worker.
func (p *StaticPool) release(w *Worker) {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- go p.destroyWorker(w, p.cfg.MaxJobs)
+ p.discardWorker(w, p.cfg.MaxJobs)
+ return
+ }
+
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
return
}
@@ -242,6 +286,12 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return w, nil
}
+// gentry remove worker
+func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
+ w.markInvalid()
+ go p.destroyWorker(w, caused)
+}
+
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
go w.Stop()
@@ -271,6 +321,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ p.remove.Delete(w)
break
}
}
@@ -307,9 +358,8 @@ func (p *StaticPool) destroyed() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
p.mul.Lock()
- defer p.mul.Unlock()
-
if p.lsn != nil {
p.lsn(event, ctx)
}
+ p.mul.Unlock()
}
diff --git a/static_pool_test.go b/static_pool_test.go
index a78fcf11..a7e71fdb 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -381,10 +381,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
DestroyTimeout: time.Second,
},
)
- p.Destroy()
- assert.NotNil(t, p)
assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy()
}
func Benchmark_Pool_Allocate(b *testing.B) {
@@ -425,7 +426,11 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
p, _ := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- cfg,
+ Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
)
defer p.Destroy()
diff --git a/tests/pid.php b/tests/pid.php
index a8cfa229..bc1928a6 100644
--- a/tests/pid.php
+++ b/tests/pid.php
@@ -1,17 +1,17 @@
<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
+ /**
+ * @var Goridge\RelayInterface $relay
+ */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
+ use Spiral\Goridge;
+ use Spiral\RoadRunner;
-$rr = new RoadRunner\Worker($relay);
+ $rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
- try {
- $rr->send((string)getmypid());
- } catch (\Throwable $e) {
- $rr->error((string)$e);
- }
-} \ No newline at end of file
+ while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send((string)getmypid());
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+ } \ No newline at end of file
diff --git a/tests/slow-pid.php b/tests/slow-pid.php
new file mode 100644
index 00000000..daaf2583
--- /dev/null
+++ b/tests/slow-pid.php
@@ -0,0 +1,18 @@
+<?php
+ /**
+ * @var Goridge\RelayInterface $relay
+ */
+
+ use Spiral\Goridge;
+ use Spiral\RoadRunner;
+
+ $rr = new RoadRunner\Worker($relay);
+
+ while ($in = $rr->receive($ctx)) {
+ try {
+ sleep(1);
+ $rr->send((string)getmypid());
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+ } \ No newline at end of file
diff --git a/util/state_test.go b/util/state_test.go
index 2afe682e..d52bee1d 100644
--- a/util/state_test.go
+++ b/util/state_test.go
@@ -30,6 +30,16 @@ func TestServerState(t *testing.T) {
assert.Len(t, state, runtime.NumCPU())
}
+func TestDeadWorker(t *testing.T) {
+ w := &roadrunner.Worker{}
+ i := 0
+
+ w.Pid = &i
+
+ _, err := WorkerState(w)
+ assert.Error(t, err)
+}
+
func TestServerState_Err(t *testing.T) {
_, err := ServerState(nil)
assert.Error(t, err)
diff --git a/watcher.go b/watcher.go
new file mode 100644
index 00000000..4527fe68
--- /dev/null
+++ b/watcher.go
@@ -0,0 +1,10 @@
+package roadrunner
+
+// Watcher observes pool state and decides if any worker must be destroyed.
+type Watcher interface {
+ // Lock watcher on given pool instance.
+ Attach(p Pool) Watcher
+
+ // Detach pool watching.
+ Detach()
+}
diff --git a/watcher_test.go b/watcher_test.go
new file mode 100644
index 00000000..94ac591e
--- /dev/null
+++ b/watcher_test.go
@@ -0,0 +1,216 @@
+package roadrunner
+
+import (
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+ "time"
+)
+
+type eWatcher struct {
+ p Pool
+ onAttach func(p Pool)
+ onDetach func(p Pool)
+}
+
+func (w *eWatcher) Attach(p Pool) Watcher {
+ wp := &eWatcher{p: p, onAttach: w.onAttach, onDetach: w.onDetach}
+
+ if wp.onAttach != nil {
+ wp.onAttach(p)
+ }
+
+ return wp
+}
+
+func (w *eWatcher) Detach() {
+ if w.onDetach != nil {
+ w.onDetach(w.p)
+ }
+}
+
+func (w *eWatcher) remove(wr *Worker, err error) {
+ w.p.Remove(wr, err)
+}
+
+func Test_WatcherWatch(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pWatcher)
+ assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_WatcherReattach(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pWatcher)
+ assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+
+ oldWatcher := rr.pWatcher
+
+ assert.NoError(t, rr.Reset())
+
+ assert.NotNil(t, rr.pWatcher)
+ assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+ assert.NotEqual(t, oldWatcher, rr.pWatcher)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_WatcherAttachDetachSequence(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ var attachedPool Pool
+
+ rr.Watch(&eWatcher{
+ onAttach: func(p Pool) {
+ attachedPool = p
+ },
+ onDetach: func(p Pool) {
+ assert.Equal(t, attachedPool, p)
+ },
+ })
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pWatcher)
+ assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_RemoveWorkerOnAllocation(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid := res.String()
+
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ res, err = rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.NotEqual(t, lastPid, res.String())
+
+ assert.NotEqual(t, StateReady, wr.state.Value())
+
+ _, ok := rr.pool.(*StaticPool).remove.Load(wr)
+ assert.False(t, ok)
+}
+
+func Test_RemoveWorkerAfterTask(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php slow-pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+ lastPid := ""
+
+ wait := make(chan interface{})
+ go func() {
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid = res.String()
+
+ close(wait)
+ }()
+
+ // wait for worker execution to be in progress
+ time.Sleep(time.Millisecond * 250)
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ <-wait
+
+ // must be replaced
+ assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
+
+ // must not be registered withing pool
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+}
diff --git a/worker.go b/worker.go
index c52960b2..88fd1b9b 100644
--- a/worker.go
+++ b/worker.go
@@ -127,7 +127,7 @@ func (w *Worker) Wait() error {
return &exec.ExitError{ProcessState: w.endState}
}
-// Stop sends soft termination command to the worker and waits for process completion.
+// Detach sends soft termination command to the worker and waits for process completion.
func (w *Worker) Stop() error {
select {
case <-w.waitDone:
@@ -137,7 +137,7 @@ func (w *Worker) Stop() error {
defer w.mu.Unlock()
w.state.set(StateStopping)
- err := sendPayload(w.rl, &stopCommand{Stop: true})
+ err := sendControl(w.rl, &stopCommand{Stop: true})
<-w.waitDone
return err
@@ -164,34 +164,39 @@ func (w *Worker) Kill() error {
// errors. Method might return JobError indicating issue with payload.
func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
w.mu.Lock()
- defer w.mu.Unlock()
if rqs == nil {
+ w.mu.Unlock()
return nil, fmt.Errorf("payload can not be empty")
}
if w.state.Value() != StateReady {
+ w.mu.Unlock()
return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
}
w.state.set(StateWorking)
- defer w.state.registerExec()
rsp, err = w.execPayload(rqs)
if err != nil {
if _, ok := err.(JobError); !ok {
w.state.set(StateErrored)
+ w.state.registerExec()
+ w.mu.Unlock()
return nil, err
}
}
- // todo: attach when payload is complete
- // todo: new status
-
w.state.set(StateReady)
+ w.state.registerExec()
+ w.mu.Unlock()
return rsp, err
}
+func (w *Worker) markInvalid() {
+ w.state.set(StateInvalid)
+}
+
func (w *Worker) start() error {
if err := w.cmd.Start(); err != nil {
close(w.waitDone)
@@ -220,7 +225,8 @@ func (w *Worker) start() error {
}
func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
- if err := sendPayload(w.rl, rqs.Context); err != nil {
+ // two things
+ if err := sendControl(w.rl, rqs.Context); err != nil {
return nil, errors.Wrap(err, "header error")
}