summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
committerWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
commitba5c562f9038ba434e655fb82c44597fcccaff16 (patch)
treeff112b9dcffda63bc40094a57d0df61622368445 /sync_worker.go
parent3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff)
- massive update in roadrunner 2.0 abstractions
Diffstat (limited to 'sync_worker.go')
-rw-r--r--sync_worker.go110
1 files changed, 25 insertions, 85 deletions
diff --git a/sync_worker.go b/sync_worker.go
index de9491d6..cbc2cc0b 100644
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -3,6 +3,7 @@ package roadrunner
import (
"context"
"fmt"
+ "github.com/spiral/roadrunner/v2/util"
"time"
"github.com/pkg/errors"
@@ -14,19 +15,17 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
+
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
-
- // ExecWithContext allow to set ExecTTL
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
-type taskWorker struct {
+type syncWorker struct {
w WorkerBase
}
func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
- return &taskWorker{
+ return &syncWorker{
w: w,
}, nil
}
@@ -36,68 +35,9 @@ type twexec struct {
err error
}
-func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- c := make(chan twexec)
- go func() {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("payload can not be empty"),
- }
- return
- }
-
- if tw.w.State().Value() != StateReady {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()),
- }
- return
- }
-
- // set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(rqs)
- if err != nil {
- if _, ok := err.(TaskError); !ok {
- tw.w.State().Set(StateErrored)
- tw.w.State().RegisterExec()
- }
- c <- twexec{
- payload: EmptyPayload,
- err: err,
- }
- return
- }
-
- tw.w.State().Set(StateReady)
- tw.w.State().RegisterExec()
- c <- twexec{
- payload: rsp,
- err: nil,
- }
- return
- }()
-
- for {
- select {
- case <-ctx.Done():
- return EmptyPayload, ctx.Err()
- case res := <-c:
- if res.err != nil {
- return EmptyPayload, res.err
- }
-
- return res.payload, nil
- }
- }
-}
-
-//
-func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+// Exec payload without TTL timeout.
+func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
return EmptyPayload, fmt.Errorf("payload can not be empty")
}
@@ -109,9 +49,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.w.State().Set(StateWorking)
- rsp, err := tw.execPayload(rqs)
+ rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(TaskError); !ok {
+ if _, ok := err.(JobError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -124,7 +64,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
return EmptyPayload, errors.Wrap(err, "header error")
@@ -147,7 +87,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, TaskError(rsp.Context)
+ return EmptyPayload, JobError(rsp.Context)
}
// add streaming support :)
@@ -158,46 +98,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) String() string {
+func (tw *syncWorker) String() string {
return tw.w.String()
}
-func (tw *taskWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *syncWorker) Pid() int64 {
+ return tw.w.Pid()
}
-func (tw *taskWorker) Events() <-chan WorkerEvent {
- return tw.w.Events()
+func (tw *syncWorker) Created() time.Time {
+ return tw.w.Created()
}
-func (tw *taskWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *syncWorker) AddListener(listener util.EventListener) {
+ tw.w.AddListener(listener)
}
-func (tw *taskWorker) State() State {
+func (tw *syncWorker) State() State {
return tw.w.State()
}
-func (tw *taskWorker) Start() error {
+func (tw *syncWorker) Start() error {
return tw.w.Start()
}
-func (tw *taskWorker) Wait(ctx context.Context) error {
+func (tw *syncWorker) Wait(ctx context.Context) error {
return tw.w.Wait(ctx)
}
-func (tw *taskWorker) Stop(ctx context.Context) error {
+func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *taskWorker) Kill(ctx context.Context) error {
+func (tw *syncWorker) Kill(ctx context.Context) error {
return tw.w.Kill(ctx)
}
-func (tw *taskWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() goridge.Relay {
return tw.w.Relay()
}
-func (tw *taskWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
tw.w.AttachRelay(rl)
}