diff options
author | Valery Piashchynski <[email protected]> | 2021-10-06 22:35:30 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-10-06 22:35:30 +0300 |
commit | 98f4e867af8e4dce3d8951227db8bd1268984551 (patch) | |
tree | ddf182386f78a64210e580bcf7a0c8ab34cf6712 | |
parent | 59e29a90cfa6f16c790d16ac1f03a0f9f82b73d6 (diff) | |
parent | b25e0bcc94b237b3fabf6f582a3864efd300033b (diff) |
[#823]: feat(pipes,sockets): allow running scripts in the server's command
## Description of Changes
- ✏️ Remove requirement to share the same PID for the worker and root process.
According to the fork (and to be precise, `fork-exec` in go) man pages documentation,
```text
The child inherits copies of the parent's set of open file descriptors.
Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent.
This means that the two file descriptors share open file status flags, file offset, and signal-driven I/O attributes (see the description of F_SETOWN and F_SETSIG in fcntl(2)).
```
- ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key:
```yaml
server:
command: "./script.sh OR sh script.sh" <--- UPDATED
relay: "pipes"
relay_timeout: "20s"
```
The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`.
-rw-r--r-- | CHANGELOG.md | 66 | ||||
-rw-r--r-- | go.mod | 6 | ||||
-rw-r--r-- | go.sum | 12 | ||||
-rwxr-xr-x | internal/protocol.go | 9 | ||||
-rwxr-xr-x | tests/pipes_test_script.sh | 2 | ||||
-rwxr-xr-x | tests/script.sh | 2 | ||||
-rwxr-xr-x | tests/socket_test_script.sh | 2 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory.go | 21 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 27 | ||||
-rwxr-xr-x | transport/socket/socket_factory.go | 20 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 40 |
11 files changed, 147 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 09f40dd3..46de41cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,39 +73,55 @@ ssl: ] ``` -- ✏️ Add a new option to the `service` plugin. Service plugin will not use std RR logger as output in the flavor of raw output. +- ✏️ Add a new option to the `log` plugin to configure the line ending. By default, used `\n`. -New options: +**New option**: ```yaml -# Service plugin settings -service: - some_service_1: +# Logs plugin settings +logs: (....) - # Console output + # Line ending # - # Default: stderr. Available options: stderr, stdout - output: "stderr" - - # Endings for the stderr/stdout output - # - # Default: "\n". Available options: any. + # Default: "\n". line_ending: "\n" - - # Color for regular output - # - # Default: none. Available options: white, red, green, yellow, blue, magenta - color: "green" - - # Color for the process errors - # - # Default: none. Available options: white, red, green, yellow, blue, magenta - err_color: "red" ``` -**!!!** -Be careful, now, there is no logger plugin dependency for the `service` plugin. That means, that if you used `json` output, now, -you need to serialize data on the `executable` (in the command) side. +- ✏️ [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level. +```yaml +http: + address: 127.0.0.1:55555 + max_request_size: 1024 + access_logs: true <-------- Access Logs ON/OFF + middleware: [] + + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s +``` +- ✏️ HTTP middleware to handle Symfony's `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). +```yaml +http: + address: 127.0.0.1:44444 + max_request_size: 1024 + middleware: ["sendfile"] <----- NEW MIDDLEWARE + + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s +``` +- ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key: +```yaml +server: + command: "./script.sh OR sh script.sh" <--- UPDATED + relay: "pipes" + relay_timeout: "20s" +``` +The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`. ## 🩹 Fixes: @@ -30,10 +30,10 @@ require ( github.com/tklauser/go-sysconf v0.3.9 // indirect github.com/tklauser/numcpus v0.3.0 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 // indirect - golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect + golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 // indirect + golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 // indirect + google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect @@ -112,8 +112,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 h1:Z04ewVs7JhXaYkmDhBERPi41gnltfQpMWDnTnQbaCqk= -golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 h1:G64nFNerDErBd2KdvHvIn3Ee6ccUQBTfhDZEO0DccfU= +golang.org/x/net v0.0.0-20211005215030-d2e5035098b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -132,8 +132,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211002104244-808efd93c36d h1:SABT8Vei3iTiu+Gy8KOzpSNz+W1EQ5YBCRtiEETxF+0= -golang.org/x/sys v0.0.0-20211002104244-808efd93c36d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef h1:fPxZ3Umkct3LZ8gK9nbk+DWDJ9fstZa2grBn+lWVKPs= +golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -155,8 +155,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 h1:eF1wcrhdz56Vugf8qNX5dD93ItkrhothojQyHXqloe0= -google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e h1:Im71rbA1N3CbIag/PumYhQcNR8bLNmuOtRIyOnnLsT8= +google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/internal/protocol.go b/internal/protocol.go index 78174118..73cb960e 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -68,8 +68,7 @@ func SendControl(rl relay.Relay, payload interface{}) error { fr.WritePayload(data) fr.WriteCRC(fr.Header()) - // hold a pointer to a frame - // Do we need a copy here???? + // we don't need a copy here, because frame copy the data before send err = rl.Send(fr) if err != nil { return errors.E(op, err) @@ -78,7 +77,7 @@ func SendControl(rl relay.Relay, payload interface{}) error { return nil } -func FetchPID(rl relay.Relay) (int64, error) { +func Pid(rl relay.Relay) (int64, error) { const op = errors.Op("fetch_pid") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { @@ -111,5 +110,9 @@ func FetchPID(rl relay.Relay) (int64, error) { return 0, errors.E(op, err) } + if link.Pid <= 0 { + return 0, errors.E(op, errors.Str("pid should be greater than 0")) + } + return int64(link.Pid), nil } diff --git a/tests/pipes_test_script.sh b/tests/pipes_test_script.sh new file mode 100755 index 00000000..c759b0a6 --- /dev/null +++ b/tests/pipes_test_script.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +php ../../tests/client.php echo pipes diff --git a/tests/script.sh b/tests/script.sh deleted file mode 100755 index 746fb768..00000000 --- a/tests/script.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env bash -php ../../../tests/client.php echo pipes
\ No newline at end of file diff --git a/tests/socket_test_script.sh b/tests/socket_test_script.sh new file mode 100755 index 00000000..3948c4fb --- /dev/null +++ b/tests/socket_test_script.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +php ../../tests/client.php echo tcp diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 0d46f496..84a9d311 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -29,7 +29,7 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { @@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - pid, err := internal.FetchPID(relay) + // used as a ping + _, err = internal.Pid(relay) if err != nil { err = multierr.Combine( err, @@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - if pid != w.Pid() { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), - }: - return - default: - _ = w.Kill() - return - } - } - select { case // return worker @@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { + _, err = internal.Pid(relay) + if err != nil { err = multierr.Combine( err, w.Kill(), diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index f8198610..b4ba8c87 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Pipe_Echo_Script(t *testing.T) { + t.Parallel() + cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Pipe_Broken(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index d98ce607..39c04eac 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -66,7 +66,7 @@ func (f *Factory) listen() error { } rl := socket.NewSocketRelay(conn) - pid, err := internal.FetchPID(rl) + pid, err := internal.Pid(rl) if err != nil { return err } @@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor w.AttachRelay(rl) // errors bundle - if pid, err := internal.FetchPID(rl); pid != w.Pid() { + _, err = internal.Pid(rl) + if err != nil { err = multierr.Combine( err, w.Kill(), @@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess return nil, err } default: - tmp, ok := f.relays.LoadAndDelete(w.Pid()) - if !ok { + // find first pid and attach relay to it + var r *socket.Relay + f.relays.Range(func(k, val interface{}) bool { + r = val.(*socket.Relay) + f.relays.Delete(k) + return false + }) + + // no relay exists + if r == nil { continue } - return tmp.(*socket.Relay), nil + + return r, nil } } } diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index 879dba8e..d517d026 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Tcp_Echo_Script(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("sh", "../../tests/socket_test_script.sh") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Unix_Start(t *testing.T) { ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") |