diff options
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-x | pkg/worker/sync_worker.go | 39 |
1 files changed, 12 insertions, 27 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 010af076..82a5462a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,41 +6,26 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) // Allocator is responsible for worker allocation in the pool -type Allocator func() (*SyncWorkerImpl, error) +type Allocator func() (SyncWorker, error) type SyncWorkerImpl struct { process *Process } // From creates SyncWorker from BaseProcess -func From(process *Process) *SyncWorkerImpl { +func From(process *Process) SyncWorker { return &SyncWorkerImpl{ process: process, } } -// FromSync creates BaseProcess from SyncWorkerImpl -func FromSync(w *SyncWorkerImpl) BaseProcess { - return &Process{ - created: w.process.created, - events: w.process.events, - state: w.process.state, - cmd: w.process.cmd, - pid: w.process.pid, - endState: w.process.endState, - relay: w.process.relay, - } -} - // Exec payload without TTL timeout. func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") @@ -48,25 +33,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -91,7 +76,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -101,13 +86,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -117,7 +102,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() c <- wexec{ @@ -214,7 +199,7 @@ func (tw *SyncWorkerImpl) Created() time.Time { return tw.process.Created() } -func (tw *SyncWorkerImpl) State() internal.State { +func (tw *SyncWorkerImpl) State() State { return tw.process.State() } |