summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-20 18:28:46 +0300
committerValery Piashchynski <[email protected]>2020-12-20 18:28:46 +0300
commitf4a36c7f684216fb408693a6c494486144df57cf (patch)
treee1b61bf7e74cb63aa45f9ca0284a4cffe8e06b0e /pkg/worker
parentfbd5adde5abae6f7adb7fcdafc226bcd3480d498 (diff)
parenta10d20d20e910ed8fcfbc3bc690aaf17ee338ff3 (diff)
Merge remote-tracking branch 'origin/2.0' into plugin/redis
# Conflicts: # go.sum # pkg/pipe/pipe_factory_test.go # pkg/pool/static_pool.go # plugins/rpc/plugin.go
Diffstat (limited to 'pkg/worker')
-rwxr-xr-xpkg/worker/sync_worker.go75
-rwxr-xr-xpkg/worker/sync_worker_test.go4
-rwxr-xr-xpkg/worker/worker.go12
3 files changed, 46 insertions, 45 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1eb1396e..eacb8a8a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,12 +6,13 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
-
- "github.com/spiral/goridge/v3"
)
type syncWorker struct {
@@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.w.State().Value() != internal.StateReady {
- return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
}
// set last used time
@@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
tw.w.State().Set(internal.StateReady)
@@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
}
type wexec struct {
- payload internal.Payload
+ payload payload.Payload
err error
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("ExecWithContext")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Str("payload can not be empty")),
}
return
@@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
if tw.w.State().Value() != internal.StateReady {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
}
return
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
tw.w.State().RegisterExec()
}
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, err),
}
return
@@ -113,22 +114,22 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
- return internal.Payload{}, multierr.Append(err, ctx.Err())
+ return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return internal.Payload{}, ctx.Err()
+ return payload.Payload{}, ctx.Err()
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.payload, nil
}
}
-func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
- const op = errors.Op("exec payload")
+func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("exec pl")
- frame := goridge.NewFrame()
- frame.WriteVersion(goridge.VERSION_1)
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
// can be 0 here
buf := new(bytes.Buffer)
@@ -136,50 +137,50 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error)
buf.Write(p.Body)
// Context offset
- frame.WriteOptions(uint32(len(p.Context)))
- frame.WritePayloadLen(uint32(buf.Len()))
- frame.WritePayload(buf.Bytes())
+ fr.WriteOptions(uint32(len(p.Context)))
+ fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
- frame.WriteCRC()
+ fr.WriteCRC()
// empty and free the buffer
buf.Truncate(0)
- err := tw.Relay().Send(frame)
+ err := tw.Relay().Send(fr)
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
- frameR := goridge.NewFrame()
+ frameR := frame.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
- return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
- if flags&byte(goridge.ERROR) != byte(0) {
- return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ if flags&byte(frame.ERROR) != byte(0) {
+ return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := internal.Payload{}
- payload.Context = frameR.Payload()[:options[0]]
- payload.Body = frameR.Payload()[options[0]:]
+ pl := payload.Payload{}
+ pl.Context = frameR.Payload()[:options[0]]
+ pl.Body = frameR.Payload()[options[0]:]
- return payload, nil
+ return pl, nil
}
func (tw *syncWorker) String() string {
@@ -218,10 +219,10 @@ func (tw *syncWorker) Kill() error {
return tw.w.Kill()
}
-func (tw *syncWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() relay.Relay {
return tw.w.Relay()
}
-func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl relay.Relay) {
tw.w.AttachRelay(rl)
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index e224e105..40988b06 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -4,7 +4,7 @@ import (
"os/exec"
"testing"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/stretchr/testify/assert"
)
@@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 9a2e76b4..e60ab3f4 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,11 +13,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -67,7 +67,7 @@ type Process struct {
mu sync.RWMutex
// communication bus with underlying process.
- relay goridge.Relay
+ relay relay.Relay
// rd in a second part of pipe to read from stderr
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
@@ -83,7 +83,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
w := &Process{
created: time.Now(),
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
@@ -134,13 +134,13 @@ func (w *Process) State() internal.State {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) AttachRelay(rl goridge.Relay) {
+func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) Relay() goridge.Relay {
+func (w *Process) Relay() relay.Relay {
return w.relay
}