summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml2
-rw-r--r--CHANGELOG.md9
-rw-r--r--Makefile5
-rw-r--r--cmd/rr/.rr.yaml56
-rw-r--r--composer.json3
-rw-r--r--config.go4
-rw-r--r--php-src/PSR7Client.php42
-rw-r--r--php-src/tests/http/client.php45
-rw-r--r--php-src/tests/http/echo.php10
-rw-r--r--php-src/tests/http/header.php9
-rw-r--r--server.go4
-rw-r--r--server_config.go2
-rw-r--r--server_test.go20
-rw-r--r--service/http/config.go2
-rw-r--r--service/http/response.go1
-rw-r--r--service/http/response_test.go70
-rw-r--r--service/http/server.go2
-rw-r--r--service/http/server_test.go94
18 files changed, 282 insertions, 98 deletions
diff --git a/.travis.yml b/.travis.yml
index 93b32d1e..aa0281f0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,11 +27,9 @@ script:
- go test ./service -race -v -coverprofile=service.txt -covermode=atomic
- go test ./service/rpc -race -v -coverprofile=rpc.txt -covermode=atomic
- go test ./service/http -race -v -coverprofile=http.txt -covermode=atomic
- - go test ./service/static -race -v -coverprofile=static.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash) -f lib.txt
- bash <(curl -s https://codecov.io/bash) -f service.txt
- bash <(curl -s https://codecov.io/bash) -f rpc.txt
- bash <(curl -s https://codecov.io/bash) -f http.txt
- - bash <(curl -s https://codecov.io/bash) -f static.txt \ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e69de29b..e5e66db7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -0,0 +1,9 @@
+CHANGELOG
+=========
+
+v1.0.0 (???? date)
+------
+- worker.State.Updated() has been removed in order to improve overall performance
+- staticPool can automatically replace workers killed from outside
+- server would not attempt to rebuild static pool in case of reoccurring failure
+- MORE STUFF \ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..732e55d3
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,5 @@
+test:
+ go test -v -race -cover
+ go test -v -race -cover ./service
+ go test -v -race -cover ./service/rpc
+ go test -v -race -cover ./service/http \ No newline at end of file
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
deleted file mode 100644
index 752573f6..00000000
--- a/cmd/rr/.rr.yaml
+++ /dev/null
@@ -1,56 +0,0 @@
-# rpc bus allows php application and external clients to talk to rr services.
-rpc:
- # enable rpc server
- enable: true
-
- # rpc connection DSN. Supported TCP and Unix sockets.
- listen: tcp://127.0.0.1:6001
-
-# http service configuration.
-http:
- # set to false to disable http server.
- enable: true
-
- # http host to listen.
- address: :8081
-
- # max POST request size, including file uploads. (default: 1GB)
- maxRequest: 1073741824
-
- # file upload configuration.
- uploads:
- # list of file extensions which are forbidden for uploading.
- forbid: [".php", ".exe", ".bat"]
-
- # http worker pool configuration.
- workers:
- # php worker command.
- command: "php /Users/wolfy-j/Projects/phpapp/webroot/index.php rr pipes --no-ansi"
-
- # connection method (pipes, tcp://:9000, unix://socket.unix).
- relay: "pipes"
-
- # worker pool configuration.
- pool:
- # number of workers to be serving.
- numWorkers: 4
-
- # maximum jobs per worker, 0 - unlimited.
- maxJobs: 0
-
- # for how long worker is allowed to be bootstrapped.
- allocateTimeout: 60000000
-
- # amount of time given to worker to gracefully destruct itself.
- destroyTimeout: 600000000
-
-# static file serving.
-static:
- # serve http static files
- enable: true
-
- # root directory for static file (http would not serve .php and .htaccess files).
- dir: "/Users/wolfy-j/Projects/phpapp/webroot"
-
- # list of extensions for forbid for serving.
- forbid: [".php", ".htaccess"] \ No newline at end of file
diff --git a/composer.json b/composer.json
index 403316fd..78da6f56 100644
--- a/composer.json
+++ b/composer.json
@@ -12,7 +12,8 @@
"require": {
"php": "^7.0",
"spiral/goridge": "^2.0",
- "psr/http-message": "^1.0"
+ "psr/http-message": "^1.0",
+ "zendframework/zend-diactoros": "^1.7"
},
"require-dev": {
"phpunit/phpunit": "~6.0"
diff --git a/config.go b/config.go
index 0ffe29a2..02008181 100644
--- a/config.go
+++ b/config.go
@@ -18,11 +18,11 @@ type Config struct {
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task.
- AllocateTimeout time.Duration
+ AllocateTimeout time.Duration //todo: to milleseconds?
// DestroyTimeout defines for how long pool should be waiting for worker to
// properly stop, if timeout reached worker will be killed.
- DestroyTimeout time.Duration
+ DestroyTimeout time.Duration //todo: to milleseconds?
}
// Reconfigure returns error if cfg not valid
diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php
index 0f618d9e..c3cdf844 100644
--- a/php-src/PSR7Client.php
+++ b/php-src/PSR7Client.php
@@ -9,17 +9,10 @@ namespace Spiral\RoadRunner;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
-use Spiral\RoadRunner\Worker;
-use Zend\Diactoros\ServerRequest;
-use Zend\Diactoros\Stream;
-use Zend\Diactoros\UploadedFile;
+use Zend\Diactoros;
/**
- * Spiral Framework, SpiralScout LLC.
- *
- * @package spiralFramework
- * @author Anton Titov (Wolfy-J)
- * @copyright ©2009-2011
+ * Manages PSR-7 request and response.
*/
class PSR7Client
{
@@ -67,11 +60,11 @@ class PSR7Client
if ($ctx['parsed']) {
$parsedBody = json_decode($body, true);
} elseif ($body != null) {
- $bodyStream = new Stream("php://memory", "rwb");
+ $bodyStream = new Diactoros\Stream("php://memory", "rwb");
$bodyStream->write($body);
}
- return new ServerRequest(
+ return new Diactoros\ServerRequest(
$_SERVER,
$this->wrapUploads($ctx['uploads']),
$ctx['uri'],
@@ -92,9 +85,15 @@ class PSR7Client
*/
public function respond(ResponseInterface $response)
{
+ $headers = $response->getHeaders();
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
$this->worker->send($response->getBody(), json_encode([
'status' => $response->getStatusCode(),
- 'headers' => $response->getHeaders()
+ 'headers' => $headers
]));
}
@@ -113,18 +112,19 @@ class PSR7Client
$result = [];
foreach ($files as $index => $file) {
- if (isset($file['name'])) {
- $result[$index] = new UploadedFile(
- $file['tmpName'],
- $file['size'],
- $file['error'],
- $file['name'],
- $file['type']
- );
+ if (!isset($file['name'])) {
+ $result[$index] = $this->wrapUploads($file);
continue;
}
- $result[$index] = $this->wrapUploads($file);
+
+ $result[$index] = new Diactoros\UploadedFile(
+ $file['tmpName'],
+ $file['size'],
+ $file['error'],
+ $file['name'],
+ $file['type']
+ );
}
return $result;
diff --git a/php-src/tests/http/client.php b/php-src/tests/http/client.php
new file mode 100644
index 00000000..3b6b5dc6
--- /dev/null
+++ b/php-src/tests/http/client.php
@@ -0,0 +1,45 @@
+<?php
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/../../vendor/autoload.php";
+
+if (count($argv) < 3) {
+ die("need 2 arguments");
+}
+
+list($test, $goridge) = [$argv[1], $argv[2]];
+
+switch ($goridge) {
+ case "pipes":
+ $relay = new Goridge\StreamRelay(STDIN, STDOUT);
+ break;
+
+ case "tcp":
+ $relay = new Goridge\SocketRelay("localhost", 9007);
+ break;
+
+ case "unix":
+ $relay = new Goridge\SocketRelay(
+ "sock.unix",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+ break;
+
+ default:
+ die("invalid protocol selection");
+}
+
+$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay));
+require_once sprintf("%s/%s.php", __DIR__, $test);
+
+while ($req = $psr7->acceptRequest()) {
+ try {
+ $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response()));
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/php-src/tests/http/echo.php b/php-src/tests/http/echo.php
new file mode 100644
index 00000000..eeccc7e5
--- /dev/null
+++ b/php-src/tests/http/echo.php
@@ -0,0 +1,10 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ $resp->getBody()->write(strtoupper($req->getQueryParams()['hello']));
+ return $resp;
+} \ No newline at end of file
diff --git a/php-src/tests/http/header.php b/php-src/tests/http/header.php
new file mode 100644
index 00000000..9bb15ccd
--- /dev/null
+++ b/php-src/tests/http/header.php
@@ -0,0 +1,9 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ return $resp->withAddedHeader("Header", $req->getQueryParams()['hello']);
+} \ No newline at end of file
diff --git a/server.go b/server.go
index d914760b..c194bc78 100644
--- a/server.go
+++ b/server.go
@@ -63,7 +63,7 @@ func (srv *Server) Start() (err error) {
return err
}
- if srv.pool, err = NewPool(srv.cfg.makeCommand(), srv.factory, srv.cfg.Pool); err != nil {
+ if srv.pool, err = NewPool(srv.cfg.makeCommand(), srv.factory, *srv.cfg.Pool); err != nil {
return err
}
@@ -124,7 +124,7 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error {
previous := srv.pool
srv.mu.Unlock()
- pool, err := NewPool(cfg.makeCommand(), srv.factory, cfg.Pool)
+ pool, err := NewPool(cfg.makeCommand(), srv.factory, *cfg.Pool)
if err != nil {
return err
}
diff --git a/server_config.go b/server_config.go
index 4a03f44d..8af0e0f8 100644
--- a/server_config.go
+++ b/server_config.go
@@ -24,7 +24,7 @@ type ServerConfig struct {
// Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
// while server is running.
- Pool Config
+ Pool *Config
}
// Differs returns true if configuration has changed but ignores pool or cmd changes.
diff --git a/server_test.go b/server_test.go
index ffca3f12..2e92aad8 100644
--- a/server_test.go
+++ b/server_test.go
@@ -13,7 +13,7 @@ func TestServer_PipesEcho(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -39,7 +39,7 @@ func TestServer_SocketEcho(t *testing.T) {
Command: "php php-src/tests/client.php echo tcp",
Relay: "tcp://:9007",
RelayTimeout: 10 * time.Second,
- Pool: Config{
+ Pool: &Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -64,7 +64,7 @@ func TestServer_Configure_BeforeStart(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -75,7 +75,7 @@ func TestServer_Configure_BeforeStart(t *testing.T) {
err := srv.Reconfigure(&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 2,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -101,7 +101,7 @@ func TestServer_Stop_NotStarted(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -116,7 +116,7 @@ func TestServer_Reconfigure(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -130,7 +130,7 @@ func TestServer_Reconfigure(t *testing.T) {
err := srv.Reconfigure(&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 2,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -146,7 +146,7 @@ func TestServer_Reset(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -168,7 +168,7 @@ func TestServer_ReplacePool(t *testing.T) {
&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -197,7 +197,7 @@ func TestServer_ServerFailure(t *testing.T) {
srv := NewServer(&ServerConfig{
Command: "php php-src/tests/client.php echo pipes",
Relay: "pipes",
- Pool: Config{
+ Pool: &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
diff --git a/service/http/config.go b/service/http/config.go
index d92b4c60..3db172b1 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -12,7 +12,7 @@ type Config struct {
// Address and port to handle as http server.
Address string
- // MaxRequest specified max size for payload body in bytes, set 0 to unlimited.
+ // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited.
MaxRequest int64
// Uploads configures uploads configuration.
diff --git a/service/http/response.go b/service/http/response.go
index dd092353..69bcf3e1 100644
--- a/service/http/response.go
+++ b/service/http/response.go
@@ -34,7 +34,6 @@ func (r *Response) Write(w http.ResponseWriter) error {
for k, v := range r.Headers {
for _, h := range v {
w.Header().Add(k, h)
-
}
}
diff --git a/service/http/response_test.go b/service/http/response_test.go
new file mode 100644
index 00000000..dfc08104
--- /dev/null
+++ b/service/http/response_test.go
@@ -0,0 +1,70 @@
+package http
+
+import (
+ "testing"
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+ "net/http"
+ "bytes"
+)
+
+type testWriter struct {
+ h http.Header
+ buf bytes.Buffer
+ wroteHeader bool
+ code int
+}
+
+func (tw *testWriter) Header() http.Header { return tw.h }
+
+func (tw *testWriter) Write(p []byte) (int, error) {
+ if !tw.wroteHeader {
+ tw.WriteHeader(http.StatusOK)
+ }
+
+ return tw.buf.Write(p)
+}
+
+func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code }
+
+func TestNewResponse_Error(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)})
+ assert.Error(t, err)
+ assert.Nil(t, r)
+}
+
+func TestNewResponse_Write(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ Body: []byte(`sample body`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "sample body", w.buf.String())
+}
+
+func TestNewResponse_Stream(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "hello world", w.buf.String())
+}
diff --git a/service/http/server.go b/service/http/server.go
index de414b08..4ac9b03c 100644
--- a/service/http/server.go
+++ b/service/http/server.go
@@ -51,7 +51,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if size, err := strconv.ParseInt(length, 10, 64); err != nil {
s.handleError(w, r, err)
return
- } else if size > s.cfg.MaxRequest {
+ } else if size > s.cfg.MaxRequest*1024*1024 {
s.handleError(w, r, errors.New("request body max size is exceeded"))
return
}
diff --git a/service/http/server_test.go b/service/http/server_test.go
new file mode 100644
index 00000000..07c5bbdc
--- /dev/null
+++ b/service/http/server_test.go
@@ -0,0 +1,94 @@
+package http
+
+import (
+ "testing"
+ "github.com/spiral/roadrunner"
+ "os"
+ "github.com/stretchr/testify/assert"
+ "net/http"
+ "context"
+ "bytes"
+ "io"
+)
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ buf := new(bytes.Buffer)
+ io.Copy(buf, r.Body)
+
+ return buf.String(), r, nil
+}
+
+func TestServer_Echo(t *testing.T) {
+ st := &Server{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+
+ body, r, err := get("http://localhost:8077/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestServer_EchoHeaders(t *testing.T) {
+ st := &Server{
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../php-src/tests/http/client.php header pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, st.rr.Start())
+ defer st.rr.Stop()
+
+ hs := &http.Server{Addr: ":8077", Handler: st,}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+
+ _, r, err := get("http://localhost:8077/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "world", r.Header.Get("Header"))
+}