summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-06 22:35:30 +0300
committerGitHub <[email protected]>2021-10-06 22:35:30 +0300
commit98f4e867af8e4dce3d8951227db8bd1268984551 (patch)
treeddf182386f78a64210e580bcf7a0c8ab34cf6712
parent59e29a90cfa6f16c790d16ac1f03a0f9f82b73d6 (diff)
parentb25e0bcc94b237b3fabf6f582a3864efd300033b (diff)
[#823]: feat(pipes,sockets): allow running scripts in the server's command
## Description of Changes - ✏️ Remove requirement to share the same PID for the worker and root process. According to the fork (and to be precise, `fork-exec` in go) man pages documentation, ```text The child inherits copies of the parent's set of open file descriptors. Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent. This means that the two file descriptors share open file status flags, file offset, and signal-driven I/O attributes (see the description of F_SETOWN and F_SETSIG in fcntl(2)). ``` - ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key: ```yaml server: command: "./script.sh OR sh script.sh" <--- UPDATED relay: "pipes" relay_timeout: "20s" ``` The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`.
-rw-r--r--CHANGELOG.md66
-rw-r--r--go.mod6
-rw-r--r--go.sum12
-rwxr-xr-xinternal/protocol.go9
-rwxr-xr-xtests/pipes_test_script.sh2
-rwxr-xr-xtests/script.sh2
-rwxr-xr-xtests/socket_test_script.sh2
-rwxr-xr-xtransport/pipe/pipe_factory.go21
-rwxr-xr-xtransport/pipe/pipe_factory_test.go27
-rwxr-xr-xtransport/socket/socket_factory.go20
-rwxr-xr-xtransport/socket/socket_factory_test.go40
11 files changed, 147 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 09f40dd3..46de41cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -73,39 +73,55 @@ ssl:
]
```
-- ✏️ Add a new option to the `service` plugin. Service plugin will not use std RR logger as output in the flavor of raw output.
+- ✏️ Add a new option to the `log` plugin to configure the line ending. By default, used `\n`.
-New options:
+**New option**:
```yaml
-# Service plugin settings
-service:
- some_service_1:
+# Logs plugin settings
+logs:
(....)
- # Console output
+ # Line ending
#
- # Default: stderr. Available options: stderr, stdout
- output: "stderr"
-
- # Endings for the stderr/stdout output
- #
- # Default: "\n". Available options: any.
+ # Default: "\n".
line_ending: "\n"
-
- # Color for regular output
- #
- # Default: none. Available options: white, red, green, yellow, blue, magenta
- color: "green"
-
- # Color for the process errors
- #
- # Default: none. Available options: white, red, green, yellow, blue, magenta
- err_color: "red"
```
-**!!!**
-Be careful, now, there is no logger plugin dependency for the `service` plugin. That means, that if you used `json` output, now,
-you need to serialize data on the `executable` (in the command) side.
+- ✏️ [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level.
+```yaml
+http:
+ address: 127.0.0.1:55555
+ max_request_size: 1024
+ access_logs: true <-------- Access Logs ON/OFF
+ middleware: []
+
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+```
+- ✏️ HTTP middleware to handle Symfony's `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9).
+```yaml
+http:
+ address: 127.0.0.1:44444
+ max_request_size: 1024
+ middleware: ["sendfile"] <----- NEW MIDDLEWARE
+
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+```
+- ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key:
+```yaml
+server:
+ command: "./script.sh OR sh script.sh" <--- UPDATED
+ relay: "pipes"
+ relay_timeout: "20s"
+```
+The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`.
## 🩹 Fixes:
diff --git a/go.mod b/go.mod
index 3586f87a..c7e35071 100644
--- a/go.mod
+++ b/go.mod
@@ -30,10 +30,10 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
- golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 // indirect
- golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect
+ golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 // indirect
+ golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef // indirect
golang.org/x/text v0.3.7 // indirect
- google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 // indirect
+ google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
diff --git a/go.sum b/go.sum
index cbd82b32..fe27b062 100644
--- a/go.sum
+++ b/go.sum
@@ -112,8 +112,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
-golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 h1:Z04ewVs7JhXaYkmDhBERPi41gnltfQpMWDnTnQbaCqk=
-golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 h1:G64nFNerDErBd2KdvHvIn3Ee6ccUQBTfhDZEO0DccfU=
+golang.org/x/net v0.0.0-20211005215030-d2e5035098b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -132,8 +132,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211002104244-808efd93c36d h1:SABT8Vei3iTiu+Gy8KOzpSNz+W1EQ5YBCRtiEETxF+0=
-golang.org/x/sys v0.0.0-20211002104244-808efd93c36d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef h1:fPxZ3Umkct3LZ8gK9nbk+DWDJ9fstZa2grBn+lWVKPs=
+golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -155,8 +155,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
-google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 h1:eF1wcrhdz56Vugf8qNX5dD93ItkrhothojQyHXqloe0=
-google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e h1:Im71rbA1N3CbIag/PumYhQcNR8bLNmuOtRIyOnnLsT8=
+google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
diff --git a/internal/protocol.go b/internal/protocol.go
index 78174118..73cb960e 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -68,8 +68,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
fr.WritePayload(data)
fr.WriteCRC(fr.Header())
- // hold a pointer to a frame
- // Do we need a copy here????
+ // we don't need a copy here, because frame copy the data before send
err = rl.Send(fr)
if err != nil {
return errors.E(op, err)
@@ -78,7 +77,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
return nil
}
-func FetchPID(rl relay.Relay) (int64, error) {
+func Pid(rl relay.Relay) (int64, error) {
const op = errors.Op("fetch_pid")
err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
@@ -111,5 +110,9 @@ func FetchPID(rl relay.Relay) (int64, error) {
return 0, errors.E(op, err)
}
+ if link.Pid <= 0 {
+ return 0, errors.E(op, errors.Str("pid should be greater than 0"))
+ }
+
return int64(link.Pid), nil
}
diff --git a/tests/pipes_test_script.sh b/tests/pipes_test_script.sh
new file mode 100755
index 00000000..c759b0a6
--- /dev/null
+++ b/tests/pipes_test_script.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+php ../../tests/client.php echo pipes
diff --git a/tests/script.sh b/tests/script.sh
deleted file mode 100755
index 746fb768..00000000
--- a/tests/script.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/usr/bin/env bash
-php ../../../tests/client.php echo pipes \ No newline at end of file
diff --git a/tests/socket_test_script.sh b/tests/socket_test_script.sh
new file mode 100755
index 00000000..3948c4fb
--- /dev/null
+++ b/tests/socket_test_script.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+php ../../tests/client.php echo tcp
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 0d46f496..84a9d311 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -29,7 +29,7 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
@@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- pid, err := internal.FetchPID(relay)
+ // used as a ping
+ _, err = internal.Pid(relay)
if err != nil {
err = multierr.Combine(
err,
@@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- if pid != w.Pid() {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
select {
case
// return worker
@@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// errors bundle
- if pid, err := internal.FetchPID(relay); pid != w.Pid() {
+ _, err = internal.Pid(relay)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index f8198610..b4ba8c87 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Pipe_Echo_Script(t *testing.T) {
+ t.Parallel()
+ cmd := exec.Command("sh", "../../tests/pipes_test_script.sh")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Pipe_Broken(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index d98ce607..39c04eac 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -66,7 +66,7 @@ func (f *Factory) listen() error {
}
rl := socket.NewSocketRelay(conn)
- pid, err := internal.FetchPID(rl)
+ pid, err := internal.Pid(rl)
if err != nil {
return err
}
@@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
w.AttachRelay(rl)
// errors bundle
- if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ _, err = internal.Pid(rl)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
@@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
return nil, err
}
default:
- tmp, ok := f.relays.LoadAndDelete(w.Pid())
- if !ok {
+ // find first pid and attach relay to it
+ var r *socket.Relay
+ f.relays.Range(func(k, val interface{}) bool {
+ r = val.(*socket.Relay)
+ f.relays.Delete(k)
+ return false
+ })
+
+ // no relay exists
+ if r == nil {
continue
}
- return tmp.(*socket.Relay), nil
+
+ return r, nil
}
}
}
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 879dba8e..d517d026 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Tcp_Echo_Script(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err = ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("sh", "../../tests/socket_test_script.sh")
+
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Unix_Start(t *testing.T) {
ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")