diff options
-rw-r--r-- | CHANGELOG.md | 66 | ||||
-rw-r--r-- | go.mod | 6 | ||||
-rw-r--r-- | go.sum | 12 | ||||
-rwxr-xr-x | internal/protocol.go | 9 | ||||
-rwxr-xr-x | tests/pipes_test_script.sh | 2 | ||||
-rwxr-xr-x | tests/script.sh | 2 | ||||
-rwxr-xr-x | tests/socket_test_script.sh | 2 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory.go | 21 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 27 | ||||
-rwxr-xr-x | transport/socket/socket_factory.go | 20 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 40 |
11 files changed, 147 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 09f40dd3..46de41cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,39 +73,55 @@ ssl: ] ``` -- ✏️ Add a new option to the `service` plugin. Service plugin will not use std RR logger as output in the flavor of raw output. +- ✏️ Add a new option to the `log` plugin to configure the line ending. By default, used `\n`. -New options: +**New option**: ```yaml -# Service plugin settings -service: - some_service_1: +# Logs plugin settings +logs: (....) - # Console output + # Line ending # - # Default: stderr. Available options: stderr, stdout - output: "stderr" - - # Endings for the stderr/stdout output - # - # Default: "\n". Available options: any. + # Default: "\n". line_ending: "\n" - - # Color for regular output - # - # Default: none. Available options: white, red, green, yellow, blue, magenta - color: "green" - - # Color for the process errors - # - # Default: none. Available options: white, red, green, yellow, blue, magenta - err_color: "red" ``` -**!!!** -Be careful, now, there is no logger plugin dependency for the `service` plugin. That means, that if you used `json` output, now, -you need to serialize data on the `executable` (in the command) side. +- ✏️ [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level. +```yaml +http: + address: 127.0.0.1:55555 + max_request_size: 1024 + access_logs: true <-------- Access Logs ON/OFF + middleware: [] + + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s +``` +- ✏️ HTTP middleware to handle Symfony's `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). +```yaml +http: + address: 127.0.0.1:44444 + max_request_size: 1024 + middleware: ["sendfile"] <----- NEW MIDDLEWARE + + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s +``` +- ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key: +```yaml +server: + command: "./script.sh OR sh script.sh" <--- UPDATED + relay: "pipes" + relay_timeout: "20s" +``` +The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`. ## 🩹 Fixes: @@ -30,10 +30,10 @@ require ( github.com/tklauser/go-sysconf v0.3.9 // indirect github.com/tklauser/numcpus v0.3.0 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 // indirect - golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect + golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 // indirect + golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 // indirect + google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect @@ -112,8 +112,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 h1:Z04ewVs7JhXaYkmDhBERPi41gnltfQpMWDnTnQbaCqk= -golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 h1:G64nFNerDErBd2KdvHvIn3Ee6ccUQBTfhDZEO0DccfU= +golang.org/x/net v0.0.0-20211005215030-d2e5035098b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -132,8 +132,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211002104244-808efd93c36d h1:SABT8Vei3iTiu+Gy8KOzpSNz+W1EQ5YBCRtiEETxF+0= -golang.org/x/sys v0.0.0-20211002104244-808efd93c36d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef h1:fPxZ3Umkct3LZ8gK9nbk+DWDJ9fstZa2grBn+lWVKPs= +golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -155,8 +155,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 h1:eF1wcrhdz56Vugf8qNX5dD93ItkrhothojQyHXqloe0= -google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e h1:Im71rbA1N3CbIag/PumYhQcNR8bLNmuOtRIyOnnLsT8= +google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/internal/protocol.go b/internal/protocol.go index 78174118..73cb960e 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -68,8 +68,7 @@ func SendControl(rl relay.Relay, payload interface{}) error { fr.WritePayload(data) fr.WriteCRC(fr.Header()) - // hold a pointer to a frame - // Do we need a copy here???? + // we don't need a copy here, because frame copy the data before send err = rl.Send(fr) if err != nil { return errors.E(op, err) @@ -78,7 +77,7 @@ func SendControl(rl relay.Relay, payload interface{}) error { return nil } -func FetchPID(rl relay.Relay) (int64, error) { +func Pid(rl relay.Relay) (int64, error) { const op = errors.Op("fetch_pid") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { @@ -111,5 +110,9 @@ func FetchPID(rl relay.Relay) (int64, error) { return 0, errors.E(op, err) } + if link.Pid <= 0 { + return 0, errors.E(op, errors.Str("pid should be greater than 0")) + } + return int64(link.Pid), nil } diff --git a/tests/pipes_test_script.sh b/tests/pipes_test_script.sh new file mode 100755 index 00000000..c759b0a6 --- /dev/null +++ b/tests/pipes_test_script.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +php ../../tests/client.php echo pipes diff --git a/tests/script.sh b/tests/script.sh deleted file mode 100755 index 746fb768..00000000 --- a/tests/script.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env bash -php ../../../tests/client.php echo pipes
\ No newline at end of file diff --git a/tests/socket_test_script.sh b/tests/socket_test_script.sh new file mode 100755 index 00000000..3948c4fb --- /dev/null +++ b/tests/socket_test_script.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +php ../../tests/client.php echo tcp diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 0d46f496..84a9d311 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -29,7 +29,7 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { @@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - pid, err := internal.FetchPID(relay) + // used as a ping + _, err = internal.Pid(relay) if err != nil { err = multierr.Combine( err, @@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - if pid != w.Pid() { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), - }: - return - default: - _ = w.Kill() - return - } - } - select { case // return worker @@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { + _, err = internal.Pid(relay) + if err != nil { err = multierr.Combine( err, w.Kill(), diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index f8198610..b4ba8c87 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Pipe_Echo_Script(t *testing.T) { + t.Parallel() + cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Pipe_Broken(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index d98ce607..39c04eac 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -66,7 +66,7 @@ func (f *Factory) listen() error { } rl := socket.NewSocketRelay(conn) - pid, err := internal.FetchPID(rl) + pid, err := internal.Pid(rl) if err != nil { return err } @@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor w.AttachRelay(rl) // errors bundle - if pid, err := internal.FetchPID(rl); pid != w.Pid() { + _, err = internal.Pid(rl) + if err != nil { err = multierr.Combine( err, w.Kill(), @@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess return nil, err } default: - tmp, ok := f.relays.LoadAndDelete(w.Pid()) - if !ok { + // find first pid and attach relay to it + var r *socket.Relay + f.relays.Range(func(k, val interface{}) bool { + r = val.(*socket.Relay) + f.relays.Delete(k) + return false + }) + + // no relay exists + if r == nil { continue } - return tmp.(*socket.Relay), nil + + return r, nil } } } diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index 879dba8e..d517d026 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Tcp_Echo_Script(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("sh", "../../tests/socket_test_script.sh") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Unix_Start(t *testing.T) { ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") |