diff options
Diffstat (limited to 'pkg/worker')
-rwxr-xr-x | pkg/worker/sync_worker.go | 41 | ||||
-rwxr-xr-x | pkg/worker/sync_worker_test.go | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 4 |
3 files changed, 25 insertions, 24 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1eb1396e..daa07186 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" "github.com/spiral/goridge/v3" @@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) { } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync worker Exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } if tw.w.State().Value() != internal.StateReady { - return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) } // set last used time @@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } - return internal.Payload{}, err + return payload.Payload{}, err } tw.w.State().Set(internal.StateReady) @@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { } type wexec struct { - payload internal.Payload + payload payload.Payload err error } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("ExecWithContext") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Str("payload can not be empty")), } return @@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( if tw.w.State().Value() != internal.StateReady { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), } return @@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( tw.w.State().RegisterExec() } c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, err), } return @@ -113,18 +114,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { - return internal.Payload{}, multierr.Append(err, ctx.Err()) + return payload.Payload{}, multierr.Append(err, ctx.Err()) } - return internal.Payload{}, ctx.Err() + return payload.Payload{}, ctx.Err() case res := <-c: if res.err != nil { - return internal.Payload{}, res.err + return payload.Payload{}, res.err } return res.payload, nil } } -func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("exec payload") frame := goridge.NewFrame() @@ -147,35 +148,35 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) err := tw.Relay().Send(frame) if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } frameR := goridge.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return internal.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil frame received")) } if !frameR.VerifyCRC() { - return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&byte(goridge.ERROR) != byte(0) { - return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := internal.Payload{} + payload := payload.Payload{} payload.Context = frameR.Payload()[:options[0]] payload.Body = frameR.Payload()[options[0]:] diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index e224e105..40988b06 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -4,7 +4,7 @@ import ( "os/exec" "testing" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/stretchr/testify/assert" ) @@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 35d3264e..95fa6e06 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -17,7 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - events2 "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -88,7 +88,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } w := &Process{ created: time.Now(), - events: events2.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), |