From 453eb10b436925ef91b1206e795e581e6293d132 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 10 Jul 2021 13:57:49 +0300 Subject: Return structure instead of interface in places where that possible Signed-off-by: Valery Piashchynski --- pkg/worker/sync_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/worker') diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 84ff5977..38f44461 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -23,7 +23,7 @@ type SyncWorkerImpl struct { } // From creates SyncWorker from BaseProcess -func From(process *Process) SyncWorker { +func From(process *Process) *SyncWorkerImpl { return &SyncWorkerImpl{ process: process, fPool: sync.Pool{New: func() interface{} { -- cgit v1.2.3 From fedf012e632a31d2d0837c22832c7683547ad379 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 14:26:13 +0300 Subject: BC for the Pool, worker interfaces, pass/return payload by pointer Signed-off-by: Valery Piashchynski --- pkg/worker/interface.go | 4 ++-- pkg/worker/sync_worker.go | 43 ++++++++++++++++++++---------------------- pkg/worker/sync_worker_test.go | 5 ++--- 3 files changed, 24 insertions(+), 28 deletions(-) (limited to 'pkg/worker') diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index d2cfe2cd..ed8704bb 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -68,7 +68,7 @@ type SyncWorker interface { // BaseProcess provides basic functionality for the SyncWorker BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // ExecWithTTL used to handle Exec with TTL - ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 380bfff7..d20b7ae0 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -36,14 +36,14 @@ func From(process *Process) *SyncWorkerImpl { } // Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return nil, errors.E(op, errors.Str("payload can not be empty")) } if tw.process.State().Value() != StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time @@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } // supervisor may set state of the worker during the work @@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { } type wexec struct { - payload payload.Payload + payload *payload.Payload err error } // ExecWithTTL executes payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Str("payload can not be empty")), + err: errors.E(op, errors.Str("payload can not be empty")), } return } if tw.process.State().Value() != StateReady { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } @@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p tw.process.State().RegisterExec() } c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, err), + err: errors.E(op, err), } return } @@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p if err != nil { // append timeout error err = multierr.Append(err, errors.E(op, errors.ExecTTL)) - return payload.Payload{}, multierr.Append(err, ctx.Err()) + return nil, multierr.Append(err, ctx.Err()) } - return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) + return nil, errors.E(op, errors.ExecTTL, ctx.Err()) case res := <-c: if res.err != nil { - return payload.Payload{}, res.err + return nil, res.err } return res.payload, nil } } -func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") // get a frame @@ -182,7 +179,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err := tw.Relay().Send(fr) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } frameR := tw.getFrame() @@ -190,28 +187,28 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err = tw.process.Relay().Receive(frameR) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received")) + return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) + return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&frame.ERROR != byte(0) { - return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) + return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) + return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } - pld := payload.Payload{ + pld := &payload.Payload{ Body: make([]byte, len(frameR.Payload()[options[0]:])), Context: make([]byte, len(frameR.Payload()[:options[0]])), } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index df556e93..64580f9f 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) { sw := From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "Process is not ready (inactive)") } -- cgit v1.2.3 From fba3d927b62f8963f0c291da2739061e726df32e Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 9 Aug 2021 17:10:07 +0300 Subject: Update goridge to v3.2.0, update all frames operations. Signed-off-by: Valery Piashchynski --- pkg/worker/sync_worker.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'pkg/worker') diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index d20b7ae0..74e29b71 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -159,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err defer tw.putFrame(fr) // can be 0 here - fr.WriteVersion(frame.VERSION_1) + fr.WriteVersion(fr.Header(), frame.VERSION_1) // obtain a buffer buf := tw.get() @@ -168,11 +168,11 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err buf.Write(p.Body) // Context offset - fr.WriteOptions(uint32(len(p.Context))) - fr.WritePayloadLen(uint32(buf.Len())) + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) fr.WritePayload(buf.Bytes()) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) // return buffer tw.put(buf) @@ -193,7 +193,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } - if !frameR.VerifyCRC() { + if !frameR.VerifyCRC(frameR.Header()) { return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } @@ -203,7 +203,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } - options := frameR.ReadOptions() + options := frameR.ReadOptions(frameR.Header()) if len(options) != 1 { return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } @@ -214,7 +214,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err } // by copying we free frame's payload slice - // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) // https://blog.golang.org/slices-intro#TOC_6. copy(pld.Body, frameR.Payload()[options[0]:]) copy(pld.Context, frameR.Payload()[:options[0]]) -- cgit v1.2.3