summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/worker/worker.go25
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go11
2 files changed, 16 insertions, 20 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index ae59d611..e60ab3f4 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -30,13 +30,6 @@ const (
ReadBufSize = 10240 // Kb
)
-var syncPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -79,6 +72,8 @@ type Process struct {
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
stop chan struct{}
+
+ syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -93,6 +88,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
}
w.rd, w.cmd.Stderr = io.Pipe()
@@ -258,15 +261,12 @@ func (w *Process) Kill() error {
// put the pointer, to not allocate new slice
// but erase it len and then return back
func (w *Process) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- syncPool.Put(data)
+ w.syncPool.Put(data)
}
// get pointer to the byte slice
func (w *Process) get() *[]byte {
- return syncPool.Get().(*[]byte)
+ return w.syncPool.Get().(*[]byte)
}
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
@@ -282,6 +282,7 @@ func (w *Process) watch() {
w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
w.stderr.Write((*buf)[:n])
w.mu.Unlock()
w.put(buf)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 8788e509..918145e5 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
@@ -163,16 +162,12 @@ type workerWatcher struct {
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- sw, err := syncWorker.From(workers[i])
- if err != nil {
- return err
- }
- ww.stack.Push(sw)
- sw.AddListener(ww.events.Push)
+ ww.stack.Push(workers[i])
+ workers[i].AddListener(ww.events.Push)
go func(swc worker.BaseProcess) {
ww.wait(swc)
- }(sw)
+ }(workers[i])
}
return nil
}