summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 14:38:29 +0300
committerValery Piashchynski <[email protected]>2020-12-17 14:38:29 +0300
commit7884349f27ed750825a0f4dea59af8964e182651 (patch)
tree98c3a819e6058c23090f62b983193cd4984b39d9 /pkg
parentee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (diff)
Redis initial commit
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pipe/pipe_factory_test.go24
-rwxr-xr-xpkg/pool/static_pool.go12
-rwxr-xr-xpkg/worker/worker.go25
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go11
4 files changed, 37 insertions, 35 deletions
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 99212ff8..7045b785 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -11,7 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -146,7 +146,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
}()
- sw, err := workerImpl.From(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -174,7 +174,7 @@ func Test_Pipe_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := workerImpl.From(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -208,7 +208,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
- sw, err := workerImpl.From(w)
+ sw, err := worker.From(w)
if err != nil {
b.Fatal(err)
}
@@ -249,7 +249,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
+ sw, err := worker.From(w)
if err != nil {
b.Fatal(err)
}
@@ -276,7 +276,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
+ sw, err := worker.From(w)
if err != nil {
b.Fatal(err)
}
@@ -297,7 +297,7 @@ func Test_Echo(t *testing.T) {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -327,7 +327,7 @@ func Test_BadPayload(t *testing.T) {
w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -386,7 +386,7 @@ func Test_Echo_Slow(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -420,7 +420,7 @@ func Test_Broken(t *testing.T) {
}
})
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -455,7 +455,7 @@ func Test_Error(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
@@ -486,7 +486,7 @@ func Test_NumExecs(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
+ syncWorker, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 6cc42143..6c177bff 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -79,7 +79,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
p.allocator = newPoolAllocator(factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateSyncWorkers(ctx, p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -333,11 +333,12 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
+ // TODO do not allocate context on every loop cycle??
for i := int64(0); i < numWorkers; i++ {
ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
@@ -345,7 +346,12 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- workers = append(workers, w)
+ sw, err := syncWorker.From(w)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+ workers = append(workers, sw)
cancel()
}
return workers, nil
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 35d3264e..9a2e76b4 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -30,13 +30,6 @@ const (
ReadBufSize = 10240 // Kb
)
-var syncPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -79,6 +72,8 @@ type Process struct {
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
stop chan struct{}
+
+ syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -93,6 +88,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
}
w.rd, w.cmd.Stderr = io.Pipe()
@@ -258,15 +261,12 @@ func (w *Process) Kill() error {
// put the pointer, to not allocate new slice
// but erase it len and then return back
func (w *Process) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- syncPool.Put(data)
+ w.syncPool.Put(data)
}
// get pointer to the byte slice
func (w *Process) get() *[]byte {
- return syncPool.Get().(*[]byte)
+ return w.syncPool.Get().(*[]byte)
}
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
@@ -282,6 +282,7 @@ func (w *Process) watch() {
w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
w.stderr.Write((*buf)[:n])
w.mu.Unlock()
w.put(buf)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 8788e509..918145e5 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
@@ -163,16 +162,12 @@ type workerWatcher struct {
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- sw, err := syncWorker.From(workers[i])
- if err != nil {
- return err
- }
- ww.stack.Push(sw)
- sw.AddListener(ww.events.Push)
+ ww.stack.Push(workers[i])
+ workers[i].AddListener(ww.events.Push)
go func(swc worker.BaseProcess) {
ww.wait(swc)
- }(sw)
+ }(workers[i])
}
return nil
}