summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
committerValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
commitfedf012e632a31d2d0837c22832c7683547ad379 (patch)
treebcb5634dfacccc6d34e49aa7337ac8d1f18b693c /pkg/worker
parent609e61426b137834ac589c88f1124574f939fa67 (diff)
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/sync_worker.go43
-rwxr-xr-xpkg/worker/sync_worker_test.go5
3 files changed, 24 insertions, 28 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index d2cfe2cd..ed8704bb 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -68,7 +68,7 @@ type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 380bfff7..d20b7ae0 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -36,14 +36,14 @@ func From(process *Process) *SyncWorkerImpl {
}
// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.process.State().Value() != StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
@@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
// supervisor may set state of the worker during the work
@@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
}
type wexec struct {
- payload payload.Payload
+ payload *payload.Payload
err error
}
// ExecWithTTL executes payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Str("payload can not be empty")),
+ err: errors.E(op, errors.Str("payload can not be empty")),
}
return
}
if tw.process.State().Value() != StateReady {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
@@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
tw.process.State().RegisterExec()
}
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, err),
+ err: errors.E(op, err),
}
return
}
@@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
if err != nil {
// append timeout error
err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return payload.Payload{}, multierr.Append(err, ctx.Err())
+ return nil, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
- return payload.Payload{}, res.err
+ return nil, res.err
}
return res.payload, nil
}
}
-func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
// get a frame
@@ -182,7 +179,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err := tw.Relay().Send(fr)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
frameR := tw.getFrame()
@@ -190,28 +187,28 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err = tw.process.Relay().Receive(frameR)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received"))
+ return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&frame.ERROR != byte(0) {
- return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pld := payload.Payload{
+ pld := &payload.Payload{
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index df556e93..64580f9f 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) {
sw := From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "Process is not ready (inactive)")
}