diff options
author | Valery Piashchynski <[email protected]> | 2021-06-13 16:42:36 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-13 16:42:36 +0300 |
commit | 9fd01b07ecae6e68dd0ed8be6be5f8c0228771cb (patch) | |
tree | b170b08ec0460b1dd815875236bf1890af9f7939 /pkg/worker | |
parent | 54c3553cc39df4eae92d1f2c8c428e625f32f41a (diff) |
- Optimize sync_worker payload exec
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker')
-rwxr-xr-x | pkg/worker/state.go | 2 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 64 |
2 files changed, 54 insertions, 12 deletions
diff --git a/pkg/worker/state.go b/pkg/worker/state.go index 9c4543c8..bf152e8b 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -44,7 +44,7 @@ type StateImpl struct { lastUsed uint64 } -// Thread safe +// NewWorkerState initializes a state for the sync.Worker func NewWorkerState(value int64) *StateImpl { return &StateImpl{value: value} } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index c41b5b52..13e70f49 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -3,6 +3,7 @@ package worker import ( "bytes" "context" + "sync" "time" "github.com/spiral/errors" @@ -17,12 +18,20 @@ type Allocator func() (SyncWorker, error) type SyncWorkerImpl struct { process *Process + fPool sync.Pool + bPool sync.Pool } // From creates SyncWorker from BaseProcess func From(process *Process) SyncWorker { return &SyncWorkerImpl{ process: process, + fPool: sync.Pool{New: func() interface{} { + return frame.NewFrame() + }}, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, } } @@ -62,7 +71,7 @@ type wexec struct { err error } -// Exec payload without TTL timeout. +// ExecWithTTL executes payload without TTL timeout. func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -132,11 +141,16 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") - fr := frame.NewFrame() - fr.WriteVersion(frame.VERSION_1) + // get a frame + fr := tw.getFrame() + defer tw.putFrame(fr) + // can be 0 here + fr.WriteVersion(frame.VERSION_1) + + // obtain a buffer + buf := tw.get() - buf := new(bytes.Buffer) buf.Write(p.Context) buf.Write(p.Body) @@ -147,15 +161,16 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error fr.WriteCRC() - // empty and free the buffer - buf.Truncate(0) + // return buffer + tw.put(buf) err := tw.Relay().Send(fr) if err != nil { return payload.Payload{}, errors.E(op, errors.Network, err) } - frameR := frame.NewFrame() + frameR := tw.getFrame() + defer tw.putFrame(frameR) err = tw.process.Relay().Receive(frameR) if err != nil { @@ -180,11 +195,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } - pl := payload.Payload{} - pl.Context = frameR.Payload()[:options[0]] - pl.Body = frameR.Payload()[options[0]:] + pld := payload.Payload{ + Body: make([]byte, len(frameR.Payload()[options[0]:])), + Context: make([]byte, len(frameR.Payload()[:options[0]])), + } + + // by copying we free frame's payload slice + // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // https://blog.golang.org/slices-intro#TOC_6. + copy(pld.Body, frameR.Payload()[options[0]:]) + copy(pld.Context, frameR.Payload()[:options[0]]) - return pl, nil + return pld, nil } func (tw *SyncWorkerImpl) String() string { @@ -226,3 +248,23 @@ func (tw *SyncWorkerImpl) Relay() relay.Relay { func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { tw.process.AttachRelay(rl) } + +// Private + +func (tw *SyncWorkerImpl) get() *bytes.Buffer { + return tw.bPool.Get().(*bytes.Buffer) +} + +func (tw *SyncWorkerImpl) put(b *bytes.Buffer) { + b.Reset() + tw.bPool.Put(b) +} + +func (tw *SyncWorkerImpl) getFrame() *frame.Frame { + return tw.fPool.Get().(*frame.Frame) +} + +func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { + f.Reset() + tw.fPool.Put(f) +} |