diff options
author | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
commit | 7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch) | |
tree | 3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker | |
parent | ee5d34abde7f3931bf939498eb7a8cb170232f4f (diff) |
interfaces folder deprecated
Diffstat (limited to 'pkg/worker')
-rw-r--r-- | pkg/worker/interface.go | 56 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 111 | ||||
-rwxr-xr-x | pkg/worker/sync_worker_test.go | 7 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 10 |
4 files changed, 126 insertions, 58 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go new file mode 100644 index 00000000..9d74ae10 --- /dev/null +++ b/pkg/worker/interface.go @@ -0,0 +1,56 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop() error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() relay.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl relay.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 8314c039..1a0393fb 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,50 +8,67 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" ) -type syncWorker struct { - w worker.BaseProcess +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process } // From creates SyncWorker from BaseProcess -func From(w worker.BaseProcess) (worker.SyncWorker, error) { - return &syncWorker{ - w: w, - }, nil +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != internal.StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() return rsp, nil } @@ -62,7 +79,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - if tw.w.State().Value() != internal.StateReady { + if tw.process.State().Value() != internal.StateReady { c <- wexec{ payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } c <- wexec{ payload: payload.Payload{}, @@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() c <- wexec{ payload: rsp, @@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } } -func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() @@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { frameR := frame.NewFrame() - err = tw.w.Relay().Receive(frameR) + err = tw.process.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return pl, nil } -func (tw *syncWorker) String() string { - return tw.w.String() +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() } -func (tw *syncWorker) Pid() int64 { - return tw.w.Pid() +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() } -func (tw *syncWorker) Created() time.Time { - return tw.w.Created() +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() } -func (tw *syncWorker) State() internal.State { - return tw.w.State() +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() } -func (tw *syncWorker) Start() error { - return tw.w.Start() +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() } -func (tw *syncWorker) Wait() error { - return tw.w.Wait() +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() } -func (tw *syncWorker) Stop() error { - return tw.w.Stop() +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() } -func (tw *syncWorker) Kill() error { - return tw.w.Kill() +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() } -func (tw *syncWorker) Relay() relay.Relay { - return tw.w.Relay() +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() } -func (tw *syncWorker) AttachRelay(rl relay.Relay) { - tw.w.AttachRelay(rl) +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index 40988b06..df556e93 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) { w, _ := InitBaseWorker(cmd) - syncWorker, err := From(w) - if err != nil { - t.Fatal(err) - } + sw := From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index bf70d646..8fd71cca 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,10 +13,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -78,14 +76,14 @@ type Process struct { } // InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) { +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { const op = errors.Op("init_base_worker") if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } w := &Process{ created: time.Now(), - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), @@ -198,7 +196,7 @@ func (w *Process) Wait() error { // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error + // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(internal.StateErrored) |