summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go64
1 files changed, 53 insertions, 11 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index c41b5b52..13e70f49 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -3,6 +3,7 @@ package worker
import (
"bytes"
"context"
+ "sync"
"time"
"github.com/spiral/errors"
@@ -17,12 +18,20 @@ type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
+ fPool sync.Pool
+ bPool sync.Pool
}
// From creates SyncWorker from BaseProcess
func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
+ fPool: sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+ }},
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
}
}
@@ -62,7 +71,7 @@ type wexec struct {
err error
}
-// Exec payload without TTL timeout.
+// ExecWithTTL executes payload without TTL timeout.
func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -132,11 +141,16 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
- fr := frame.NewFrame()
- fr.WriteVersion(frame.VERSION_1)
+ // get a frame
+ fr := tw.getFrame()
+ defer tw.putFrame(fr)
+
// can be 0 here
+ fr.WriteVersion(frame.VERSION_1)
+
+ // obtain a buffer
+ buf := tw.get()
- buf := new(bytes.Buffer)
buf.Write(p.Context)
buf.Write(p.Body)
@@ -147,15 +161,16 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
fr.WriteCRC()
- // empty and free the buffer
- buf.Truncate(0)
+ // return buffer
+ tw.put(buf)
err := tw.Relay().Send(fr)
if err != nil {
return payload.Payload{}, errors.E(op, errors.Network, err)
}
- frameR := frame.NewFrame()
+ frameR := tw.getFrame()
+ defer tw.putFrame(frameR)
err = tw.process.Relay().Receive(frameR)
if err != nil {
@@ -180,11 +195,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pl := payload.Payload{}
- pl.Context = frameR.Payload()[:options[0]]
- pl.Body = frameR.Payload()[options[0]:]
+ pld := payload.Payload{
+ Body: make([]byte, len(frameR.Payload()[options[0]:])),
+ Context: make([]byte, len(frameR.Payload()[:options[0]])),
+ }
+
+ // by copying we free frame's payload slice
+ // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // https://blog.golang.org/slices-intro#TOC_6.
+ copy(pld.Body, frameR.Payload()[options[0]:])
+ copy(pld.Context, frameR.Payload()[:options[0]])
- return pl, nil
+ return pld, nil
}
func (tw *SyncWorkerImpl) String() string {
@@ -226,3 +248,23 @@ func (tw *SyncWorkerImpl) Relay() relay.Relay {
func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
tw.process.AttachRelay(rl)
}
+
+// Private
+
+func (tw *SyncWorkerImpl) get() *bytes.Buffer {
+ return tw.bPool.Get().(*bytes.Buffer)
+}
+
+func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
+ b.Reset()
+ tw.bPool.Put(b)
+}
+
+func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
+ return tw.fPool.Get().(*frame.Frame)
+}
+
+func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
+ f.Reset()
+ tw.fPool.Put(f)
+}