diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 1 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 14 |
2 files changed, 8 insertions, 7 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 1cd0a8fa..051e7a8a 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // TODO suspicious logic, redesign err = sp.ww.Allocate() if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index d20b7ae0..74e29b71 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -159,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err defer tw.putFrame(fr) // can be 0 here - fr.WriteVersion(frame.VERSION_1) + fr.WriteVersion(fr.Header(), frame.VERSION_1) // obtain a buffer buf := tw.get() @@ -168,11 +168,11 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err buf.Write(p.Body) // Context offset - fr.WriteOptions(uint32(len(p.Context))) - fr.WritePayloadLen(uint32(buf.Len())) + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) fr.WritePayload(buf.Bytes()) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) // return buffer tw.put(buf) @@ -193,7 +193,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } - if !frameR.VerifyCRC() { + if !frameR.VerifyCRC(frameR.Header()) { return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } @@ -203,7 +203,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } - options := frameR.ReadOptions() + options := frameR.ReadOptions(frameR.Header()) if len(options) != 1 { return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } @@ -214,7 +214,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err } // by copying we free frame's payload slice - // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) // https://blog.golang.org/slices-intro#TOC_6. copy(pld.Body, frameR.Payload()[options[0]:]) copy(pld.Context, frameR.Payload()[:options[0]]) |