diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool_test.go | 4 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 2 | ||||
-rwxr-xr-x | pkg/worker/state.go | 2 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 64 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 4 |
5 files changed, 58 insertions, 18 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 6667bfea..6f875072 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -588,7 +588,7 @@ func Benchmark_Pool_Echo(b *testing.B) { } } -// +// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx := context.Background() p, err := Initialize( @@ -619,7 +619,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Wait() } -// +// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx := context.Background() p, err := Initialize( diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index ee63c9c3..965a0f30 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -81,7 +81,7 @@ type socketSpawn struct { err error } -// SpawnWorker creates Process and connects it to appropriate relay or returns error +// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) diff --git a/pkg/worker/state.go b/pkg/worker/state.go index 9c4543c8..bf152e8b 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -44,7 +44,7 @@ type StateImpl struct { lastUsed uint64 } -// Thread safe +// NewWorkerState initializes a state for the sync.Worker func NewWorkerState(value int64) *StateImpl { return &StateImpl{value: value} } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index c41b5b52..13e70f49 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -3,6 +3,7 @@ package worker import ( "bytes" "context" + "sync" "time" "github.com/spiral/errors" @@ -17,12 +18,20 @@ type Allocator func() (SyncWorker, error) type SyncWorkerImpl struct { process *Process + fPool sync.Pool + bPool sync.Pool } // From creates SyncWorker from BaseProcess func From(process *Process) SyncWorker { return &SyncWorkerImpl{ process: process, + fPool: sync.Pool{New: func() interface{} { + return frame.NewFrame() + }}, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, } } @@ -62,7 +71,7 @@ type wexec struct { err error } -// Exec payload without TTL timeout. +// ExecWithTTL executes payload without TTL timeout. func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -132,11 +141,16 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") - fr := frame.NewFrame() - fr.WriteVersion(frame.VERSION_1) + // get a frame + fr := tw.getFrame() + defer tw.putFrame(fr) + // can be 0 here + fr.WriteVersion(frame.VERSION_1) + + // obtain a buffer + buf := tw.get() - buf := new(bytes.Buffer) buf.Write(p.Context) buf.Write(p.Body) @@ -147,15 +161,16 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error fr.WriteCRC() - // empty and free the buffer - buf.Truncate(0) + // return buffer + tw.put(buf) err := tw.Relay().Send(fr) if err != nil { return payload.Payload{}, errors.E(op, errors.Network, err) } - frameR := frame.NewFrame() + frameR := tw.getFrame() + defer tw.putFrame(frameR) err = tw.process.Relay().Receive(frameR) if err != nil { @@ -180,11 +195,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } - pl := payload.Payload{} - pl.Context = frameR.Payload()[:options[0]] - pl.Body = frameR.Payload()[options[0]:] + pld := payload.Payload{ + Body: make([]byte, len(frameR.Payload()[options[0]:])), + Context: make([]byte, len(frameR.Payload()[:options[0]])), + } + + // by copying we free frame's payload slice + // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // https://blog.golang.org/slices-intro#TOC_6. + copy(pld.Body, frameR.Payload()[options[0]:]) + copy(pld.Context, frameR.Payload()[:options[0]]) - return pl, nil + return pld, nil } func (tw *SyncWorkerImpl) String() string { @@ -226,3 +248,23 @@ func (tw *SyncWorkerImpl) Relay() relay.Relay { func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { tw.process.AttachRelay(rl) } + +// Private + +func (tw *SyncWorkerImpl) get() *bytes.Buffer { + return tw.bPool.Get().(*bytes.Buffer) +} + +func (tw *SyncWorkerImpl) put(b *bytes.Buffer) { + b.Reset() + tw.bPool.Put(b) +} + +func (tw *SyncWorkerImpl) getFrame() *frame.Frame { + return tw.fPool.Get().(*frame.Frame) +} + +func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { + f.Reset() + tw.fPool.Put(f) +} diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 239b01c7..565b1b69 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -35,9 +35,7 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) { return nil, true } - w := <-v.workers - - return w, false + return <-v.workers, false } func (v *Vec) Destroy() { |