diff options
author | Valery Piashchynski <[email protected]> | 2021-02-04 12:26:16 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-02-04 12:26:16 +0300 |
commit | b9c9909b98c1b3e15421a4bcad9e8fcc01332d37 (patch) | |
tree | b969808a6bdd67bbb566421a2158a51c9de3713e | |
parent | 8a8d9d7c64226397792e8f1aa7cc607ab413906e (diff) | |
parent | a902a06e670d70b0f806899765bdb206977e7698 (diff) |
Merge pull request #526 from spiral/fix/memory_leaksv2.0.0-beta.24
bug(leak): workers memory leak
-rw-r--r-- | .github/workflows/codeql-analysis.yml | 6 | ||||
-rwxr-xr-x | .rr.yaml | 4 | ||||
-rwxr-xr-x | bors.toml | 24 | ||||
-rw-r--r-- | pkg/events/worker_events.go | 5 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 6 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 2 | ||||
-rw-r--r-- | pkg/states/worker_states.go | 2 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 14 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 16 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 62 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 64 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 2 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 134 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 1 | ||||
-rw-r--r-- | plugins/server/plugin.go | 18 | ||||
-rw-r--r-- | tests/composer.json | 5 | ||||
-rw-r--r-- | tests/plugins/http/http_plugin_test.go | 1 | ||||
-rw-r--r-- | tests/plugins/reload/reload_plugin_test.go | 30 |
18 files changed, 218 insertions, 178 deletions
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 75e40110..f630ff40 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -7,17 +7,17 @@ name: "CodeQL" on: push: - branches: [master] + branches: [roadrunner-core, roadrunner-binary] pull_request: # The branches below must be a subset of the branches above - branches: [master] + branches: [roadrunner-core, roadrunner-binary] schedule: - cron: '0 15 * * 6' jobs: analyze: name: Analyze - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: fail-fast: false @@ -61,9 +61,13 @@ http: "output": "output-header" pool: + # default - num of logical CPUs num_workers: 6 + # default 0 max_jobs: 0 + # default 1 minute allocate_timeout: 60s + # default 1 minute destroy_timeout: 60s supervisor: # watch_tick defines how often to check the state of the workers (seconds) @@ -1,20 +1,14 @@ status = [ - 'Build (Go 1.14, PHP 7.4, OS ubuntu-latest)', - 'Build (Go 1.14, PHP 7.4, OS windows-latest)', - 'Build (Go 1.14, PHP 7.4, OS macos-latest)', - 'Build (Go 1.15, PHP 7.4, OS ubuntu-latest)', - 'Build (Go 1.15, PHP 7.4, OS windows-latest)', - 'Build (Go 1.15, PHP 7.4, OS macos-latest)', - 'Build (Go 1.14, PHP 8.0, OS ubuntu-latest)', - 'Build (Go 1.14, PHP 8.0, OS windows-latest)', - 'Build (Go 1.14, PHP 8.0, OS macos-latest)', - 'Build (Go 1.15, PHP 8.0, OS ubuntu-latest)', - 'Build (Go 1.15, PHP 8.0, OS windows-latest)', - 'Build (Go 1.15, PHP 8.0, OS macos-latest)', - 'Golang-CI (lint)', - 'Build docker image', + 'Linux / Build (Go 1.14, PHP 7.4, OS ubuntu-20.04)', + 'Linux / Build (Go 1.15, PHP 7.4, OS ubuntu-20.04)', + 'Linux / Build (Go 1.14, PHP 8.0, OS ubuntu-20.04)', + 'Linux / Build (Go 1.15, PHP 8.0, OS ubuntu-20.04)', + 'macOS / Build (Go 1.14, PHP 7.4, OS macos-latest)', + 'macOS / Build (Go 1.15, PHP 7.4, OS macos-latest)', + 'macOS / Build (Go 1.14, PHP 8.0, OS macos-latest)', + 'macOS / Build (Go 1.15, PHP 8.0, OS macos-latest)', + 'Linux / Golang-CI (lint) ', ] - required_approvals = 0 delete_merged_branches = true timeout-sec = 1800 diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 9d428f7d..11bd6ab7 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -3,9 +3,10 @@ package events const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr ) type W int64 @@ -16,6 +17,8 @@ func (ev W) String() string { return "EventWorkerError" case EventWorkerLog: return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" } return "Unknown event type" } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 2d2b2b7d..4cfd5ec6 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -167,11 +167,11 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 1) + block := make(chan struct{}, 10) listener := func(event interface{}) { if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerLog { + if wev.Event == events.EventWorkerStderr { e := string(wev.Payload.([]byte)) if strings.ContainsAny(e, "undefined_function()") { block <- struct{}{} @@ -491,7 +491,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 1) + block := make(chan struct{}, 10) listener := func(event interface{}) { if ev, ok := event.(events.PoolEvent); ok { diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index c67d5d91..cbe9f5cb 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -210,7 +210,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { }, } - block := make(chan struct{}, 1) + block := make(chan struct{}, 10) listener := func(event interface{}) { if ev, ok := event.(events.PoolEvent); ok { if ev.Event == events.EventMaxMemory { diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go index 22fdfe8a..fe653cb4 100644 --- a/pkg/states/worker_states.go +++ b/pkg/states/worker_states.go @@ -16,6 +16,7 @@ const ( // StateStopping - process is being softly stopped. StateStopping + // StateKilling - process is being forcibly stopped StateKilling // State of worker, when no need to allocate new one @@ -27,5 +28,6 @@ const ( // StateErrored - error WorkerState (can't be used). StateErrored + // StateRemove - worker is killed and removed from the stack StateRemove ) diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index e247324c..73008471 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + w, err := NewPipeFactory().SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid2(t *testing.T) { diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index b23af19f..3efeb59c 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 0e29e7d2..1361693b 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -3,11 +3,13 @@ package socket import ( "net" "os/exec" + "strings" "sync" "syscall" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Invalid2(t *testing.T) { @@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo2(t *testing.T) { @@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout2(t *testing.T) { @@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Nil(t, res.Context) assert.Nil(t, res.Body) wg.Wait() + <-finish } func Test_Unix_Echo2(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index f55fc3dd..c13a897b 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -4,10 +4,12 @@ import ( "context" "net" "os/exec" + "strings" "sync" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Timeout(t *testing.T) { @@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo(t *testing.T) { @@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout(t *testing.T) { @@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + block := make(chan struct{}) + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerStderr { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) + <-block wg.Wait() } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 696fbdb7..010af076 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -36,10 +36,8 @@ func FromSync(w *SyncWorkerImpl) BaseProcess { state: w.process.state, cmd: w.process.cmd, pid: w.process.pid, - stderr: w.process.stderr, endState: w.process.endState, relay: w.process.relay, - rd: w.process.rd, } } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2f1f399d..b726c6f1 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -1,14 +1,11 @@ package worker import ( - "bytes" "fmt" - "io" "os" "os/exec" "strconv" "strings" - "sync" "time" "github.com/spiral/errors" @@ -53,27 +50,11 @@ type Process struct { // can be nil while process is not started. pid int - // stderr aggregates stderr output from underlying process. Value can be - // receive only once command is completed and all pipes are closed. - stderr *bytes.Buffer - - // channel is being closed once command is complete. - // waitDone chan interface{} - // contains information about resulted process state. endState *os.ProcessState - // ensures than only one execution can be run at once. - mu sync.RWMutex - // communication bus with underlying process. relay relay.Relay - // rd in a second part of pipe to read from stderr - rd io.Reader - // stop signal terminates io.Pipe from reading from stderr - stop chan struct{} - - syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(states.StateInactive), - stderr: new(bytes.Buffer), - stop: make(chan struct{}, 1), - // sync pool for STDERR - // All receivers are pointers - syncPool: sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, - }, } - w.rd, w.cmd.Stderr = io.Pipe() - - // small buffer optimization - // at this point we know, that stderr will contain huge messages - w.stderr.Grow(ReadBufSize) + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w // add options for i := 0; i < len(options); i++ { options[i](w) } - go func() { - w.watch() - }() - return w, nil } @@ -189,44 +153,36 @@ func (w *Process) Start() error { // to find or Start the script. func (w *Process) Wait() error { const op = errors.Op("process_wait") - err := multierr.Combine(w.cmd.Wait()) + var err error + err = w.cmd.Wait() + // If worker was destroyed, just exit if w.State().Value() == states.StateDestroyed { - return errors.E(op, err) + return nil } - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then process.cmd.Wait return an error - w.endState = w.cmd.ProcessState + // If state is different, and err is not nil, append it to the errors if err != nil { - w.state.Set(states.StateErrored) - - w.mu.RLock() - // if process return code > 0, here will be an error from stderr (if presents) - if w.stderr.Len() > 0 { - err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) - // stop the stderr buffer - w.stop <- struct{}{} - } - w.mu.RUnlock() - - return multierr.Append(err, w.closeRelay()) + w.State().Set(states.StateErrored) + err = multierr.Combine(err, errors.E(op, err)) } - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(states.StateErrored) - return err + // closeRelay + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + err2 := w.closeRelay() + if err2 != nil { + w.State().Set(states.StateErrored) + return multierr.Append(err, errors.E(op, err2)) } - if w.endState.Success() { - w.state.Set(states.StateStopped) + if w.cmd.ProcessState.Success() { + w.State().Set(states.StateStopped) + return nil } - w.stderr.Reset() - - return nil + return err } func (w *Process) closeRelay() error { @@ -272,48 +228,8 @@ func (w *Process) Kill() error { return nil } -// put the pointer, to not allocate new slice -// but erase it len and then return back -func (w *Process) put(data *[]byte) { - w.syncPool.Put(data) -} - -// get pointer to the byte slice -func (w *Process) get() *[]byte { - return w.syncPool.Get().(*[]byte) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; errBuffer is always nil. -func (w *Process) watch() { - go func() { - for { - select { - case <-w.stop: - buf := w.get() - // read the last data - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - return - default: - // read the max 10kb of stderr per one read - buf := w.get() - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // delete all prev messages - w.stderr.Reset() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - } - } - }() +// Worker stderr +func (w *Process) Write(p []byte) (n int, err error) { + w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + return len(p), nil } diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index c036ae96..55a9832b 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -26,7 +26,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error { *list = append(*list, name) } rpc.log.Debug("list of services", "list", *list) - rpc.log.Debug("successfully finished List method") return nil } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 73ce71f7..99d93d19 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -140,7 +140,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En } list := make([]events.Listener, 0, 1) - list = append(list, server.collectPoolLogs) + list = append(list, server.collectEvents) if len(listeners) != 0 { list = append(list, listeners...) } @@ -201,7 +201,7 @@ func (server *Plugin) setEnv(e Env) []string { return env } -func (server *Plugin) collectPoolLogs(event interface{}) { +func (server *Plugin) collectEvents(event interface{}) { if we, ok := event.(events.PoolEvent); ok { switch we.Event { case events.EventMaxMemory: @@ -234,9 +234,12 @@ func (server *Plugin) collectPoolLogs(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t")) + case events.EventWorkerStderr: + // TODO unsafe byte to string convert + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t")) } } } @@ -245,9 +248,12 @@ func (server *Plugin) collectWorkerLogs(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t")) + case events.EventWorkerStderr: + // TODO unsafe byte to string convert + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t")) } } } diff --git a/tests/composer.json b/tests/composer.json index 0cf74581..52fa3a0e 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -1,10 +1,11 @@ { - "minimum-stability": "dev", + "minimum-stability": "beta", "prefer-stable": true, "require": { "nyholm/psr7": "^1.3", "spiral/roadrunner": "^2.0", "spiral/roadrunner-http": "^2.0", - "temporal/sdk": "dev-master" + "temporal/sdk": ">=1.0", + "spiral/tokenizer": ">=2.7" } } diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index 4f99dbbb..9cd1c147 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -1025,6 +1025,7 @@ logs: controller := gomock.NewController(t) mockLogger := mocks.NewMockLogger(controller) + mockLogger.EXPECT().Debug(gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Debug("", "remote", gomock.Any(), "ts", gomock.Any(), "resp.status", gomock.Any(), "method", gomock.Any(), "uri", gomock.Any()).MinTimes(1) diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go index 9007541b..2e246480 100644 --- a/tests/plugins/reload/reload_plugin_test.go +++ b/tests/plugins/reload/reload_plugin_test.go @@ -358,7 +358,7 @@ func reloadFilteredExt(t *testing.T) { } // Should be events only about creating files with txt ext -func TestReloadCopy500(t *testing.T) { +func TestReloadCopy100(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -447,16 +447,16 @@ func TestReloadCopy500(t *testing.T) { // Scenario // 1 - // Create 3k files with txt, abc, def extensions + // Create 100 files with txt, abc, def extensions // Copy files to the unit_tests_copy dir // 2 // Delete both dirs, recreate - // Create 3k files with txt, abc, def extensions + // Create 100 files with txt, abc, def extensions // Move files to the unit_tests_copy dir // 3 // Recursive - t.Run("ReloadMake300Files", reloadMake300Files) + t.Run("ReloadMake100Files", reloadMake100Files) t.Run("ReloadCopyFiles", reloadCopyFiles) t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive) t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs) @@ -478,9 +478,9 @@ func reloadMoveSupport(t *testing.T) { // rand sleep rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(100)) // nolint:gosec - rDir := rand.Int63n(9) // nolint:gosec - rExt := rand.Int63n(3) // nolint:gosec + rNum := rand.Int63n(int64(33)) // nolint:gosec + rDir := rand.Int63n(9) // nolint:gosec + rExt := rand.Int63n(3) // nolint:gosec ext := []string{ ".txt", @@ -570,7 +570,7 @@ func randomChangesInRecursiveDirs(t *testing.T) { } for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(500) // nolint:gosec + rSleep := rand.Int63n(100) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(10) // nolint:gosec @@ -616,13 +616,13 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) // recreate files - for i := uint(0); i < 100; i++ { + for i := uint(0); i < 33; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } - for i := uint(0); i < 100; i++ { + for i := uint(0); i < 33; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc")) } - for i := uint(0); i < 100; i++ { + for i := uint(0); i < 34; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def")) } @@ -630,14 +630,14 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, err) } -func reloadMake300Files(t *testing.T) { - for i := uint(0); i < 100; i++ { +func reloadMake100Files(t *testing.T) { + for i := uint(0); i < 33; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } - for i := uint(0); i < 100; i++ { + for i := uint(0); i < 33; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc")) } - for i := uint(0); i < 100; i++ { + for i := uint(0); i < 34; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def")) } } |