diff options
62 files changed, 1287 insertions, 532 deletions
diff --git a/.travis.yml b/.travis.yml index 53c3b379..58138aa7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language: go sudo: required go: - - "1.11.x" + - "1.12.x" install: - export GO111MODULE=on @@ -13,7 +13,9 @@ test: go test -v -race -cover go test -v -race -cover ./util go test -v -race -cover ./service + go test -v -race -cover ./service/util go test -v -race -cover ./service/env go test -v -race -cover ./service/rpc go test -v -race -cover ./service/http go test -v -race -cover ./service/static + go test -v -race -cover ./service/watcher @@ -3,7 +3,7 @@ cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" # Pushes application version into the build information. -RR_VERSION=1.3.7 +RR_VERSION=1.4.0 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index f50ff0e9..9cdde098 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -29,7 +29,7 @@ http: key: server.key # max POST request size, including file uploads in MB. - maxRequest: 200 + maxRequestSize: 200 # file upload configuration. uploads: @@ -58,6 +58,27 @@ http: # amount of time given to worker to gracefully destruct itself. destroyTimeout: 60 +# monitors roadrunner server(s) +watch: + # check worker state each second + interval: 1 + + # custom watch configuration for each service + services: + # monitor http workers + http: + # maximum allowed memory consumption + maxMemory: 100 + + # maximum time to live for the worker + maxTTL: 0 + + # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections) + maxIdleTTL: 0 + + # max_execution_time, worker will be killed if that amount of time spend for task execution + maxExecTTL: 60 + # static file serving. remove this section to disable static file serving. static: # root directory for static file (http would not serve .php and .htaccess files). diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index 74506004..1222177a 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -25,10 +25,11 @@ import ( "github.com/spf13/cobra" "github.com/spiral/roadrunner/cmd/util" "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/watcher" "os" ) -// Service bus for all the commands. +// Services bus for all the commands. var ( cfgFile, workDir, logFormat string override []string @@ -106,6 +107,16 @@ func init() { util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) os.Exit(1) } + + // global watcher config + if Verbose { + wcv, _ := Container.Get(watcher.ID) + if wcv, ok := wcv.(*watcher.Service); ok { + wcv.AddListener(func(event int, ctx interface{}) { + util.LogEvent(Logger, event, ctx) + }) + } + } }) } diff --git a/cmd/rr/cmd/stop.go b/cmd/rr/cmd/stop.go new file mode 100644 index 00000000..2f48615b --- /dev/null +++ b/cmd/rr/cmd/stop.go @@ -0,0 +1,52 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/spiral/roadrunner/cmd/util" +) + +func init() { + CLI.AddCommand(&cobra.Command{ + Use: "stop", + Short: "Detach RoadRunner server", + RunE: stopHandler, + }) +} + +func stopHandler(cmd *cobra.Command, args []string) error { + client, err := util.RPCClient(Container) + if err != nil { + return err + } + defer client.Close() + + util.Printf("<green>Stopping RoadRunner</reset>: ") + + var r string + if err := client.Call("system.Detach", true, &r); err != nil { + return err + } + + util.Printf("<green+hb>done</reset>\n") + return nil +} diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go index 42fd966d..3008848a 100644 --- a/cmd/rr/http/reset.go +++ b/cmd/rr/http/reset.go @@ -41,7 +41,7 @@ func reloadHandler(cmd *cobra.Command, args []string) error { } defer client.Close() - util.Printf("<green>restarting http worker pool</reset>: ") + util.Printf("<green>Restarting http worker pool</reset>: ") var r string if err := client.Call("http.Reset", true, &r); err != nil { diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 54915957..dc2fbc20 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -24,6 +24,7 @@ package main import ( rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service/watcher" // services (plugins) "github.com/spiral/roadrunner/service/env" @@ -40,6 +41,7 @@ func main() { rr.Container.Register(rpc.ID, &rpc.Service{}) rr.Container.Register(http.ID, &http.Service{}) rr.Container.Register(static.ID, &static.Service{}) + rr.Container.Register(watcher.ID, &watcher.Service{}) // you can register additional commands using cmd.CLI rr.Execute() diff --git a/cmd/util/config.go b/cmd/util/config.go index a354b132..d5b1020c 100644 --- a/cmd/util/config.go +++ b/cmd/util/config.go @@ -63,6 +63,8 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*co // read in environment variables that match cfg.AutomaticEnv() + cfg.SetEnvPrefix("rr") + cfg.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // If a cfg file is found, read it in. if err := cfg.ReadInConfig(); err != nil { @@ -71,6 +73,13 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*co } } + // automatically inject ENV variables using ${ENV} pattern + for _, key := range cfg.AllKeys() { + val := cfg.Get(key) + cfg.Set(key, parseEnv(val)) + } + + // merge with console flags if len(flags) != 0 { for _, f := range flags { k, v, err := parseFlag(f) @@ -114,3 +123,18 @@ func parseValue(value string) string { return value } + +func parseEnv(value interface{}) interface{} { + str, ok := value.(string) + if !ok || len(str) <= 3 { + return value + } + + if str[0:2] == "${" && str[len(str)-1:] == "}" { + if v, ok := os.LookupEnv(str[2 : len(str)-1]); ok { + return v + } + } + + return str +} diff --git a/cmd/util/debug.go b/cmd/util/debug.go index f64b9bc4..c120eba2 100644 --- a/cmd/util/debug.go +++ b/cmd/util/debug.go @@ -3,6 +3,7 @@ package util import ( "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/watcher" "strings" ) @@ -12,7 +13,7 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool { case roadrunner.EventWorkerKill: w := ctx.(*roadrunner.Worker) logger.Warning(Sprintf( - "<white+hb>worker.%v</reset> <yellow>killed</red>", + "<white+hb>worker.%v</reset> <yellow>killed</reset>", *w.Pid, )) return true @@ -57,5 +58,44 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool { return true } + // watchers + switch event { + case watcher.EventMaxTTL: + w := ctx.(roadrunner.WorkerError) + logger.Debug(Sprintf( + "<white+hb>worker.%v</reset> <yellow>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + + case watcher.EventMaxIdleTTL: + w := ctx.(roadrunner.WorkerError) + logger.Debug(Sprintf( + "<white+hb>worker.%v</reset> <yellow>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + + case watcher.EventMaxMemory: + w := ctx.(roadrunner.WorkerError) + logger.Error(Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + + case watcher.EventMaxExecTTL: + w := ctx.(roadrunner.WorkerError) + logger.Error(Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + } + return false } diff --git a/cmd/util/table.go b/cmd/util/table.go index 565c0679..c0e20837 100644 --- a/cmd/util/table.go +++ b/cmd/util/table.go @@ -40,6 +40,8 @@ func renderStatus(status string) string { return Sprintf("<cyan>ready</reset>") case "working": return Sprintf("<green>working</reset>") + case "invalid": + return Sprintf("<yellow>invalid</reset>") case "stopped": return Sprintf("<red>stopped</reset>") case "errored": diff --git a/composer.json b/composer.json index cb08ee97..9e2e78fa 100644 --- a/composer.json +++ b/composer.json @@ -7,22 +7,26 @@ { "name": "Anton Titov / Wolfy-J", "email": "[email protected]" + }, + { + "name": "RoadRunner Community", + "homepage": "https://github.com/spiral/roadrunner/graphs/contributors" } ], "require": { "php": "^7.0", - "ext-json": "*", "spiral/goridge": "^2.0", "psr/http-factory": "^1.0", "psr/http-message": "^1.0", - "zendframework/zend-diactoros": "^1.3|^2.0" + "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0", + "http-interop/http-factory-diactoros": "^1.0" }, - "bin": [ - "qbuild/rr-build" - ], "autoload": { "psr-4": { "Spiral\\RoadRunner\\": "src/" } - } + }, + "bin": [ + "src/bin/roadrunner" + ] } diff --git a/error_buffer.go b/error_buffer.go index fec789a9..0eaf03b6 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -73,9 +73,8 @@ func newErrBuffer() *errBuffer { // Listen attaches error stream even listener. func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { eb.mu.Lock() - defer eb.mu.Unlock() - eb.lsn = l + eb.mu.Unlock() } // Len returns the number of buf of the unread portion of the errBuffer; @@ -88,14 +87,13 @@ func (eb *errBuffer) Len() int { return len(eb.buf) } -// Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. +// Write appends the contents of pool to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of pool; err is always nil. func (eb *errBuffer) Write(p []byte) (int, error) { eb.mu.Lock() - defer eb.mu.Unlock() - eb.buf = append(eb.buf, p...) eb.update <- nil + eb.mu.Unlock() return len(p), nil } @@ -31,6 +31,9 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*Worker) + // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker. + Remove(w *Worker, err error) bool + // Destroy all underlying workers (but let them to complete the task). Destroy() } diff --git a/protocol.go b/protocol.go index 564df9b7..5523a3e5 100644 --- a/protocol.go +++ b/protocol.go @@ -15,7 +15,7 @@ type pidCommand struct { Pid int `json:"pid"` } -func sendPayload(rl goridge.Relay, v interface{}) error { +func sendControl(rl goridge.Relay, v interface{}) error { if data, ok := v.([]byte); ok { return rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw) } @@ -29,7 +29,7 @@ func sendPayload(rl goridge.Relay, v interface{}) error { } func fetchPID(rl goridge.Relay) (pid int, err error) { - if err := sendPayload(rl, pidCommand{Pid: os.Getpid()}); err != nil { + if err := sendControl(rl, pidCommand{Pid: os.Getpid()}); err != nil { return 0, err } diff --git a/protocol_test.go b/protocol_test.go index ed3fe461..526945cb 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -29,7 +29,7 @@ func (r *relayMock) Close() error { } func Test_Protocol_Errors(t *testing.T) { - err := sendPayload(&relayMock{}, make(chan int)) + err := sendControl(&relayMock{}, make(chan int)) assert.Error(t, err) } diff --git a/qbuild/.build.json b/qbuild/.build.json deleted file mode 100644 index 74b83cea..00000000 --- a/qbuild/.build.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "packages": [ - "github.com/spiral/roadrunner/service/env", - "github.com/spiral/roadrunner/service/http", - "github.com/spiral/roadrunner/service/rpc", - "github.com/spiral/roadrunner/service/static" - ], - "commands": [ - "github.com/spiral/roadrunner/cmd/rr/http" - ], - "register": [ - "rr.Container.Register(env.ID, &env.Service{})", - "rr.Container.Register(rpc.ID, &rpc.Service{})", - "rr.Container.Register(http.ID, &http.Service{})", - "rr.Container.Register(static.ID, &static.Service{})" - ] -}
\ No newline at end of file diff --git a/qbuild/docker/Dockerfile b/qbuild/docker/Dockerfile deleted file mode 100644 index 11fb5abb..00000000 --- a/qbuild/docker/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM golang:latest - -ENV CGO_ENABLED=0 -ENV GO111MODULE=on - -WORKDIR /go/src/rr - -COPY compile.sh /go/src/rr/
\ No newline at end of file diff --git a/qbuild/docker/compile.sh b/qbuild/docker/compile.sh deleted file mode 100644 index 0c85124f..00000000 --- a/qbuild/docker/compile.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash -LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" -LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.BuildTime=$(date +%FT%T%z)" - -# Verify all external modules -go mod init - -# Build the binary -CGO_ENABLED=0 go build -v -ldflags "$LDFLAGS -extldflags '-static'" -o "rr"
\ No newline at end of file diff --git a/qbuild/main.go b/qbuild/main.go deleted file mode 100644 index a065fe27..00000000 --- a/qbuild/main.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - "github.com/sirupsen/logrus" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - // -packages- // - // -commands- // -) - -func main() { - // -register- // - rr.Logger.Formatter = &logrus.TextFormatter{ForceColors: true} - rr.Execute() -} diff --git a/qbuild/rr-build b/qbuild/rr-build deleted file mode 100644 index 7918f4f8..00000000 --- a/qbuild/rr-build +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env php -<?php -/** - * Automatic roadrunner builds. - */ - -use Spiral\RoadRunner\QuickBuild\Builder; - -require_once "src/Builder.php"; - -// load build config -$version = $argv[1] ?? "1.3.1"; - -// load build config -$config = $argv[2] ?? __DIR__ . "/.build.json"; - -// Greeting! -Builder::cprintf( - "Building <green>RoadRunner</reset> specifically for you (version: <white>%s</reset>)...\n", - $version -); - -$builder = Builder::loadConfig($config); -if ($builder == null) { - Builder::cprintf("<red>Unable to load config:</reset> %s\n", $config); - return; -} - -$errors = $builder->configErrors(); -if (!empty($errors)) { - Builder::cprintf("<yellow>Found configuration errors:</reset>\n"); - foreach ($errors as $error) { - Builder::cprintf("- <red>%s</reset>\n", $error); - } - - return; -} - -// Start build -$builder->build(getcwd(), __DIR__ . '/main.go', 'rr', $version);
\ No newline at end of file diff --git a/qbuild/src/Builder.php b/qbuild/src/Builder.php deleted file mode 100644 index 4b441094..00000000 --- a/qbuild/src/Builder.php +++ /dev/null @@ -1,237 +0,0 @@ -<?php -declare(strict_types=1); -/** - * RoadRunner. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ - -namespace Spiral\RoadRunner\QuickBuild; - -final class Builder -{ - const DOCKER = 'spiralscout/rr-build'; - - /** - * Coloring. - * - * @var array - */ - protected static $colors = [ - "reset" => "\e[0m", - "white" => "\033[1;38m", - "red" => "\033[0;31m", - "green" => "\033[0;32m", - "yellow" => "\033[1;93m", - "gray" => "\033[0;90m" - ]; - - /** @var array */ - private $config; - - /** - * @param array $config - */ - protected function __construct(array $config) - { - $this->config = $config; - } - - /** - * Validate the build configuration. - * - * @return array - */ - public function configErrors(): array - { - $errors = []; - if (!isset($this->config["commands"])) { - $errors[] = "Directive 'commands' missing"; - } - - if (!isset($this->config["packages"])) { - $errors[] = "Directive 'packages' missing"; - } - - if (!isset($this->config["register"])) { - $errors[] = "Directive 'register' missing"; - } - - return $errors; - } - - /** - * Build the application. - * - * @param string $directory - * @param string $template - * @param string $output - * @param string $version - */ - public function build(string $directory, string $template, string $output, string $version) - { - $filename = $directory . "/main.go"; - $output = $output . ($this->getOS() == 'windows' ? '.exe' : ''); - - // step 1, generate template - $this->generate($template, $filename); - - $command = sprintf( - 'docker run --rm -v "%s":/mnt -e RR_VERSION=%s -e GOARCH=amd64 -e GOOS=%s %s /bin/bash -c "mv /mnt/main.go main.go; bash compile.sh; cp rr /mnt/%s;"', - $directory, - $version, - $this->getOS(), - self::DOCKER, - $output - ); - - self::cprintf("<yellow>%s</reset>\n", $command); - - // run the build - $this->run($command, true); - - if (!file_exists($directory . '/' . $output)) { - self::cprintf("<red>Build has failed!</reset>"); - return; - } - - self::cprintf("<green>Build complete!</reset>\n"); - $this->run($directory . '/' . $output, false); - } - - /** - * @param string $command - * @param bool $shadow - */ - protected function run(string $command, bool $shadow = false) - { - $shadow && self::cprintf("<gray>"); - passthru($command); - $shadow && self::cprintf("</reset>"); - } - - /** - * @param string $template - * @param string $filename - */ - protected function generate(string $template, string $filename) - { - $body = file_get_contents($template); - - $replace = [ - '// -packages- //' => '"' . join("\"\n\"", $this->config['packages']) . '"', - '// -commands- //' => '_ "' . join("\"\n_ \"", $this->config['commands']) . '"', - '// -register- //' => join("\n", $this->config['register']) - ]; - - // compile the template - $result = str_replace(array_keys($replace), array_values($replace), $body); - file_put_contents($filename, $result); - } - - /** - * @return string - */ - protected function getOS(): string - { - $os = strtolower(PHP_OS); - - if (strpos($os, 'darwin') !== false) { - return 'darwin'; - } - - if (strpos($os, 'win') !== false) { - return 'windows'; - } - - return "linux"; - } - - /** - * Create new builder using given config. - * - * @param string $config - * @return Builder|null - */ - public static function loadConfig(string $config): ?Builder - { - if (!file_exists($config)) { - return null; - } - - $configData = json_decode(file_get_contents($config), true); - if (!is_array($configData)) { - return null; - } - - return new Builder($configData); - } - - /** - * Make colored output. - * - * @param string $format - * @param mixed ...$args - */ - public static function cprintf(string $format, ...$args) - { - if (self::isColorsSupported()) { - $format = preg_replace_callback("/<\/?([^>]+)>/", function ($value) { - return self::$colors[$value[1]]; - }, $format); - } else { - $format = preg_replace("/<[^>]+>/", "", $format); - } - - echo sprintf($format, ...$args); - } - - /** - * @return bool - */ - public static function isWindows(): bool - { - return \DIRECTORY_SEPARATOR === '\\'; - } - - /** - * Returns true if the STDOUT supports colorization. - * - * @codeCoverageIgnore - * @link https://github.com/symfony/Console/blob/master/Output/StreamOutput.php#L94 - * @param mixed $stream - * @return bool - */ - public static function isColorsSupported($stream = STDOUT): bool - { - if ('Hyper' === getenv('TERM_PROGRAM')) { - return true; - } - - try { - if (\DIRECTORY_SEPARATOR === '\\') { - return ( - function_exists('sapi_windows_vt100_support') - && @sapi_windows_vt100_support($stream) - ) || getenv('ANSICON') !== false - || getenv('ConEmuANSI') == 'ON' - || getenv('TERM') == 'xterm'; - } - - if (\function_exists('stream_isatty')) { - return (bool)@stream_isatty($stream); - } - - if (\function_exists('posix_isatty')) { - return (bool)@posix_isatty($stream); - } - - $stat = @fstat($stream); - // Check if formatted mode is S_IFCHR - return $stat ? 0020000 === ($stat['mode'] & 0170000) : false; - } catch (\Throwable $e) { - return false; - } - } -} @@ -37,9 +37,13 @@ type Server struct { // creates and connects to workers factory Factory + // associated pool watcher + watcher Watcher + // currently active pool instance - mup sync.Mutex - pool Pool + mup sync.Mutex + pool Pool + pWatcher Watcher // observes pool events (can be attached to multiple pools at the same time) mul sync.Mutex @@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) { s.lsn = l } +// Listen attaches server event watcher. +func (s *Server) Watch(w Watcher) { + s.mu.Lock() + defer s.mu.Unlock() + + s.watcher = w + + s.mul.Lock() + if s.pWatcher != nil && s.pool != nil { + s.pWatcher.Detach() + s.pWatcher = s.watcher.Attach(s.pool) + } + s.mul.Unlock() +} + // Start underlying worker pool, configure factory and command provider. func (s *Server) Start() (err error) { s.mu.Lock() @@ -72,6 +91,10 @@ func (s *Server) Start() (err error) { return err } + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(s.pool) + } + s.pool.Listen(s.poolListener) s.started = true s.throw(EventServerStart, s) @@ -79,7 +102,7 @@ func (s *Server) Start() (err error) { return nil } -// Stop underlying worker pool and close the factory. +// Detach underlying worker pool and close the factory. func (s *Server) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +112,12 @@ func (s *Server) Stop() { } s.throw(EventPoolDestruct, s.pool) + + if s.pWatcher != nil { + s.pWatcher.Detach() + s.pWatcher = nil + } + s.pool.Destroy() s.factory.Close() @@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() previous := s.pool + pWatcher := s.pWatcher s.mu.Unlock() pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) @@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() s.cfg.Pool, s.pool = cfg.Pool, pool + + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(pool) + } + s.mu.Unlock() s.throw(EventPoolConstruct, pool) if previous != nil { - go func(previous Pool) { + go func(previous Pool, pWatcher Watcher) { s.throw(EventPoolDestruct, previous) + if pWatcher != nil { + pWatcher.Detach() + } + previous.Destroy() - }(previous) + }(previous, pWatcher) } return nil @@ -203,9 +242,8 @@ func (s *Server) poolListener(event int, ctx interface{}) { // throw invokes event handler if any. func (s *Server) throw(event int, ctx interface{}) { s.mul.Lock() - defer s.mul.Unlock() - if s.lsn != nil { s.lsn(event, ctx) } + s.mul.Unlock() } diff --git a/server_test.go b/server_test.go index 4646ebc9..c973d634 100644 --- a/server_test.go +++ b/server_test.go @@ -9,7 +9,7 @@ import ( ) func TestServer_PipesEcho(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -19,11 +19,11 @@ func TestServer_PipesEcho(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) + assert.NoError(t, rr.Start()) - res, err := srv.Exec(&Payload{Body: []byte("hello")}) + res, err := rr.Exec(&Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -33,8 +33,27 @@ func TestServer_PipesEcho(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func TestServer_NoPool(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + func TestServer_SocketEcho(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo tcp", Relay: "tcp://:9007", @@ -45,11 +64,11 @@ func TestServer_SocketEcho(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) + assert.NoError(t, rr.Start()) - res, err := srv.Exec(&Payload{Body: []byte("hello")}) + res, err := rr.Exec(&Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -60,7 +79,7 @@ func TestServer_SocketEcho(t *testing.T) { } func TestServer_Configure_BeforeStart(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -70,9 +89,9 @@ func TestServer_Configure_BeforeStart(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - err := srv.Reconfigure(&ServerConfig{ + err := rr.Reconfigure(&ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", Pool: &Config{ @@ -83,9 +102,9 @@ func TestServer_Configure_BeforeStart(t *testing.T) { }) assert.NoError(t, err) - assert.NoError(t, srv.Start()) + assert.NoError(t, rr.Start()) - res, err := srv.Exec(&Payload{Body: []byte("hello")}) + res, err := rr.Exec(&Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -93,11 +112,11 @@ func TestServer_Configure_BeforeStart(t *testing.T) { assert.Nil(t, res.Context) assert.Equal(t, "hello", res.String()) - assert.Len(t, srv.Workers(), 2) + assert.Len(t, rr.Workers(), 2) } func TestServer_Stop_NotStarted(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -108,12 +127,12 @@ func TestServer_Stop_NotStarted(t *testing.T) { }, }) - srv.Stop() - assert.Nil(t, srv.Workers()) + rr.Stop() + assert.Nil(t, rr.Workers()) } func TestServer_Reconfigure(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -123,12 +142,12 @@ func TestServer_Reconfigure(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) - assert.Len(t, srv.Workers(), 1) + assert.NoError(t, rr.Start()) + assert.Len(t, rr.Workers(), 1) - err := srv.Reconfigure(&ServerConfig{ + err := rr.Reconfigure(&ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", Pool: &Config{ @@ -139,11 +158,11 @@ func TestServer_Reconfigure(t *testing.T) { }) assert.NoError(t, err) - assert.Len(t, srv.Workers(), 2) + assert.Len(t, rr.Workers(), 2) } func TestServer_Reset(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -153,19 +172,19 @@ func TestServer_Reset(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) - assert.Len(t, srv.Workers(), 1) + assert.NoError(t, rr.Start()) + assert.Len(t, rr.Workers(), 1) - pid := *srv.Workers()[0].Pid - assert.NoError(t, srv.Reset()) - assert.Len(t, srv.Workers(), 1) - assert.NotEqual(t, pid, srv.Workers()[0].Pid) + pid := *rr.Workers()[0].Pid + assert.NoError(t, rr.Reset()) + assert.Len(t, rr.Workers(), 1) + assert.NotEqual(t, pid, rr.Workers()[0].Pid) } func TestServer_ReplacePool(t *testing.T) { - srv := NewServer( + rr := NewServer( &ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -175,27 +194,27 @@ func TestServer_ReplacePool(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) + assert.NoError(t, rr.Start()) constructed := make(chan interface{}) - srv.Listen(func(e int, ctx interface{}) { + rr.Listen(func(e int, ctx interface{}) { if e == EventPoolConstruct { close(constructed) } }) - srv.Reset() + rr.Reset() <-constructed - for _, w := range srv.Workers() { + for _, w := range rr.Workers() { assert.Equal(t, StateReady, w.state.Value()) } } func TestServer_ServerFailure(t *testing.T) { - srv := NewServer(&ServerConfig{ + rr := NewServer(&ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", Pool: &Config{ @@ -204,25 +223,25 @@ func TestServer_ServerFailure(t *testing.T) { DestroyTimeout: time.Second, }, }) - defer srv.Stop() + defer rr.Stop() - assert.NoError(t, srv.Start()) + assert.NoError(t, rr.Start()) failure := make(chan interface{}) - srv.Listen(func(e int, ctx interface{}) { + rr.Listen(func(e int, ctx interface{}) { if e == EventServerFailure { failure <- nil } }) // emulating potential server failure - srv.cfg.Command = "php tests/client.php echo broken-connection" - srv.pool.(*StaticPool).cmd = func() *exec.Cmd { + rr.cfg.Command = "php tests/client.php echo broken-connection" + rr.pool.(*StaticPool).cmd = func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "broken-connection") } // killing random worker and expecting pool to replace it - srv.Workers()[0].cmd.Process.Kill() + rr.Workers()[0].cmd.Process.Kill() <-failure assert.True(t, true) diff --git a/service/container.go b/service/container.go index 275cfffd..abeaf369 100644 --- a/service/container.go +++ b/service/container.go @@ -16,13 +16,13 @@ var errNoConfig = fmt.Errorf("no config has been provided") // implement service.HydrateConfig. const InitMethod = "Init" -// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept // other services and/or configs as dependency. type Service interface { // Serve serves. Serve() error - // Stop stops the service. + // Detach stops the service. Stop() } @@ -198,7 +198,7 @@ func (c *container) Serve() error { return nil } -// Stop sends stop command to all running services. +// Detach sends stop command to all running services. func (c *container) Stop() { for _, e := range c.services { if e.hasStatus(StatusServing) { diff --git a/service/env/service.go b/service/env/service.go index 83175b36..00bc41ef 100644 --- a/service/env/service.go +++ b/service/env/service.go @@ -3,7 +3,7 @@ package env // ID contains default service name. const ID = "env" -// Service provides ability to map _ENV values from config file. +// Services provides ability to map _ENV values from config file. type Service struct { // values is default set of values. values map[string]string diff --git a/service/http/config.go b/service/http/config.go index 5a2c8768..899a5083 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -17,8 +17,8 @@ type Config struct { // SSL defines https server options. SSL SSLConfig - // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited. - MaxRequest int64 + // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. + MaxRequestSize int64 // Uploads configures uploads configuration. Uploads *UploadsConfig diff --git a/service/http/config_test.go b/service/http/config_test.go index 07901cb6..4cd2783f 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -31,8 +31,8 @@ func Test_Config_Hydrate_Error2(t *testing.T) { func Test_Config_Valid(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -58,7 +58,7 @@ func Test_Config_Valid_SSL(t *testing.T) { Cert: "fixtures/server.crt", Key: "fixtures/server.key", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -87,7 +87,7 @@ func Test_Config_SSL_No_key(t *testing.T) { SSL: SSLConfig{ Cert: "fixtures/server.crt", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -112,7 +112,7 @@ func Test_Config_SSL_No_Cert(t *testing.T) { SSL: SSLConfig{ Key: "fixtures/server.key", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -133,8 +133,8 @@ func Test_Config_SSL_No_Cert(t *testing.T) { func Test_Config_NoUploads(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Workers: &roadrunner.ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -151,8 +151,8 @@ func Test_Config_NoUploads(t *testing.T) { func Test_Config_NoWorkers(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -164,8 +164,8 @@ func Test_Config_NoWorkers(t *testing.T) { func Test_Config_NoPool(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -186,8 +186,8 @@ func Test_Config_NoPool(t *testing.T) { func Test_Config_DeadPool(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -203,8 +203,8 @@ func Test_Config_DeadPool(t *testing.T) { func Test_Config_InvalidAddress(t *testing.T) { cfg := &Config{ - Address: "", - MaxRequest: 1024, + Address: "", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, diff --git a/service/http/handler.go b/service/http/handler.go index 8cebc42a..a7a6d4d0 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -75,12 +75,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() // validating request size - if h.cfg.MaxRequest != 0 { + if h.cfg.MaxRequestSize != 0 { if length := r.Header.Get("content-length"); length != "" { if size, err := strconv.ParseInt(length, 10, 64); err != nil { h.handleError(w, r, err, start) return - } else if size > h.cfg.MaxRequest*1024*1024 { + } else if size > h.cfg.MaxRequestSize*1024*1024 { h.handleError(w, r, errors.New("request body max size is exceeded"), start) return } diff --git a/service/http/handler_test.go b/service/http/handler_test.go index d876ef8e..5d4f7659 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -32,7 +32,7 @@ func get(url string) (string, *http.Response, error) { func TestHandler_Echo(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -67,7 +67,7 @@ func TestHandler_Echo(t *testing.T) { func Test_HandlerErrors(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -94,7 +94,7 @@ func Test_HandlerErrors(t *testing.T) { func Test_Handler_JSON_error(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -123,7 +123,7 @@ func Test_Handler_JSON_error(t *testing.T) { func TestHandler_Headers(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -170,7 +170,7 @@ func TestHandler_Headers(t *testing.T) { func TestHandler_Empty_User_Agent(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -216,7 +216,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { func TestHandler_User_Agent(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -262,7 +262,7 @@ func TestHandler_User_Agent(t *testing.T) { func TestHandler_Cookies(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -313,7 +313,7 @@ func TestHandler_Cookies(t *testing.T) { func TestHandler_JsonPayload_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -363,7 +363,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { func TestHandler_JsonPayload_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -409,7 +409,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { func TestHandler_JsonPayload_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -455,7 +455,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { func TestHandler_FormData_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -513,7 +513,7 @@ func TestHandler_FormData_POST(t *testing.T) { func TestHandler_FormData_POST_Overwrite(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -565,7 +565,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -623,7 +623,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { func TestHandler_FormData_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -681,7 +681,7 @@ func TestHandler_FormData_PUT(t *testing.T) { func TestHandler_FormData_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -739,7 +739,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { func TestHandler_Multipart_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -801,7 +801,7 @@ func TestHandler_Multipart_POST(t *testing.T) { func TestHandler_Multipart_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -863,7 +863,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { func TestHandler_Multipart_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -925,7 +925,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { func TestHandler_Error(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -959,7 +959,7 @@ func TestHandler_Error(t *testing.T) { func TestHandler_Error2(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -993,7 +993,7 @@ func TestHandler_Error2(t *testing.T) { func TestHandler_Error3(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1, + MaxRequestSize: 1, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1038,7 +1038,7 @@ func TestHandler_Error3(t *testing.T) { func TestHandler_ResponseDuration(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1087,7 +1087,7 @@ func TestHandler_ResponseDuration(t *testing.T) { func TestHandler_ResponseDurationDelayed(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1136,7 +1136,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { func TestHandler_ErrorDuration(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1184,7 +1184,7 @@ func TestHandler_ErrorDuration(t *testing.T) { func BenchmarkHandler_Listen_Echo(b *testing.B) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, diff --git a/service/http/rpc.go b/service/http/rpc.go index 3390a93d..7b38dece 100644 --- a/service/http/rpc.go +++ b/service/http/rpc.go @@ -20,7 +20,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error { } *r = "OK" - return rpc.svc.rr.Reset() + return rpc.svc.Server().Reset() } // Workers returns list of active workers and their stats. @@ -29,6 +29,6 @@ func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { return errors.New("http server is not running") } - r.Workers, err = util.ServerState(rpc.svc.rr) + r.Workers, err = util.ServerState(rpc.svc.Server()) return err } diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go index ba3efd2e..669b201c 100644 --- a/service/http/rpc_test.go +++ b/service/http/rpc_test.go @@ -27,7 +27,7 @@ func Test_RPC(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -88,7 +88,7 @@ func Test_RPC_Unix(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -142,7 +142,7 @@ func Test_Workers(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/http/service.go b/service/http/service.go index ad59f887..651284b4 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -25,7 +25,7 @@ const ( // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc -// Service manages rr, http servers. +// Services manages rr, http servers. type Service struct { cfg *Config env env.Environment @@ -33,11 +33,17 @@ type Service struct { mdwr []middleware mu sync.Mutex rr *roadrunner.Server + watcher roadrunner.Watcher handler *Handler http *http.Server https *http.Server } +// Watch attaches watcher. +func (s *Service) Watch(w roadrunner.Watcher) { + s.watcher = w +} + // AddMiddleware adds new net/http mdwr. func (s *Service) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) @@ -53,6 +59,7 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) { s.cfg = cfg s.env = e + if r != nil { if err := r.Register(ID, &rpcServer{s}); err != nil { return false, err @@ -77,6 +84,10 @@ func (s *Service) Serve() error { s.rr = roadrunner.NewServer(s.cfg.Workers) s.rr.Listen(s.throw) + if s.watcher != nil { + s.rr.Watch(s.watcher) + } + s.handler = &Handler{cfg: s.cfg, rr: s.rr} s.handler.Listen(s.throw) @@ -102,7 +113,7 @@ func (s *Service) Serve() error { return <-err } -// Stop stops the svc. +// Detach stops the svc. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -117,6 +128,14 @@ func (s *Service) Stop() { go s.http.Shutdown(context.Background()) } +// Server returns associated roadrunner server (if any). +func (s *Service) Server() *roadrunner.Server { + s.mu.Lock() + defer s.mu.Unlock() + + return s.rr +} + // ServeHTTP handles connection using set of middleware and rr PSR-7 server. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { diff --git a/service/http/service_test.go b/service/http/service_test.go index d1d601dc..5b6d60d8 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -84,7 +84,7 @@ func Test_Service_Configure_Enable(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":8070", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -115,7 +115,7 @@ func Test_Service_Echo(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -168,7 +168,7 @@ func Test_Service_Env(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -220,7 +220,7 @@ func Test_Service_ErrorEcho(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -280,7 +280,7 @@ func Test_Service_Middleware(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -354,7 +354,7 @@ func Test_Service_Listener(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -398,7 +398,7 @@ func Test_Service_Error(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -427,7 +427,7 @@ func Test_Service_Error2(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -456,7 +456,7 @@ func Test_Service_Error3(t *testing.T) { assert.Error(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -483,7 +483,7 @@ func Test_Service_Error4(t *testing.T) { assert.Error(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": "----", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go index d452f834..0fbf0e14 100644 --- a/service/http/uploads_test.go +++ b/service/http/uploads_test.go @@ -20,7 +20,7 @@ import ( func TestHandler_Upload_File(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -81,7 +81,7 @@ func TestHandler_Upload_File(t *testing.T) { func TestHandler_Upload_NestedFile(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -142,7 +142,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { func TestHandler_Upload_File_NoTmpDir(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: "-----", Forbid: []string{}, @@ -203,7 +203,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { func TestHandler_Upload_File_Forbids(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, diff --git a/service/rpc/service.go b/service/rpc/service.go index 0b957976..ea262615 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -3,6 +3,7 @@ package rpc import ( "errors" "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/env" "net/rpc" "sync" @@ -11,7 +12,7 @@ import ( // ID contains default service name. const ID = "rpc" -// Service is RPC service. +// Services is RPC service. type Service struct { cfg *Config stop chan interface{} @@ -21,7 +22,7 @@ type Service struct { } // Init rpc service. Must return true if service is enabled. -func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) { +func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) { if !cfg.Enable { return false, nil } @@ -33,6 +34,10 @@ func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) { env.SetEnv("RR_RPC", cfg.Listen) } + if err := s.Register("system", &systemService{c}); err != nil { + return false, err + } + return true, nil } @@ -78,7 +83,7 @@ func (s *Service) Serve() error { return nil } -// Stop stops the service. +// Detach stops the service. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go index 0278d287..ee87509a 100644 --- a/service/rpc/service_test.go +++ b/service/rpc/service_test.go @@ -1,6 +1,7 @@ package rpc import ( + "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/env" "github.com/stretchr/testify/assert" "testing" @@ -13,7 +14,7 @@ func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil func Test_Disabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: false}, nil) + ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.False(t, ok) @@ -31,7 +32,7 @@ func Test_RegisterNotConfigured(t *testing.T) { func Test_Enabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -39,7 +40,7 @@ func Test_Enabled(t *testing.T) { func Test_StopNonServing(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -48,7 +49,7 @@ func Test_StopNonServing(t *testing.T) { func Test_Serve_Errors(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -61,7 +62,7 @@ func Test_Serve_Errors(t *testing.T) { func Test_Serve_Client(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -85,7 +86,7 @@ func Test_Serve_Client(t *testing.T) { func TestSetEnv(t *testing.T) { s := &Service{} e := env.NewService(map[string]string{}) - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, e) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e) assert.NoError(t, err) assert.True(t, ok) diff --git a/service/rpc/system.go b/service/rpc/system.go new file mode 100644 index 00000000..d1368a05 --- /dev/null +++ b/service/rpc/system.go @@ -0,0 +1,18 @@ +package rpc + +import "github.com/spiral/roadrunner/service" + +// systemService service controls roadrunner server. +type systemService struct { + c service.Container +} + +// Detach the underlying c. +func (s *systemService) Stop(stop bool, r *string) error { + if stop { + s.c.Stop() + } + *r = "OK" + + return nil +} diff --git a/service/static/service.go b/service/static/service.go index b824e787..679033f2 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -9,7 +9,7 @@ import ( // ID contains default service name. const ID = "static" -// Service serves static files. Potentially convert into middleware? +// Services serves static files. Potentially convert into middleware? type Service struct { // server configuration (location, forbidden files and etc) cfg *Config diff --git a/service/static/service_test.go b/service/static/service_test.go index af616418..d69b2fdd 100644 --- a/service/static/service_test.go +++ b/service/static/service_test.go @@ -60,7 +60,7 @@ func Test_Files(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -113,7 +113,7 @@ func Test_Files_Disable(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -150,7 +150,7 @@ func Test_Files_Error(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -180,7 +180,7 @@ func Test_Files_Error2(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -210,7 +210,7 @@ func Test_Files_Forbid(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -247,7 +247,7 @@ func Test_Files_Always(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -284,7 +284,7 @@ func Test_Files_NotFound(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -321,7 +321,7 @@ func Test_Files_Dir(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -358,7 +358,7 @@ func Test_Files_NotForbid(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/watcher/config.go b/service/watcher/config.go new file mode 100644 index 00000000..74be517a --- /dev/null +++ b/service/watcher/config.go @@ -0,0 +1,48 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "time" +) + +// Configures set of Services. +type Config struct { + // Interval defines the update duration for underlying watchers, default 1s. + Interval time.Duration + + // Services declares list of services to be watched. + Services map[string]*watcherConfig +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + // Always use second based definition for time durations + if c.Interval < time.Microsecond { + c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds()) + } + + return nil +} + +// InitDefaults sets missing values to their default values. +func (c *Config) InitDefaults() error { + c.Interval = time.Second + + return nil +} + +// Watchers returns list of defined Services +func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) { + watchers = make(map[string]roadrunner.Watcher) + + for name, cfg := range c.Services { + watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg} + } + + return watchers +} diff --git a/service/watcher/service.go b/service/watcher/service.go new file mode 100644 index 00000000..c81ff3f5 --- /dev/null +++ b/service/watcher/service.go @@ -0,0 +1,46 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" +) + +// ID defines watcher service name. +const ID = "watch" + +// Watchable defines the ability to attach roadrunner watcher. +type Watchable interface { + // Watch attaches watcher to the service. + Watch(w roadrunner.Watcher) +} + +// Services to watch the state of roadrunner service inside other services. +type Service struct { + cfg *Config + lsns []func(event int, ctx interface{}) +} + +// Init watcher service +func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { + // mount Services to designated services + for id, watcher := range cfg.Watchers(s.throw) { + svc, _ := c.Get(id) + if watchable, ok := svc.(Watchable); ok { + watchable.Watch(watcher) + } + } + + return true, nil +} + +// AddListener attaches server event watcher. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.lsns = append(s.lsns, l) +} + +// throw handles service, server and pool events. +func (s *Service) throw(event int, ctx interface{}) { + for _, l := range s.lsns { + l(event, ctx) + } +} diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go new file mode 100644 index 00000000..3090d15d --- /dev/null +++ b/service/watcher/state_watch.go @@ -0,0 +1,58 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "time" +) + +type stateWatcher struct { + prev map[*roadrunner.Worker]state + next map[*roadrunner.Worker]state +} + +type state struct { + state int64 + numExecs int64 + since time.Time +} + +func newStateWatcher() *stateWatcher { + return &stateWatcher{ + prev: make(map[*roadrunner.Worker]state), + next: make(map[*roadrunner.Worker]state), + } +} + +// add new worker to be watched +func (sw *stateWatcher) push(w *roadrunner.Worker) { + sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} +} + +// update worker states. +func (sw *stateWatcher) sync(t time.Time) { + for w := range sw.prev { + if _, ok := sw.next[w]; !ok { + delete(sw.prev, w) + } + } + + for w, s := range sw.next { + ps, ok := sw.prev[w] + if !ok || ps.state != s.state || ps.numExecs != s.numExecs { + sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t} + } + + delete(sw.next, w) + } +} + +// find all workers which spend given amount of time in a specific state. +func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) { + for w, s := range sw.prev { + if s.state == state && s.since.Before(since) { + workers = append(workers, w) + } + } + + return +} diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go new file mode 100644 index 00000000..08d477fa --- /dev/null +++ b/service/watcher/watcher.go @@ -0,0 +1,153 @@ +package watcher + +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" + "time" +) + +const ( + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory = iota + 8000 + + // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError + EventMaxTTL + + // EventMaxIdleTTL triggered when worker spends too much time at rest. + EventMaxIdleTTL + + // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time). + EventMaxExecTTL +) + +// handles watcher events +type listener func(event int, ctx interface{}) + +// defines the watcher behaviour +type watcherConfig struct { + // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. + MaxMemory uint64 + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // MaxIdleTTL defines maximum duration worker can spend in idle mode. + MaxIdleTTL int64 + + // MaxExecTTL defines maximum lifetime per job. + MaxExecTTL int64 +} + +type watcher struct { + lsn listener + tick time.Duration + cfg *watcherConfig + + // list of workers which are currently working + sw *stateWatcher + + stop chan interface{} +} + +// watch the pool state +func (wch *watcher) watch(p roadrunner.Pool) { + now := time.Now() + + for _, w := range p.Workers() { + if w.State().Value() == roadrunner.StateInvalid { + // skip duplicate assessment + continue + } + + s, err := util.WorkerState(w) + if err != nil { + continue + } + + if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) { + err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL) + if p.Remove(w, err) { + wch.report(EventMaxTTL, w, err) + } + continue + } + + if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 { + err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory) + if p.Remove(w, err) { + wch.report(EventMaxMemory, w, err) + } + continue + } + + // watch the worker state changes + wch.sw.push(w) + } + + wch.sw.sync(now) + + if wch.cfg.MaxExecTTL != 0 { + for _, w := range wch.sw.find( + roadrunner.StateWorking, + now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)), + ) { + err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL) + if p.Remove(w, err) { + // brutally + go w.Kill() + wch.report(EventMaxExecTTL, w, err) + } + } + } + + // locale workers which are in idle mode for too long + if wch.cfg.MaxIdleTTL != 0 { + for _, w := range wch.sw.find( + roadrunner.StateReady, + now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)), + ) { + err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL) + if p.Remove(w, err) { + wch.report(EventMaxIdleTTL, w, err) + } + } + } +} + +// throw watcher event +func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) { + if wch.lsn != nil { + wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) + } +} + +// Attach watcher to the pool +func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { + wp := &watcher{ + tick: wch.tick, + lsn: wch.lsn, + cfg: wch.cfg, + sw: newStateWatcher(), + stop: make(chan interface{}), + } + + go func(wp *watcher, pool roadrunner.Pool) { + ticker := time.NewTicker(wp.tick) + for { + select { + case <-ticker.C: + wp.watch(pool) + case <-wp.stop: + return + } + } + }(wp, pool) + + return wp +} + +// Detach watcher from the pool. +func (wch *watcher) Detach() { + close(wch.stop) +} diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php index 4d427121..cb534577 100644 --- a/src/Diactoros/ServerRequestFactory.php +++ b/src/Diactoros/ServerRequestFactory.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner\Diactoros; diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php index 6004ef11..1c51d911 100644 --- a/src/Diactoros/StreamFactory.php +++ b/src/Diactoros/StreamFactory.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner\Diactoros; diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php index 1543a826..7de9a30f 100644 --- a/src/Diactoros/UploadedFileFactory.php +++ b/src/Diactoros/UploadedFileFactory.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner\Diactoros; diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php index 7c5c5929..348cb106 100644 --- a/src/Exception/RoadRunnerException.php +++ b/src/Exception/RoadRunnerException.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner\Exception; diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php index 1a5da18c..803e4e31 100644 --- a/src/Exceptions/RoadRunnerException.php +++ b/src/Exceptions/RoadRunnerException.php @@ -5,6 +5,7 @@ * @license MIT * @author Anton Titov (Wolfy-J) */ +declare(strict_types=1); namespace Spiral\RoadRunner\Exceptions; diff --git a/src/HttpClient.php b/src/HttpClient.php index e469dd30..f31a9b50 100644 --- a/src/HttpClient.php +++ b/src/HttpClient.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Alex Bond */ +declare(strict_types=1); namespace Spiral\RoadRunner; diff --git a/src/PSR7Client.php b/src/PSR7Client.php index 8229b7d5..5b9425d6 100644 --- a/src/PSR7Client.php +++ b/src/PSR7Client.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner; diff --git a/src/Worker.php b/src/Worker.php index da80e461..b67ebd3b 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -1,11 +1,10 @@ <?php -declare(strict_types=1); - /** * High-performance PHP process supervisor and load balancer written in Go * * @author Wolfy-J */ +declare(strict_types=1); namespace Spiral\RoadRunner; diff --git a/src/bin/roadrunner b/src/bin/roadrunner new file mode 100755 index 00000000..cf4317d7 --- /dev/null +++ b/src/bin/roadrunner @@ -0,0 +1,207 @@ +#!/usr/bin/env php +<?php +/** + * RoadRunner + * High-performance PHP process supervisor and load balancer written in Go + * + * This file responsive for cli commands + */ +declare(strict_types=1); + +foreach ([ + __DIR__ . '/../../../../autoload.php', + __DIR__ . '/../../vendor/autoload.php', + __DIR__ . '/vendor/autoload.php' + ] as $file) { + if (file_exists($file)) { + define('RR_COMPOSER_INSTALL', $file); + + break; + } +} + +unset($file); + +if (!defined('RR_COMPOSER_INSTALL')) { + fwrite( + STDERR, + 'You need to set up the project dependencies using Composer:' . PHP_EOL . PHP_EOL . + ' composer install' . PHP_EOL . PHP_EOL . + 'You can learn all about Composer on https://getcomposer.org/.' . PHP_EOL + ); + + die(1); +} + +if (!class_exists('ZipArchive')) { + fwrite(STDERR, 'Extension `php-zip` is required.' . PHP_EOL); + die(1); +} + +if (!function_exists('curl_init')) { + fwrite(STDERR, 'Extension `php-curl` is required.' . PHP_EOL); + die(1); +} + +require RR_COMPOSER_INSTALL; + +use Symfony\Component\Console\Application; +use Symfony\Component\Console\Helper\ProgressBar; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ConfirmationQuestion; + +class RoadRunnerCLIHelper +{ + /** + * Returns version of RoadRunner based on build.sh file + * + * @return string Version of RoadRunner + * @throws Exception + */ + public static function getVersion(): string + { + $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh'; + $fileResource = fopen($file, 'r') or die(1); + while (!feof($fileResource)) { + $line = fgets($fileResource, 4096); + $matches = []; + if (preg_match("/^RR_VERSION=(.*)/", $line, $matches)) { + return $matches[1]; + } + } + fclose($fileResource); + throw new Exception("Can't find version of RoadRunner"); + } + + /** + * Returns OS Type for filename + * + * @return string OS Type + */ + public static function getOSType(): string + { + switch (PHP_OS) { + case 'Darwin': + return 'darwin'; + case 'Linux': + return 'linux'; + case 'FreeBSD': + return 'freebsd'; + case 'WIN32': + case 'WINNT': + case 'Windows': + return 'windows'; + default: + return 'linux'; + } + } + + /** + * Returns generated URL to zip file on GitHub with binary file + * + * @return string URL + * @throws Exception + */ + public static function getBinaryDownloadUrl() + { + return 'https://github.com/spiral/roadrunner/releases/download/v' . static::getVersion() . '/roadrunner-' . static::getVersion() . '-' . static::getOSType() . '-amd64.zip'; + } +} + +(new Application('RoadRunner', RoadRunnerCLIHelper::getVersion())) + ->register('update-binaries') + ->setDescription("Install or update RoadRunner binaries in specified folder (current folder by default)") + ->addOption('location', 'l', InputArgument::OPTIONAL, 'destination folder', '.') + ->setCode(function (InputInterface $input, OutputInterface $output) { + $output->writeln('<info>Updating binary file of RoadRunner</info>'); + + $finalFile = $input->getOption('location') . DIRECTORY_SEPARATOR . 'rr'; + + if (is_file($finalFile)) { + $output->writeln('<error>RoadRunner binary file already exists!</error>'); + $helper = $this->getHelper('question'); + $question = new ConfirmationQuestion('Do you want overwrite it? [Y/n]'); + + if (!$helper->ask($input, $output, $question)) { + return; + } + } + + $output->writeln('<info>Downloading RoadRunner archive for ' . RoadRunnerCLIHelper::getOSType() . '</info>'); + + $progressBar = new ProgressBar($output); + $progressBar->setFormat('verbose'); + $progressBar->start(); + + $zipFileName = tempnam('.', "rr_zip"); + $zipFile = fopen($zipFileName, "w+"); + $curlResource = curl_init(); + curl_setopt($curlResource, CURLOPT_URL, RoadRunnerCLIHelper::getBinaryDownloadUrl()); + curl_setopt($curlResource, CURLOPT_RETURNTRANSFER, true); + curl_setopt($curlResource, CURLOPT_BINARYTRANSFER, true); + curl_setopt($curlResource, CURLOPT_FOLLOWLOCATION, true); + curl_setopt($curlResource, CURLOPT_FILE, $zipFile); + curl_setopt($curlResource, CURLOPT_PROGRESSFUNCTION, + function ($resource, $download_size, $downloaded, $upload_size, $uploaded) use ($progressBar) { + if ($download_size == 0) { + return; + } + $progressBar->setFormat('[%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% ' . intval($download_size / 1024) . 'KB'); + $progressBar->setMaxSteps($download_size); + $progressBar->setProgress($downloaded); + }); + curl_setopt($curlResource, CURLOPT_NOPROGRESS, false); // needed to make progress function work + curl_setopt($curlResource, CURLOPT_HEADER, 0); + curl_exec($curlResource); + curl_close($curlResource); + fclose($zipFile); + + $progressBar->finish(); + $output->writeln(""); + + $output->writeln('<info>Unpacking ' . basename(RoadRunnerCLIHelper::getBinaryDownloadUrl()) . '</info>'); + + $zipArchive = new ZipArchive(); + $zipArchive->open($zipFileName); + $fileStreamFromZip = $zipArchive->getStream('roadrunner-' . RoadRunnerCLIHelper::getVersion() . '-' . RoadRunnerCLIHelper::getOSType() . '-amd64/rr'); + $finalFileResource = fopen($finalFile, 'w'); + + if (!$fileStreamFromZip) { + throw new Exception('Unable to extract the file.'); + } + + while (!feof($fileStreamFromZip)) { + fwrite($finalFileResource, fread($fileStreamFromZip, 8192)); + } + + fclose($fileStreamFromZip); + fclose($finalFileResource); + $zipArchive->extractTo('.', []); + $zipArchive->close(); + unlink($zipFileName); + + chmod($finalFile, 0755); + $output->writeln('<info>Binary file updated!</info>'); + }) + ->getApplication() + ->register("init-config") + ->setDescription("Inits default .rr.yaml config in specified folder (current folder by default)") + ->addOption('location', 'l', InputArgument::OPTIONAL, 'destination folder', '.') + ->setCode(function (InputInterface $input, OutputInterface $output) { + if (is_file($input->getOption('location') . DIRECTORY_SEPARATOR . '.rr.yaml')) { + $output->writeln('<error>Config file already exists!</error>'); + $helper = $this->getHelper('question'); + $question = new ConfirmationQuestion('Do you want overwrite it? [Y/n]'); + + if (!$helper->ask($input, $output, $question)) { + return; + } + } + copy(__DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '.rr.yaml', + $input->getOption('location') . DIRECTORY_SEPARATOR . '.rr.yaml'); + $output->writeln('<info>Config file created!</info>'); + }) + ->getApplication() + ->run(); @@ -26,8 +26,8 @@ const ( // StateWorking - working on given payload. StateWorking - // StateStreaming - indicates that worker is streaming the data at the moment. - StateStreaming + // StateInvalid - indicates that worker is being disabled and will be removed. + StateInvalid // StateStopping - process is being softly stopped. StateStopping @@ -57,8 +57,8 @@ func (s *state) String() string { return "ready" case StateWorking: return "working" - case StateStreaming: - return "streaming" + case StateInvalid: + return "invalid" case StateStopped: return "stopped" case StateErrored: diff --git a/static_pool.go b/static_pool.go index c9473699..02960825 100644 --- a/static_pool.go +++ b/static_pool.go @@ -41,6 +41,10 @@ type StaticPool struct { // all registered workers workers []*Worker + // invalid declares set of workers to be removed from the pool. + mur sync.Mutex + remove sync.Map + // pool is being destroyed inDestroy int32 destroy chan interface{} @@ -111,6 +115,21 @@ func (p *StaticPool) Workers() (workers []*Worker) { return workers } +// Remove forces pool to remove specific worker. +func (p *StaticPool) Remove(w *Worker, err error) bool { + if w.State().Value() != StateReady && w.State().Value() != StateWorking { + // unable to remove inactive worker + return false + } + + if _, ok := p.remove.Load(w); ok { + return false + } + + p.remove.Store(w, err) + return true +} + // Exec one task with given payload and context, returns result or error. func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { p.tmu.Lock() @@ -133,13 +152,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return nil, err } - go p.destroyWorker(w, err) + p.discardWorker(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - go p.destroyWorker(w, err) + p.discardWorker(w, err) return p.Exec(rqs) } @@ -159,6 +178,7 @@ func (p *StaticPool) Destroy() { var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) + w.markInvalid() go func(w *Worker) { defer wg.Done() p.destroyWorker(w, nil) @@ -181,6 +201,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) + + // get next worker + i++ + continue + } + return w, nil case <-p.destroy: return nil, fmt.Errorf("pool has been stopped") @@ -199,8 +227,19 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { atomic.AddInt64(&p.numDead, ^int64(0)) continue } + + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) + + // get next worker + i++ + continue + } + return w, nil case <-p.destroy: + timeout.Stop() + return nil, fmt.Errorf("pool has been stopped") } } @@ -211,7 +250,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { // release releases or replaces the worker. func (p *StaticPool) release(w *Worker) { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - go p.destroyWorker(w, p.cfg.MaxJobs) + p.discardWorker(w, p.cfg.MaxJobs) + return + } + + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) return } @@ -242,6 +286,12 @@ func (p *StaticPool) createWorker() (*Worker, error) { return w, nil } +// gentry remove worker +func (p *StaticPool) discardWorker(w *Worker, caused interface{}) { + w.markInvalid() + go p.destroyWorker(w, caused) +} + // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) { go w.Stop() @@ -271,6 +321,7 @@ func (p *StaticPool) watchWorker(w *Worker) { for i, wc := range p.workers { if wc == w { p.workers = append(p.workers[:i], p.workers[i+1:]...) + p.remove.Delete(w) break } } @@ -307,9 +358,8 @@ func (p *StaticPool) destroyed() bool { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { p.mul.Lock() - defer p.mul.Unlock() - if p.lsn != nil { p.lsn(event, ctx) } + p.mul.Unlock() } diff --git a/static_pool_test.go b/static_pool_test.go index a78fcf11..a7e71fdb 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -381,10 +381,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { DestroyTimeout: time.Second, }, ) - p.Destroy() - assert.NotNil(t, p) assert.NoError(t, err) + assert.NotNil(t, p) + + p.Destroy() } func Benchmark_Pool_Allocate(b *testing.B) { @@ -425,7 +426,11 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, _ := NewPool( func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - cfg, + Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, ) defer p.Destroy() diff --git a/tests/pid.php b/tests/pid.php index a8cfa229..bc1928a6 100644 --- a/tests/pid.php +++ b/tests/pid.php @@ -1,17 +1,17 @@ <?php -/** - * @var Goridge\RelayInterface $relay - */ + /** + * @var Goridge\RelayInterface $relay + */ -use Spiral\Goridge; -use Spiral\RoadRunner; + use Spiral\Goridge; + use Spiral\RoadRunner; -$rr = new RoadRunner\Worker($relay); + $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { - try { - $rr->send((string)getmypid()); - } catch (\Throwable $e) { - $rr->error((string)$e); - } -}
\ No newline at end of file + while ($in = $rr->receive($ctx)) { + try { + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } + }
\ No newline at end of file diff --git a/tests/slow-pid.php b/tests/slow-pid.php new file mode 100644 index 00000000..daaf2583 --- /dev/null +++ b/tests/slow-pid.php @@ -0,0 +1,18 @@ +<?php + /** + * @var Goridge\RelayInterface $relay + */ + + use Spiral\Goridge; + use Spiral\RoadRunner; + + $rr = new RoadRunner\Worker($relay); + + while ($in = $rr->receive($ctx)) { + try { + sleep(1); + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } + }
\ No newline at end of file diff --git a/util/state_test.go b/util/state_test.go index 2afe682e..d52bee1d 100644 --- a/util/state_test.go +++ b/util/state_test.go @@ -30,6 +30,16 @@ func TestServerState(t *testing.T) { assert.Len(t, state, runtime.NumCPU()) } +func TestDeadWorker(t *testing.T) { + w := &roadrunner.Worker{} + i := 0 + + w.Pid = &i + + _, err := WorkerState(w) + assert.Error(t, err) +} + func TestServerState_Err(t *testing.T) { _, err := ServerState(nil) assert.Error(t, err) diff --git a/watcher.go b/watcher.go new file mode 100644 index 00000000..4527fe68 --- /dev/null +++ b/watcher.go @@ -0,0 +1,10 @@ +package roadrunner + +// Watcher observes pool state and decides if any worker must be destroyed. +type Watcher interface { + // Lock watcher on given pool instance. + Attach(p Pool) Watcher + + // Detach pool watching. + Detach() +} diff --git a/watcher_test.go b/watcher_test.go new file mode 100644 index 00000000..94ac591e --- /dev/null +++ b/watcher_test.go @@ -0,0 +1,216 @@ +package roadrunner + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "runtime" + "testing" + "time" +) + +type eWatcher struct { + p Pool + onAttach func(p Pool) + onDetach func(p Pool) +} + +func (w *eWatcher) Attach(p Pool) Watcher { + wp := &eWatcher{p: p, onAttach: w.onAttach, onDetach: w.onDetach} + + if wp.onAttach != nil { + wp.onAttach(p) + } + + return wp +} + +func (w *eWatcher) Detach() { + if w.onDetach != nil { + w.onDetach(w.p) + } +} + +func (w *eWatcher) remove(wr *Worker, err error) { + w.p.Remove(wr, err) +} + +func Test_WatcherWatch(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + rr.Watch(&eWatcher{}) + assert.NoError(t, rr.Start()) + + assert.NotNil(t, rr.pWatcher) + assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_WatcherReattach(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + rr.Watch(&eWatcher{}) + assert.NoError(t, rr.Start()) + + assert.NotNil(t, rr.pWatcher) + assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + + oldWatcher := rr.pWatcher + + assert.NoError(t, rr.Reset()) + + assert.NotNil(t, rr.pWatcher) + assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + assert.NotEqual(t, oldWatcher, rr.pWatcher) + + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_WatcherAttachDetachSequence(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + var attachedPool Pool + + rr.Watch(&eWatcher{ + onAttach: func(p Pool) { + attachedPool = p + }, + onDetach: func(p Pool) { + assert.Equal(t, attachedPool, p) + }, + }) + assert.NoError(t, rr.Start()) + + assert.NotNil(t, rr.pWatcher) + assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_RemoveWorkerOnAllocation(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php pid pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + rr.Watch(&eWatcher{}) + assert.NoError(t, rr.Start()) + + wr := rr.Workers()[0] + + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String()) + lastPid := res.String() + + rr.pWatcher.(*eWatcher).remove(wr, nil) + + res, err = rr.Exec(&Payload{Body: []byte("hello")}) + assert.NoError(t, err) + assert.NotEqual(t, lastPid, res.String()) + + assert.NotEqual(t, StateReady, wr.state.Value()) + + _, ok := rr.pool.(*StaticPool).remove.Load(wr) + assert.False(t, ok) +} + +func Test_RemoveWorkerAfterTask(t *testing.T) { + rr := NewServer( + &ServerConfig{ + Command: "php tests/client.php slow-pid pipes", + Relay: "pipes", + Pool: &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }) + defer rr.Stop() + + rr.Watch(&eWatcher{}) + assert.NoError(t, rr.Start()) + + wr := rr.Workers()[0] + lastPid := "" + + wait := make(chan interface{}) + go func() { + res, err := rr.Exec(&Payload{Body: []byte("hello")}) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String()) + lastPid = res.String() + + close(wait) + }() + + // wait for worker execution to be in progress + time.Sleep(time.Millisecond * 250) + rr.pWatcher.(*eWatcher).remove(wr, nil) + + <-wait + + // must be replaced + assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0])) + + // must not be registered withing pool + rr.pWatcher.(*eWatcher).remove(wr, nil) +} @@ -127,7 +127,7 @@ func (w *Worker) Wait() error { return &exec.ExitError{ProcessState: w.endState} } -// Stop sends soft termination command to the worker and waits for process completion. +// Detach sends soft termination command to the worker and waits for process completion. func (w *Worker) Stop() error { select { case <-w.waitDone: @@ -137,7 +137,7 @@ func (w *Worker) Stop() error { defer w.mu.Unlock() w.state.set(StateStopping) - err := sendPayload(w.rl, &stopCommand{Stop: true}) + err := sendControl(w.rl, &stopCommand{Stop: true}) <-w.waitDone return err @@ -164,34 +164,39 @@ func (w *Worker) Kill() error { // errors. Method might return JobError indicating issue with payload. func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { w.mu.Lock() - defer w.mu.Unlock() if rqs == nil { + w.mu.Unlock() return nil, fmt.Errorf("payload can not be empty") } if w.state.Value() != StateReady { + w.mu.Unlock() return nil, fmt.Errorf("worker is not ready (%s)", w.state.String()) } w.state.set(StateWorking) - defer w.state.registerExec() rsp, err = w.execPayload(rqs) if err != nil { if _, ok := err.(JobError); !ok { w.state.set(StateErrored) + w.state.registerExec() + w.mu.Unlock() return nil, err } } - // todo: attach when payload is complete - // todo: new status - w.state.set(StateReady) + w.state.registerExec() + w.mu.Unlock() return rsp, err } +func (w *Worker) markInvalid() { + w.state.set(StateInvalid) +} + func (w *Worker) start() error { if err := w.cmd.Start(); err != nil { close(w.waitDone) @@ -220,7 +225,8 @@ func (w *Worker) start() error { } func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) { - if err := sendPayload(w.rl, rqs.Context); err != nil { + // two things + if err := sendControl(w.rl, rqs.Context); err != nil { return nil, errors.Wrap(err, "header error") } |