summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-04 12:26:16 +0300
committerGitHub <[email protected]>2021-02-04 12:26:16 +0300
commitb9c9909b98c1b3e15421a4bcad9e8fcc01332d37 (patch)
treeb969808a6bdd67bbb566421a2158a51c9de3713e
parent8a8d9d7c64226397792e8f1aa7cc607ab413906e (diff)
parenta902a06e670d70b0f806899765bdb206977e7698 (diff)
Merge pull request #526 from spiral/fix/memory_leaksv2.0.0-beta.24
bug(leak): workers memory leak
-rw-r--r--.github/workflows/codeql-analysis.yml6
-rwxr-xr-x.rr.yaml4
-rwxr-xr-xbors.toml24
-rw-r--r--pkg/events/worker_events.go5
-rwxr-xr-xpkg/pool/static_pool_test.go6
-rw-r--r--pkg/pool/supervisor_test.go2
-rw-r--r--pkg/states/worker_states.go2
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go14
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go16
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go62
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go64
-rwxr-xr-xpkg/worker/sync_worker.go2
-rwxr-xr-xpkg/worker/worker.go134
-rw-r--r--plugins/informer/rpc.go1
-rw-r--r--plugins/server/plugin.go18
-rw-r--r--tests/composer.json5
-rw-r--r--tests/plugins/http/http_plugin_test.go1
-rw-r--r--tests/plugins/reload/reload_plugin_test.go30
18 files changed, 218 insertions, 178 deletions
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 75e40110..f630ff40 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -7,17 +7,17 @@ name: "CodeQL"
on:
push:
- branches: [master]
+ branches: [roadrunner-core, roadrunner-binary]
pull_request:
# The branches below must be a subset of the branches above
- branches: [master]
+ branches: [roadrunner-core, roadrunner-binary]
schedule:
- cron: '0 15 * * 6'
jobs:
analyze:
name: Analyze
- runs-on: ubuntu-latest
+ runs-on: ubuntu-20.04
strategy:
fail-fast: false
diff --git a/.rr.yaml b/.rr.yaml
index 1c1a0af2..4cdc4326 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -61,9 +61,13 @@ http:
"output": "output-header"
pool:
+ # default - num of logical CPUs
num_workers: 6
+ # default 0
max_jobs: 0
+ # default 1 minute
allocate_timeout: 60s
+ # default 1 minute
destroy_timeout: 60s
supervisor:
# watch_tick defines how often to check the state of the workers (seconds)
diff --git a/bors.toml b/bors.toml
index e35b52a3..b1681bf2 100755
--- a/bors.toml
+++ b/bors.toml
@@ -1,20 +1,14 @@
status = [
- 'Build (Go 1.14, PHP 7.4, OS ubuntu-latest)',
- 'Build (Go 1.14, PHP 7.4, OS windows-latest)',
- 'Build (Go 1.14, PHP 7.4, OS macos-latest)',
- 'Build (Go 1.15, PHP 7.4, OS ubuntu-latest)',
- 'Build (Go 1.15, PHP 7.4, OS windows-latest)',
- 'Build (Go 1.15, PHP 7.4, OS macos-latest)',
- 'Build (Go 1.14, PHP 8.0, OS ubuntu-latest)',
- 'Build (Go 1.14, PHP 8.0, OS windows-latest)',
- 'Build (Go 1.14, PHP 8.0, OS macos-latest)',
- 'Build (Go 1.15, PHP 8.0, OS ubuntu-latest)',
- 'Build (Go 1.15, PHP 8.0, OS windows-latest)',
- 'Build (Go 1.15, PHP 8.0, OS macos-latest)',
- 'Golang-CI (lint)',
- 'Build docker image',
+ 'Linux / Build (Go 1.14, PHP 7.4, OS ubuntu-20.04)',
+ 'Linux / Build (Go 1.15, PHP 7.4, OS ubuntu-20.04)',
+ 'Linux / Build (Go 1.14, PHP 8.0, OS ubuntu-20.04)',
+ 'Linux / Build (Go 1.15, PHP 8.0, OS ubuntu-20.04)',
+ 'macOS / Build (Go 1.14, PHP 7.4, OS macos-latest)',
+ 'macOS / Build (Go 1.15, PHP 7.4, OS macos-latest)',
+ 'macOS / Build (Go 1.14, PHP 8.0, OS macos-latest)',
+ 'macOS / Build (Go 1.15, PHP 8.0, OS macos-latest)',
+ 'Linux / Golang-CI (lint) ',
]
-
required_approvals = 0
delete_merged_branches = true
timeout-sec = 1800
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 9d428f7d..11bd6ab7 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -3,9 +3,10 @@ package events
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
EventWorkerError W = iota + 11000
-
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
)
type W int64
@@ -16,6 +17,8 @@ func (ev W) String() string {
return "EventWorkerError"
case EventWorkerLog:
return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
}
return "Unknown event type"
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 2d2b2b7d..4cfd5ec6 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -167,11 +167,11 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 1)
+ block := make(chan struct{}, 10)
listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerLog {
+ if wev.Event == events.EventWorkerStderr {
e := string(wev.Payload.([]byte))
if strings.ContainsAny(e, "undefined_function()") {
block <- struct{}{}
@@ -491,7 +491,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 1)
+ block := make(chan struct{}, 10)
listener := func(event interface{}) {
if ev, ok := event.(events.PoolEvent); ok {
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index c67d5d91..cbe9f5cb 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -210,7 +210,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- block := make(chan struct{}, 1)
+ block := make(chan struct{}, 10)
listener := func(event interface{}) {
if ev, ok := event.(events.PoolEvent); ok {
if ev.Event == events.EventMaxMemory {
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
index 22fdfe8a..fe653cb4 100644
--- a/pkg/states/worker_states.go
+++ b/pkg/states/worker_states.go
@@ -16,6 +16,7 @@ const (
// StateStopping - process is being softly stopped.
StateStopping
+ // StateKilling - process is being forcibly stopped
StateKilling
// State of worker, when no need to allocate new one
@@ -27,5 +28,6 @@ const (
// StateErrored - error WorkerState (can't be used).
StateErrored
+ // StateRemove - worker is killed and removed from the stack
StateRemove
)
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index e247324c..73008471 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+ w, err := NewPipeFactory().SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid2(t *testing.T) {
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index b23af19f..3efeb59c 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 0e29e7d2..1361693b 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -3,11 +3,13 @@ package socket
import (
"net"
"os/exec"
+ "strings"
"sync"
"syscall"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
wg.Wait()
+ <-finish
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index f55fc3dd..c13a897b 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -4,10 +4,12 @@ import (
"context"
"net"
"os/exec"
+ "strings"
"sync"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo(t *testing.T) {
@@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout(t *testing.T) {
@@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ block := make(chan struct{})
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerStderr {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
+ <-block
wg.Wait()
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 696fbdb7..010af076 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -36,10 +36,8 @@ func FromSync(w *SyncWorkerImpl) BaseProcess {
state: w.process.state,
cmd: w.process.cmd,
pid: w.process.pid,
- stderr: w.process.stderr,
endState: w.process.endState,
relay: w.process.relay,
- rd: w.process.rd,
}
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 2f1f399d..b726c6f1 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -1,14 +1,11 @@
package worker
import (
- "bytes"
"fmt"
- "io"
"os"
"os/exec"
"strconv"
"strings"
- "sync"
"time"
"github.com/spiral/errors"
@@ -53,27 +50,11 @@ type Process struct {
// can be nil while process is not started.
pid int
- // stderr aggregates stderr output from underlying process. Value can be
- // receive only once command is completed and all pipes are closed.
- stderr *bytes.Buffer
-
- // channel is being closed once command is complete.
- // waitDone chan interface{}
-
// contains information about resulted process state.
endState *os.ProcessState
- // ensures than only one execution can be run at once.
- mu sync.RWMutex
-
// communication bus with underlying process.
relay relay.Relay
- // rd in a second part of pipe to read from stderr
- rd io.Reader
- // stop signal terminates io.Pipe from reading from stderr
- stop chan struct{}
-
- syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(states.StateInactive),
- stderr: new(bytes.Buffer),
- stop: make(chan struct{}, 1),
- // sync pool for STDERR
- // All receivers are pointers
- syncPool: sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
- },
}
- w.rd, w.cmd.Stderr = io.Pipe()
-
- // small buffer optimization
- // at this point we know, that stderr will contain huge messages
- w.stderr.Grow(ReadBufSize)
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
// add options
for i := 0; i < len(options); i++ {
options[i](w)
}
- go func() {
- w.watch()
- }()
-
return w, nil
}
@@ -189,44 +153,36 @@ func (w *Process) Start() error {
// to find or Start the script.
func (w *Process) Wait() error {
const op = errors.Op("process_wait")
- err := multierr.Combine(w.cmd.Wait())
+ var err error
+ err = w.cmd.Wait()
+ // If worker was destroyed, just exit
if w.State().Value() == states.StateDestroyed {
- return errors.E(op, err)
+ return nil
}
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then process.cmd.Wait return an error
- w.endState = w.cmd.ProcessState
+ // If state is different, and err is not nil, append it to the errors
if err != nil {
- w.state.Set(states.StateErrored)
-
- w.mu.RLock()
- // if process return code > 0, here will be an error from stderr (if presents)
- if w.stderr.Len() > 0 {
- err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
- // stop the stderr buffer
- w.stop <- struct{}{}
- }
- w.mu.RUnlock()
-
- return multierr.Append(err, w.closeRelay())
+ w.State().Set(states.StateErrored)
+ err = multierr.Combine(err, errors.E(op, err))
}
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(states.StateErrored)
- return err
+ // closeRelay
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then process.cmd.Wait return an error
+ err2 := w.closeRelay()
+ if err2 != nil {
+ w.State().Set(states.StateErrored)
+ return multierr.Append(err, errors.E(op, err2))
}
- if w.endState.Success() {
- w.state.Set(states.StateStopped)
+ if w.cmd.ProcessState.Success() {
+ w.State().Set(states.StateStopped)
+ return nil
}
- w.stderr.Reset()
-
- return nil
+ return err
}
func (w *Process) closeRelay() error {
@@ -272,48 +228,8 @@ func (w *Process) Kill() error {
return nil
}
-// put the pointer, to not allocate new slice
-// but erase it len and then return back
-func (w *Process) put(data *[]byte) {
- w.syncPool.Put(data)
-}
-
-// get pointer to the byte slice
-func (w *Process) get() *[]byte {
- return w.syncPool.Get().(*[]byte)
-}
-
-// Write appends the contents of pool to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *Process) watch() {
- go func() {
- for {
- select {
- case <-w.stop:
- buf := w.get()
- // read the last data
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- return
- default:
- // read the max 10kb of stderr per one read
- buf := w.get()
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // delete all prev messages
- w.stderr.Reset()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- }
- }
- }()
+// Worker stderr
+func (w *Process) Write(p []byte) (n int, err error) {
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ return len(p), nil
}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index c036ae96..55a9832b 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -26,7 +26,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error {
*list = append(*list, name)
}
rpc.log.Debug("list of services", "list", *list)
-
rpc.log.Debug("successfully finished List method")
return nil
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 73ce71f7..99d93d19 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -140,7 +140,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En
}
list := make([]events.Listener, 0, 1)
- list = append(list, server.collectPoolLogs)
+ list = append(list, server.collectEvents)
if len(listeners) != 0 {
list = append(list, listeners...)
}
@@ -201,7 +201,7 @@ func (server *Plugin) setEnv(e Env) []string {
return env
}
-func (server *Plugin) collectPoolLogs(event interface{}) {
+func (server *Plugin) collectEvents(event interface{}) {
if we, ok := event.(events.PoolEvent); ok {
switch we.Event {
case events.EventMaxMemory:
@@ -234,9 +234,12 @@ func (server *Plugin) collectPoolLogs(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"))
+ case events.EventWorkerStderr:
+ // TODO unsafe byte to string convert
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"))
}
}
}
@@ -245,9 +248,12 @@ func (server *Plugin) collectWorkerLogs(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"))
+ case events.EventWorkerStderr:
+ // TODO unsafe byte to string convert
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"))
}
}
}
diff --git a/tests/composer.json b/tests/composer.json
index 0cf74581..52fa3a0e 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -1,10 +1,11 @@
{
- "minimum-stability": "dev",
+ "minimum-stability": "beta",
"prefer-stable": true,
"require": {
"nyholm/psr7": "^1.3",
"spiral/roadrunner": "^2.0",
"spiral/roadrunner-http": "^2.0",
- "temporal/sdk": "dev-master"
+ "temporal/sdk": ">=1.0",
+ "spiral/tokenizer": ">=2.7"
}
}
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index 4f99dbbb..9cd1c147 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1025,6 +1025,7 @@ logs:
controller := gomock.NewController(t)
mockLogger := mocks.NewMockLogger(controller)
+ mockLogger.EXPECT().Debug(gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Debug("", "remote", gomock.Any(), "ts", gomock.Any(), "resp.status", gomock.Any(), "method", gomock.Any(), "uri", gomock.Any()).MinTimes(1)
diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go
index 9007541b..2e246480 100644
--- a/tests/plugins/reload/reload_plugin_test.go
+++ b/tests/plugins/reload/reload_plugin_test.go
@@ -358,7 +358,7 @@ func reloadFilteredExt(t *testing.T) {
}
// Should be events only about creating files with txt ext
-func TestReloadCopy500(t *testing.T) {
+func TestReloadCopy100(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
@@ -447,16 +447,16 @@ func TestReloadCopy500(t *testing.T) {
// Scenario
// 1
- // Create 3k files with txt, abc, def extensions
+ // Create 100 files with txt, abc, def extensions
// Copy files to the unit_tests_copy dir
// 2
// Delete both dirs, recreate
- // Create 3k files with txt, abc, def extensions
+ // Create 100 files with txt, abc, def extensions
// Move files to the unit_tests_copy dir
// 3
// Recursive
- t.Run("ReloadMake300Files", reloadMake300Files)
+ t.Run("ReloadMake100Files", reloadMake100Files)
t.Run("ReloadCopyFiles", reloadCopyFiles)
t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive)
t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs)
@@ -478,9 +478,9 @@ func reloadMoveSupport(t *testing.T) {
// rand sleep
rSleep := rand.Int63n(500) // nolint:gosec
time.Sleep(time.Millisecond * time.Duration(rSleep))
- rNum := rand.Int63n(int64(100)) // nolint:gosec
- rDir := rand.Int63n(9) // nolint:gosec
- rExt := rand.Int63n(3) // nolint:gosec
+ rNum := rand.Int63n(int64(33)) // nolint:gosec
+ rDir := rand.Int63n(9) // nolint:gosec
+ rExt := rand.Int63n(3) // nolint:gosec
ext := []string{
".txt",
@@ -570,7 +570,7 @@ func randomChangesInRecursiveDirs(t *testing.T) {
}
for i := 0; i < 10; i++ {
// rand sleep
- rSleep := rand.Int63n(500) // nolint:gosec
+ rSleep := rand.Int63n(100) // nolint:gosec
time.Sleep(time.Millisecond * time.Duration(rSleep))
rNum := rand.Int63n(int64(100)) // nolint:gosec
rDir := rand.Int63n(10) // nolint:gosec
@@ -616,13 +616,13 @@ func reloadCopyFiles(t *testing.T) {
assert.NoError(t, os.Mkdir(testCopyToDir, 0755))
// recreate files
- for i := uint(0); i < 100; i++ {
+ for i := uint(0); i < 33; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt"))
}
- for i := uint(0); i < 100; i++ {
+ for i := uint(0); i < 33; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc"))
}
- for i := uint(0); i < 100; i++ {
+ for i := uint(0); i < 34; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def"))
}
@@ -630,14 +630,14 @@ func reloadCopyFiles(t *testing.T) {
assert.NoError(t, err)
}
-func reloadMake300Files(t *testing.T) {
- for i := uint(0); i < 100; i++ {
+func reloadMake100Files(t *testing.T) {
+ for i := uint(0); i < 33; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt"))
}
- for i := uint(0); i < 100; i++ {
+ for i := uint(0); i < 33; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc"))
}
- for i := uint(0); i < 100; i++ {
+ for i := uint(0); i < 34; i++ {
assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def"))
}
}