diff options
-rw-r--r-- | .github/workflows/build.yml | 2 | ||||
-rwxr-xr-x | Makefile | 1 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 24 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 12 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 25 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 11 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 1 | ||||
-rw-r--r-- | plugins/redis/tests/redis_plugin_tests.go | 1 |
8 files changed, 42 insertions, 35 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 04545212..d9cf2687 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -65,6 +65,7 @@ jobs: - name: Run golang tests on Windows without codecov if: ${{ matrix.os == 'windows-latest' }} run: | + go test -v -race -cover -tags=debug ./util go test -v -race -cover -tags=debug ./pkg/pipe go test -v -race -cover -tags=debug ./pkg/pool go test -v -race -cover -tags=debug ./pkg/socket @@ -90,6 +91,7 @@ jobs: if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/util.txt -covermode=atomic ./util go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket @@ -24,6 +24,7 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests + go test -v -race -cover -tags=debug -covermode=atomic ./util go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 99212ff8..7045b785 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -11,7 +11,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -146,7 +146,7 @@ func Test_Pipe_Echo(t *testing.T) { } }() - sw, err := workerImpl.From(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func Test_Pipe_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := workerImpl.From(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -208,7 +208,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) - sw, err := workerImpl.From(w) + sw, err := worker.From(w) if err != nil { b.Fatal(err) } @@ -249,7 +249,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } }() - sw, err := workerImpl.From(w) + sw, err := worker.From(w) if err != nil { b.Fatal(err) } @@ -276,7 +276,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { } }() - sw, err := workerImpl.From(w) + sw, err := worker.From(w) if err != nil { b.Fatal(err) } @@ -297,7 +297,7 @@ func Test_Echo(t *testing.T) { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -327,7 +327,7 @@ func Test_BadPayload(t *testing.T) { w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -386,7 +386,7 @@ func Test_Echo_Slow(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -420,7 +420,7 @@ func Test_Broken(t *testing.T) { } }) - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -455,7 +455,7 @@ func Test_Error(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } @@ -486,7 +486,7 @@ func Test_NumExecs(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) + syncWorker, err := worker.From(w) if err != nil { t.Fatal(err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 6cc42143..6c177bff 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -79,7 +79,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, p.allocator = newPoolAllocator(factory, cmd) p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) - workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) + workers, err := p.allocateSyncWorkers(ctx, p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } @@ -333,11 +333,12 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") var workers []worker.BaseProcess // constant number of stack simplify logic + // TODO do not allocate context on every loop cycle?? for i := int64(0); i < numWorkers; i++ { ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) @@ -345,7 +346,12 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([] cancel() return nil, errors.E(op, errors.WorkerAllocate, err) } - workers = append(workers, w) + sw, err := syncWorker.From(w) + if err != nil { + cancel() + return nil, errors.E(op, err) + } + workers = append(workers, sw) cancel() } return workers, nil diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 35d3264e..9a2e76b4 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -30,13 +30,6 @@ const ( ReadBufSize = 10240 // Kb ) -var syncPool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, -} - // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. @@ -79,6 +72,8 @@ type Process struct { rd io.Reader // stop signal terminates io.Pipe from reading from stderr stop chan struct{} + + syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -93,6 +88,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), + // sync pool for STDERR + // All receivers are pointers + syncPool: sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, + }, } w.rd, w.cmd.Stderr = io.Pipe() @@ -258,15 +261,12 @@ func (w *Process) Kill() error { // put the pointer, to not allocate new slice // but erase it len and then return back func (w *Process) put(data *[]byte) { - *data = (*data)[:0] - *data = (*data)[:cap(*data)] - - syncPool.Put(data) + w.syncPool.Put(data) } // get pointer to the byte slice func (w *Process) get() *[]byte { - return syncPool.Get().(*[]byte) + return w.syncPool.Get().(*[]byte) } // Write appends the contents of pool to the errBuffer, growing the errBuffer as @@ -282,6 +282,7 @@ func (w *Process) watch() { w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message + // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool w.stderr.Write((*buf)[:n]) w.mu.Unlock() w.put(buf) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 8788e509..918145e5 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { @@ -163,16 +162,12 @@ type workerWatcher struct { func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - sw, err := syncWorker.From(workers[i]) - if err != nil { - return err - } - ww.stack.Push(sw) - sw.AddListener(ww.events.Push) + ww.stack.Push(workers[i]) + workers[i].AddListener(ww.events.Push) go func(swc worker.BaseProcess) { ww.wait(swc) - }(sw) + }(workers[i]) } return nil } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go new file mode 100644 index 00000000..65a229e1 --- /dev/null +++ b/plugins/redis/plugin.go @@ -0,0 +1 @@ +package redis diff --git a/plugins/redis/tests/redis_plugin_tests.go b/plugins/redis/tests/redis_plugin_tests.go new file mode 100644 index 00000000..ca8701d2 --- /dev/null +++ b/plugins/redis/tests/redis_plugin_tests.go @@ -0,0 +1 @@ +package tests |