summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go41
1 files changed, 21 insertions, 20 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1eb1396e..daa07186 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
"github.com/spiral/goridge/v3"
@@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.w.State().Value() != internal.StateReady {
- return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
}
// set last used time
@@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
tw.w.State().Set(internal.StateReady)
@@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
}
type wexec struct {
- payload internal.Payload
+ payload payload.Payload
err error
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("ExecWithContext")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Str("payload can not be empty")),
}
return
@@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
if tw.w.State().Value() != internal.StateReady {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
}
return
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
tw.w.State().RegisterExec()
}
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, err),
}
return
@@ -113,18 +114,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
- return internal.Payload{}, multierr.Append(err, ctx.Err())
+ return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return internal.Payload{}, ctx.Err()
+ return payload.Payload{}, ctx.Err()
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.payload, nil
}
}
-func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec payload")
frame := goridge.NewFrame()
@@ -147,35 +148,35 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error)
err := tw.Relay().Send(frame)
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
frameR := goridge.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil frame received"))
}
if !frameR.VerifyCRC() {
- return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&byte(goridge.ERROR) != byte(0) {
- return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := internal.Payload{}
+ payload := payload.Payload{}
payload.Context = frameR.Payload()[:options[0]]
payload.Body = frameR.Payload()[options[0]:]