summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-xsync_worker.go116
1 files changed, 26 insertions, 90 deletions
diff --git a/sync_worker.go b/sync_worker.go
index de9491d6..d7c15e88 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,6 +5,8 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -14,90 +16,24 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
+
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
-
- // ExecWithContext allow to set ExecTTL
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
-type taskWorker struct {
+type syncWorker struct {
w WorkerBase
}
func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
- return &taskWorker{
+ return &syncWorker{
w: w,
}, nil
}
-type twexec struct {
- payload Payload
- err error
-}
-
-func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- c := make(chan twexec)
- go func() {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("payload can not be empty"),
- }
- return
- }
-
- if tw.w.State().Value() != StateReady {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()),
- }
- return
- }
-
- // set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(rqs)
- if err != nil {
- if _, ok := err.(TaskError); !ok {
- tw.w.State().Set(StateErrored)
- tw.w.State().RegisterExec()
- }
- c <- twexec{
- payload: EmptyPayload,
- err: err,
- }
- return
- }
-
- tw.w.State().Set(StateReady)
- tw.w.State().RegisterExec()
- c <- twexec{
- payload: rsp,
- err: nil,
- }
- return
- }()
-
- for {
- select {
- case <-ctx.Done():
- return EmptyPayload, ctx.Err()
- case res := <-c:
- if res.err != nil {
- return EmptyPayload, res.err
- }
-
- return res.payload, nil
- }
- }
-}
-
-//
-func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+// Exec payload without TTL timeout.
+func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
return EmptyPayload, fmt.Errorf("payload can not be empty")
}
@@ -109,9 +45,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.w.State().Set(StateWorking)
- rsp, err := tw.execPayload(rqs)
+ rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(TaskError); !ok {
+ if _, ok := err.(JobError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -124,7 +60,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
return EmptyPayload, errors.Wrap(err, "header error")
@@ -147,7 +83,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, TaskError(rsp.Context)
+ return EmptyPayload, JobError(rsp.Context)
}
// add streaming support :)
@@ -158,46 +94,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) String() string {
+func (tw *syncWorker) String() string {
return tw.w.String()
}
-func (tw *taskWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *syncWorker) Pid() int64 {
+ return tw.w.Pid()
}
-func (tw *taskWorker) Events() <-chan WorkerEvent {
- return tw.w.Events()
+func (tw *syncWorker) Created() time.Time {
+ return tw.w.Created()
}
-func (tw *taskWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *syncWorker) AddListener(listener util.EventListener) {
+ tw.w.AddListener(listener)
}
-func (tw *taskWorker) State() State {
+func (tw *syncWorker) State() State {
return tw.w.State()
}
-func (tw *taskWorker) Start() error {
+func (tw *syncWorker) Start() error {
return tw.w.Start()
}
-func (tw *taskWorker) Wait(ctx context.Context) error {
+func (tw *syncWorker) Wait(ctx context.Context) error {
return tw.w.Wait(ctx)
}
-func (tw *taskWorker) Stop(ctx context.Context) error {
+func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *taskWorker) Kill(ctx context.Context) error {
+func (tw *syncWorker) Kill(ctx context.Context) error {
return tw.w.Kill(ctx)
}
-func (tw *taskWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() goridge.Relay {
return tw.w.Relay()
}
-func (tw *taskWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
tw.w.AttachRelay(rl)
}