summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool.go1
-rwxr-xr-xpkg/worker/sync_worker.go14
2 files changed, 8 insertions, 7 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 1cd0a8fa..051e7a8a 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // TODO suspicious logic, redesign
err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index d20b7ae0..74e29b71 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -159,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
defer tw.putFrame(fr)
// can be 0 here
- fr.WriteVersion(frame.VERSION_1)
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
// obtain a buffer
buf := tw.get()
@@ -168,11 +168,11 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
buf.Write(p.Body)
// Context offset
- fr.WriteOptions(uint32(len(p.Context)))
- fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
+ fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WritePayload(buf.Bytes())
- fr.WriteCRC()
+ fr.WriteCRC(fr.Header())
// return buffer
tw.put(buf)
@@ -193,7 +193,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
- if !frameR.VerifyCRC() {
+ if !frameR.VerifyCRC(frameR.Header()) {
return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
@@ -203,7 +203,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
- options := frameR.ReadOptions()
+ options := frameR.ReadOptions(frameR.Header())
if len(options) != 1 {
return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
@@ -214,7 +214,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
}
// by copying we free frame's payload slice
- // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
// https://blog.golang.org/slices-intro#TOC_6.
copy(pld.Body, frameR.Payload()[options[0]:])
copy(pld.Context, frameR.Payload()[:options[0]])