summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-xsync_worker.go84
1 files changed, 77 insertions, 7 deletions
diff --git a/sync_worker.go b/sync_worker.go
index d7c15e88..31d68168 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,9 +5,10 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
+ "go.uber.org/multierr"
- "github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -19,6 +20,7 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, p Payload) (Payload, error)
}
type syncWorker struct {
@@ -60,14 +62,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
return rsp, nil
}
+type wexec struct {
+ payload Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+ const op = errors.Op("exec_with_context")
+ c := make(chan wexec, 1)
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.w.State().Value() != StateReady {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ if _, ok := err.(JobError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ return EmptyPayload, multierr.Append(err, ctx.Err())
+ }
+ return EmptyPayload, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+ return res.payload, nil
+ }
+}
+
func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
+ const op = errors.Op("exec_payload")
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
- return EmptyPayload, errors.Wrap(err, "header error")
+ return EmptyPayload, errors.E(op, err, "header error")
}
if err := tw.w.Relay().Send(rqs.Body, 0); err != nil {
- return EmptyPayload, errors.Wrap(err, "sender error")
+ return EmptyPayload, errors.E(op, err, "sender error")
}
var pr goridge.Prefix
@@ -75,7 +145,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
var err error
if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
if !pr.HasFlag(goridge.PayloadControl) {
@@ -88,7 +158,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// add streaming support :)
if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
return rsp, nil
@@ -126,8 +196,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *syncWorker) Kill(ctx context.Context) error {
- return tw.w.Kill(ctx)
+func (tw *syncWorker) Kill() error {
+ return tw.w.Kill()
}
func (tw *syncWorker) Relay() goridge.Relay {