diff options
author | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
commit | 7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch) | |
tree | 3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker/sync_worker.go | |
parent | ee5d34abde7f3931bf939498eb7a8cb170232f4f (diff) |
interfaces folder deprecated
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-x | pkg/worker/sync_worker.go | 111 |
1 files changed, 64 insertions, 47 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 8314c039..1a0393fb 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,50 +8,67 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" ) -type syncWorker struct { - w worker.BaseProcess +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process } // From creates SyncWorker from BaseProcess -func From(w worker.BaseProcess) (worker.SyncWorker, error) { - return &syncWorker{ - w: w, - }, nil +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != internal.StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() return rsp, nil } @@ -62,7 +79,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - if tw.w.State().Value() != internal.StateReady { + if tw.process.State().Value() != internal.StateReady { c <- wexec{ payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } c <- wexec{ payload: payload.Payload{}, @@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() c <- wexec{ payload: rsp, @@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } } -func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() @@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { frameR := frame.NewFrame() - err = tw.w.Relay().Receive(frameR) + err = tw.process.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return pl, nil } -func (tw *syncWorker) String() string { - return tw.w.String() +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() } -func (tw *syncWorker) Pid() int64 { - return tw.w.Pid() +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() } -func (tw *syncWorker) Created() time.Time { - return tw.w.Created() +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() } -func (tw *syncWorker) State() internal.State { - return tw.w.State() +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() } -func (tw *syncWorker) Start() error { - return tw.w.Start() +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() } -func (tw *syncWorker) Wait() error { - return tw.w.Wait() +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() } -func (tw *syncWorker) Stop() error { - return tw.w.Stop() +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() } -func (tw *syncWorker) Kill() error { - return tw.w.Kill() +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() } -func (tw *syncWorker) Relay() relay.Relay { - return tw.w.Relay() +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() } -func (tw *syncWorker) AttachRelay(rl relay.Relay) { - tw.w.AttachRelay(rl) +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) } |