summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-14 13:20:27 +0300
committerGitHub <[email protected]>2021-06-14 13:20:27 +0300
commitdc8ed203c247afd684f198ebbac103a10bfad72a (patch)
treea18f26274ffce32a0b29c479a3692dc1fe822415
parent54c3553cc39df4eae92d1f2c8c428e625f32f41a (diff)
parent128d71cad43ffcaab60cb60939584df0941f37be (diff)
#722 feat(opt): `inuse_space` and `runtime.newobject` optimizations
#722 feat(opt): `inuse_space` and `runtime.newobject` optimizations
-rwxr-xr-xinternal/protocol.go34
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/transport/socket/socket_factory.go2
-rwxr-xr-xpkg/worker/state.go2
-rwxr-xr-xpkg/worker/sync_worker.go64
-rw-r--r--pkg/worker_watcher/container/vec.go4
6 files changed, 85 insertions, 25 deletions
diff --git a/internal/protocol.go b/internal/protocol.go
index a344a3c4..7487b4f3 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -2,6 +2,7 @@ package internal
import (
"os"
+ "sync"
j "github.com/json-iterator/go"
"github.com/spiral/errors"
@@ -19,9 +20,25 @@ type pidCommand struct {
Pid int `json:"pid"`
}
+var fPool = sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+}}
+
+func getFrame() *frame.Frame {
+ return fPool.Get().(*frame.Frame)
+}
+
+func putFrame(f *frame.Frame) {
+ f.Reset()
+ fPool.Put(f)
+}
+
func SendControl(rl relay.Relay, payload interface{}) error {
const op = errors.Op("send_control")
- fr := frame.NewFrame()
+
+ fr := getFrame()
+ defer putFrame(fr)
+
fr.WriteVersion(frame.VERSION_1)
fr.WriteFlags(frame.CONTROL)
@@ -51,6 +68,8 @@ func SendControl(rl relay.Relay, payload interface{}) error {
fr.WritePayload(data)
fr.WriteCRC()
+ // hold a pointer to a frame
+ // Do we need a copy here????
err = rl.Send(fr)
if err != nil {
return errors.E(op, err)
@@ -66,27 +85,28 @@ func FetchPID(rl relay.Relay) (int64, error) {
return 0, errors.E(op, err)
}
- frameR := frame.NewFrame()
+ fr := getFrame()
+ defer putFrame(fr)
- err = rl.Receive(frameR)
- if !frameR.VerifyCRC() {
+ err = rl.Receive(fr)
+ if !fr.VerifyCRC() {
return 0, errors.E(op, errors.Str("CRC mismatch"))
}
if err != nil {
return 0, errors.E(op, err)
}
- if frameR == nil {
+ if fr == nil {
return 0, errors.E(op, errors.Str("nil frame received"))
}
- flags := frameR.ReadFlags()
+ flags := fr.ReadFlags()
if flags&frame.CONTROL == 0 {
return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
}
link := &pidCommand{}
- err = json.Unmarshal(frameR.Payload(), link)
+ err = json.Unmarshal(fr.Payload(), link)
if err != nil {
return 0, errors.E(op, err)
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6667bfea..6f875072 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -588,7 +588,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
}
}
-//
+// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
@@ -619,7 +619,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Wait()
}
-//
+// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index ee63c9c3..965a0f30 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -81,7 +81,7 @@ type socketSpawn struct {
err error
}
-// SpawnWorker creates Process and connects it to appropriate relay or returns error
+// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 9c4543c8..bf152e8b 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -44,7 +44,7 @@ type StateImpl struct {
lastUsed uint64
}
-// Thread safe
+// NewWorkerState initializes a state for the sync.Worker
func NewWorkerState(value int64) *StateImpl {
return &StateImpl{value: value}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index c41b5b52..13e70f49 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -3,6 +3,7 @@ package worker
import (
"bytes"
"context"
+ "sync"
"time"
"github.com/spiral/errors"
@@ -17,12 +18,20 @@ type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
+ fPool sync.Pool
+ bPool sync.Pool
}
// From creates SyncWorker from BaseProcess
func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
+ fPool: sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+ }},
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
}
}
@@ -62,7 +71,7 @@ type wexec struct {
err error
}
-// Exec payload without TTL timeout.
+// ExecWithTTL executes payload without TTL timeout.
func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -132,11 +141,16 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
- fr := frame.NewFrame()
- fr.WriteVersion(frame.VERSION_1)
+ // get a frame
+ fr := tw.getFrame()
+ defer tw.putFrame(fr)
+
// can be 0 here
+ fr.WriteVersion(frame.VERSION_1)
+
+ // obtain a buffer
+ buf := tw.get()
- buf := new(bytes.Buffer)
buf.Write(p.Context)
buf.Write(p.Body)
@@ -147,15 +161,16 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
fr.WriteCRC()
- // empty and free the buffer
- buf.Truncate(0)
+ // return buffer
+ tw.put(buf)
err := tw.Relay().Send(fr)
if err != nil {
return payload.Payload{}, errors.E(op, errors.Network, err)
}
- frameR := frame.NewFrame()
+ frameR := tw.getFrame()
+ defer tw.putFrame(frameR)
err = tw.process.Relay().Receive(frameR)
if err != nil {
@@ -180,11 +195,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pl := payload.Payload{}
- pl.Context = frameR.Payload()[:options[0]]
- pl.Body = frameR.Payload()[options[0]:]
+ pld := payload.Payload{
+ Body: make([]byte, len(frameR.Payload()[options[0]:])),
+ Context: make([]byte, len(frameR.Payload()[:options[0]])),
+ }
+
+ // by copying we free frame's payload slice
+ // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // https://blog.golang.org/slices-intro#TOC_6.
+ copy(pld.Body, frameR.Payload()[options[0]:])
+ copy(pld.Context, frameR.Payload()[:options[0]])
- return pl, nil
+ return pld, nil
}
func (tw *SyncWorkerImpl) String() string {
@@ -226,3 +248,23 @@ func (tw *SyncWorkerImpl) Relay() relay.Relay {
func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
tw.process.AttachRelay(rl)
}
+
+// Private
+
+func (tw *SyncWorkerImpl) get() *bytes.Buffer {
+ return tw.bPool.Get().(*bytes.Buffer)
+}
+
+func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
+ b.Reset()
+ tw.bPool.Put(b)
+}
+
+func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
+ return tw.fPool.Get().(*frame.Frame)
+}
+
+func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
+ f.Reset()
+ tw.fPool.Put(f)
+}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 239b01c7..565b1b69 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -35,9 +35,7 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
return nil, true
}
- w := <-v.workers
-
- return w, false
+ return <-v.workers, false
}
func (v *Vec) Destroy() {