summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go39
1 files changed, 12 insertions, 27 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 010af076..82a5462a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,41 +6,26 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
// Allocator is responsible for worker allocation in the pool
-type Allocator func() (*SyncWorkerImpl, error)
+type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
+func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
}
}
-// FromSync creates BaseProcess from SyncWorkerImpl
-func FromSync(w *SyncWorkerImpl) BaseProcess {
- return &Process{
- created: w.process.created,
- events: w.process.events,
- state: w.process.state,
- cmd: w.process.cmd,
- pid: w.process.pid,
- endState: w.process.endState,
- relay: w.process.relay,
- }
-}
-
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
@@ -48,25 +33,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -91,7 +76,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -101,13 +86,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -117,7 +102,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
c <- wexec{
@@ -214,7 +199,7 @@ func (tw *SyncWorkerImpl) Created() time.Time {
return tw.process.Created()
}
-func (tw *SyncWorkerImpl) State() internal.State {
+func (tw *SyncWorkerImpl) State() State {
return tw.process.State()
}