summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-xsync_worker.go95
1 files changed, 83 insertions, 12 deletions
diff --git a/sync_worker.go b/sync_worker.go
index d7c15e88..d933077b 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,9 +5,10 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
+ "go.uber.org/multierr"
- "github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -19,6 +20,8 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithContext(ctx context.Context, p Payload) (Payload, error)
}
type syncWorker struct {
@@ -47,7 +50,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(JobError); !ok {
+ if _, ok := err.(ExecError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -60,14 +63,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
return rsp, nil
}
-func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
+type wexec struct {
+ payload Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+ const op = errors.Op("exec_with_context")
+ c := make(chan wexec, 1)
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.w.State().Value() != StateReady {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ if _, ok := err.(ExecError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ return EmptyPayload, multierr.Append(err, ctx.Err())
+ }
+ return EmptyPayload, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+ return res.payload, nil
+ }
+}
+
+func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
+ const op = errors.Op("exec_payload")
// two things; todo: merge
- if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
- return EmptyPayload, errors.Wrap(err, "header error")
+ if err := sendControl(tw.w.Relay(), p.Context); err != nil {
+ return EmptyPayload, errors.E(op, err, "header error")
}
- if err := tw.w.Relay().Send(rqs.Body, 0); err != nil {
- return EmptyPayload, errors.Wrap(err, "sender error")
+ if err := tw.w.Relay().Send(p.Body, 0); err != nil {
+ return EmptyPayload, errors.E(op, err, "sender error")
}
var pr goridge.Prefix
@@ -75,7 +146,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
var err error
if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
if !pr.HasFlag(goridge.PayloadControl) {
@@ -83,12 +154,12 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, JobError(rsp.Context)
+ return EmptyPayload, ExecError(rsp.Context)
}
// add streaming support :)
if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
return rsp, nil
@@ -126,8 +197,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *syncWorker) Kill(ctx context.Context) error {
- return tw.w.Kill(ctx)
+func (tw *syncWorker) Kill() error {
+ return tw.w.Kill()
}
func (tw *syncWorker) Relay() goridge.Relay {