diff options
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-x | pkg/worker/sync_worker.go | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 6a945cf4..8314c039 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -27,7 +27,7 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) { // Exec payload without TTL timeout. func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("sync worker Exec") + const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } @@ -63,7 +63,7 @@ type wexec struct { // Exec payload without TTL timeout. func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { - const op = errors.Op("ExecWithTimeout") + const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) go func() { @@ -111,12 +111,15 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p }() select { + // exec TTL reached case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { + // append timeout error + err = multierr.Append(err, errors.E(op, errors.ExecTTL)) return payload.Payload{}, multierr.Append(err, ctx.Err()) } - return payload.Payload{}, ctx.Err() + return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) case res := <-c: if res.err != nil { return payload.Payload{}, res.err @@ -126,7 +129,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec pl") + const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() fr.WriteVersion(frame.VERSION_1) |