summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md66
-rw-r--r--go.mod6
-rw-r--r--go.sum12
-rwxr-xr-xinternal/protocol.go9
-rwxr-xr-xtests/pipes_test_script.sh2
-rwxr-xr-xtests/script.sh2
-rwxr-xr-xtests/socket_test_script.sh2
-rwxr-xr-xtransport/pipe/pipe_factory.go21
-rwxr-xr-xtransport/pipe/pipe_factory_test.go27
-rwxr-xr-xtransport/socket/socket_factory.go20
-rwxr-xr-xtransport/socket/socket_factory_test.go40
11 files changed, 147 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 09f40dd3..46de41cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -73,39 +73,55 @@ ssl:
]
```
-- ✏️ Add a new option to the `service` plugin. Service plugin will not use std RR logger as output in the flavor of raw output.
+- ✏️ Add a new option to the `log` plugin to configure the line ending. By default, used `\n`.
-New options:
+**New option**:
```yaml
-# Service plugin settings
-service:
- some_service_1:
+# Logs plugin settings
+logs:
(....)
- # Console output
+ # Line ending
#
- # Default: stderr. Available options: stderr, stdout
- output: "stderr"
-
- # Endings for the stderr/stdout output
- #
- # Default: "\n". Available options: any.
+ # Default: "\n".
line_ending: "\n"
-
- # Color for regular output
- #
- # Default: none. Available options: white, red, green, yellow, blue, magenta
- color: "green"
-
- # Color for the process errors
- #
- # Default: none. Available options: white, red, green, yellow, blue, magenta
- err_color: "red"
```
-**!!!**
-Be careful, now, there is no logger plugin dependency for the `service` plugin. That means, that if you used `json` output, now,
-you need to serialize data on the `executable` (in the command) side.
+- ✏️ [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level.
+```yaml
+http:
+ address: 127.0.0.1:55555
+ max_request_size: 1024
+ access_logs: true <-------- Access Logs ON/OFF
+ middleware: []
+
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+```
+- ✏️ HTTP middleware to handle Symfony's `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9).
+```yaml
+http:
+ address: 127.0.0.1:44444
+ max_request_size: 1024
+ middleware: ["sendfile"] <----- NEW MIDDLEWARE
+
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+```
+- ✏️ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key:
+```yaml
+server:
+ command: "./script.sh OR sh script.sh" <--- UPDATED
+ relay: "pipes"
+ relay_timeout: "20s"
+```
+The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`.
## 🩹 Fixes:
diff --git a/go.mod b/go.mod
index 3586f87a..c7e35071 100644
--- a/go.mod
+++ b/go.mod
@@ -30,10 +30,10 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
- golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 // indirect
- golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect
+ golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 // indirect
+ golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef // indirect
golang.org/x/text v0.3.7 // indirect
- google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 // indirect
+ google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
diff --git a/go.sum b/go.sum
index cbd82b32..fe27b062 100644
--- a/go.sum
+++ b/go.sum
@@ -112,8 +112,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
-golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 h1:Z04ewVs7JhXaYkmDhBERPi41gnltfQpMWDnTnQbaCqk=
-golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211005215030-d2e5035098b3 h1:G64nFNerDErBd2KdvHvIn3Ee6ccUQBTfhDZEO0DccfU=
+golang.org/x/net v0.0.0-20211005215030-d2e5035098b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -132,8 +132,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211002104244-808efd93c36d h1:SABT8Vei3iTiu+Gy8KOzpSNz+W1EQ5YBCRtiEETxF+0=
-golang.org/x/sys v0.0.0-20211002104244-808efd93c36d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef h1:fPxZ3Umkct3LZ8gK9nbk+DWDJ9fstZa2grBn+lWVKPs=
+golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -155,8 +155,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
-google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9 h1:eF1wcrhdz56Vugf8qNX5dD93ItkrhothojQyHXqloe0=
-google.golang.org/genproto v0.0.0-20211001223012-bfb93cce50d9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e h1:Im71rbA1N3CbIag/PumYhQcNR8bLNmuOtRIyOnnLsT8=
+google.golang.org/genproto v0.0.0-20211005153810-c76a74d43a8e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
diff --git a/internal/protocol.go b/internal/protocol.go
index 78174118..73cb960e 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -68,8 +68,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
fr.WritePayload(data)
fr.WriteCRC(fr.Header())
- // hold a pointer to a frame
- // Do we need a copy here????
+ // we don't need a copy here, because frame copy the data before send
err = rl.Send(fr)
if err != nil {
return errors.E(op, err)
@@ -78,7 +77,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
return nil
}
-func FetchPID(rl relay.Relay) (int64, error) {
+func Pid(rl relay.Relay) (int64, error) {
const op = errors.Op("fetch_pid")
err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
@@ -111,5 +110,9 @@ func FetchPID(rl relay.Relay) (int64, error) {
return 0, errors.E(op, err)
}
+ if link.Pid <= 0 {
+ return 0, errors.E(op, errors.Str("pid should be greater than 0"))
+ }
+
return int64(link.Pid), nil
}
diff --git a/tests/pipes_test_script.sh b/tests/pipes_test_script.sh
new file mode 100755
index 00000000..c759b0a6
--- /dev/null
+++ b/tests/pipes_test_script.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+php ../../tests/client.php echo pipes
diff --git a/tests/script.sh b/tests/script.sh
deleted file mode 100755
index 746fb768..00000000
--- a/tests/script.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/usr/bin/env bash
-php ../../../tests/client.php echo pipes \ No newline at end of file
diff --git a/tests/socket_test_script.sh b/tests/socket_test_script.sh
new file mode 100755
index 00000000..3948c4fb
--- /dev/null
+++ b/tests/socket_test_script.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+php ../../tests/client.php echo tcp
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 0d46f496..84a9d311 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -29,7 +29,7 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
@@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- pid, err := internal.FetchPID(relay)
+ // used as a ping
+ _, err = internal.Pid(relay)
if err != nil {
err = multierr.Combine(
err,
@@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- if pid != w.Pid() {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
select {
case
// return worker
@@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// errors bundle
- if pid, err := internal.FetchPID(relay); pid != w.Pid() {
+ _, err = internal.Pid(relay)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index f8198610..b4ba8c87 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Pipe_Echo_Script(t *testing.T) {
+ t.Parallel()
+ cmd := exec.Command("sh", "../../tests/pipes_test_script.sh")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Pipe_Broken(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index d98ce607..39c04eac 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -66,7 +66,7 @@ func (f *Factory) listen() error {
}
rl := socket.NewSocketRelay(conn)
- pid, err := internal.FetchPID(rl)
+ pid, err := internal.Pid(rl)
if err != nil {
return err
}
@@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
w.AttachRelay(rl)
// errors bundle
- if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ _, err = internal.Pid(rl)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
@@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
return nil, err
}
default:
- tmp, ok := f.relays.LoadAndDelete(w.Pid())
- if !ok {
+ // find first pid and attach relay to it
+ var r *socket.Relay
+ f.relays.Range(func(k, val interface{}) bool {
+ r = val.(*socket.Relay)
+ f.relays.Delete(k)
+ return false
+ })
+
+ // no relay exists
+ if r == nil {
continue
}
- return tmp.(*socket.Relay), nil
+
+ return r, nil
}
}
}
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 879dba8e..d517d026 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Tcp_Echo_Script(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err = ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("sh", "../../tests/socket_test_script.sh")
+
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Unix_Start(t *testing.T) {
ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")