summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
committerValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
commit77670fb7af0c892c9b3a589fd424534fad288e7a (patch)
tree3adcaa85db664a355abe2b28f1d7e4a3fc45689f /sync_worker.go
parent16fbf3104c3c34bd9355593052b686acd26a8efe (diff)
Update according activity worker
Diffstat (limited to 'sync_worker.go')
-rw-r--r--sync_worker.go39
1 files changed, 36 insertions, 3 deletions
diff --git a/sync_worker.go b/sync_worker.go
index 45629f3e..a6e1ed01 100644
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -14,8 +14,11 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
- // Exec used to execute payload on the SyncWorker
- Exec(ctx context.Context, rqs Payload) (Payload, error)
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs Payload) (Payload, error)
+
+ // ExecWithContext allow to set ExecTTL
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
type taskWorker struct {
@@ -33,7 +36,7 @@ type twexec struct {
err error
}
-func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
c := make(chan twexec)
go func() {
if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
@@ -92,6 +95,36 @@ func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) {
}
}
+//
+func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
+ if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+ return EmptyPayload, fmt.Errorf("payload can not be empty")
+ }
+
+ if tw.w.State().Value() != StateReady {
+ return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(rqs)
+ if err != nil {
+ if _, ok := err.(TaskError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ return EmptyPayload, err
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ return rsp, nil
+
+}
+
func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {