summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 6a945cf4..8314c039 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -27,7 +27,7 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
// Exec payload without TTL timeout.
func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("sync worker Exec")
+ const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
@@ -63,7 +63,7 @@ type wexec struct {
// Exec payload without TTL timeout.
func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithTimeout")
+ const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
@@ -111,12 +111,15 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}()
select {
+ // exec TTL reached
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
+ // append timeout error
+ err = multierr.Append(err, errors.E(op, errors.ExecTTL))
return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, ctx.Err()
+ return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
return payload.Payload{}, res.err
@@ -126,7 +129,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec pl")
+ const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
fr.WriteVersion(frame.VERSION_1)