summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/transport/socket/socket_factory.go2
-rwxr-xr-xpkg/worker/state.go2
-rwxr-xr-xpkg/worker/sync_worker.go64
-rw-r--r--pkg/worker_watcher/container/vec.go4
5 files changed, 58 insertions, 18 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6667bfea..6f875072 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -588,7 +588,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
}
}
-//
+// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
@@ -619,7 +619,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Wait()
}
-//
+// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index ee63c9c3..965a0f30 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -81,7 +81,7 @@ type socketSpawn struct {
err error
}
-// SpawnWorker creates Process and connects it to appropriate relay or returns error
+// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 9c4543c8..bf152e8b 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -44,7 +44,7 @@ type StateImpl struct {
lastUsed uint64
}
-// Thread safe
+// NewWorkerState initializes a state for the sync.Worker
func NewWorkerState(value int64) *StateImpl {
return &StateImpl{value: value}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index c41b5b52..13e70f49 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -3,6 +3,7 @@ package worker
import (
"bytes"
"context"
+ "sync"
"time"
"github.com/spiral/errors"
@@ -17,12 +18,20 @@ type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
+ fPool sync.Pool
+ bPool sync.Pool
}
// From creates SyncWorker from BaseProcess
func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
+ fPool: sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+ }},
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
}
}
@@ -62,7 +71,7 @@ type wexec struct {
err error
}
-// Exec payload without TTL timeout.
+// ExecWithTTL executes payload without TTL timeout.
func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -132,11 +141,16 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
- fr := frame.NewFrame()
- fr.WriteVersion(frame.VERSION_1)
+ // get a frame
+ fr := tw.getFrame()
+ defer tw.putFrame(fr)
+
// can be 0 here
+ fr.WriteVersion(frame.VERSION_1)
+
+ // obtain a buffer
+ buf := tw.get()
- buf := new(bytes.Buffer)
buf.Write(p.Context)
buf.Write(p.Body)
@@ -147,15 +161,16 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
fr.WriteCRC()
- // empty and free the buffer
- buf.Truncate(0)
+ // return buffer
+ tw.put(buf)
err := tw.Relay().Send(fr)
if err != nil {
return payload.Payload{}, errors.E(op, errors.Network, err)
}
- frameR := frame.NewFrame()
+ frameR := tw.getFrame()
+ defer tw.putFrame(frameR)
err = tw.process.Relay().Receive(frameR)
if err != nil {
@@ -180,11 +195,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pl := payload.Payload{}
- pl.Context = frameR.Payload()[:options[0]]
- pl.Body = frameR.Payload()[options[0]:]
+ pld := payload.Payload{
+ Body: make([]byte, len(frameR.Payload()[options[0]:])),
+ Context: make([]byte, len(frameR.Payload()[:options[0]])),
+ }
+
+ // by copying we free frame's payload slice
+ // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // https://blog.golang.org/slices-intro#TOC_6.
+ copy(pld.Body, frameR.Payload()[options[0]:])
+ copy(pld.Context, frameR.Payload()[:options[0]])
- return pl, nil
+ return pld, nil
}
func (tw *SyncWorkerImpl) String() string {
@@ -226,3 +248,23 @@ func (tw *SyncWorkerImpl) Relay() relay.Relay {
func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
tw.process.AttachRelay(rl)
}
+
+// Private
+
+func (tw *SyncWorkerImpl) get() *bytes.Buffer {
+ return tw.bPool.Get().(*bytes.Buffer)
+}
+
+func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
+ b.Reset()
+ tw.bPool.Put(b)
+}
+
+func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
+ return tw.fPool.Get().(*frame.Frame)
+}
+
+func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
+ f.Reset()
+ tw.fPool.Put(f)
+}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 239b01c7..565b1b69 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -35,9 +35,7 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
return nil, true
}
- w := <-v.workers
-
- return w, false
+ return <-v.workers, false
}
func (v *Vec) Destroy() {