summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/interface.go56
-rwxr-xr-xpkg/worker/sync_worker.go111
-rwxr-xr-xpkg/worker/sync_worker_test.go7
-rwxr-xr-xpkg/worker/worker.go10
4 files changed, 126 insertions, 58 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
new file mode 100644
index 00000000..9d74ae10
--- /dev/null
+++ b/pkg/worker/interface.go
@@ -0,0 +1,56 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() internal.State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop() error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() relay.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl relay.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs payload.Payload) (payload.Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 8314c039..1a0393fb 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -8,50 +8,67 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
)
-type syncWorker struct {
- w worker.BaseProcess
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (*SyncWorkerImpl, error)
+
+type SyncWorkerImpl struct {
+ process *Process
}
// From creates SyncWorker from BaseProcess
-func From(w worker.BaseProcess) (worker.SyncWorker, error) {
- return &syncWorker{
- w: w,
- }, nil
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ }
+}
+
+// FromSync creates BaseProcess from SyncWorkerImpl
+func FromSync(w *SyncWorkerImpl) BaseProcess {
+ return &Process{
+ created: w.process.created,
+ events: w.process.events,
+ state: w.process.state,
+ cmd: w.process.cmd,
+ pid: w.process.pid,
+ stderr: w.process.stderr,
+ endState: w.process.endState,
+ relay: w.process.relay,
+ rd: w.process.rd,
+ }
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.w.State().Value() != internal.StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ if tw.process.State().Value() != internal.StateReady {
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
return rsp, nil
}
@@ -62,7 +79,7 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- if tw.w.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != internal.StateReady {
c <- wexec{
payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
c <- wexec{
payload: payload.Payload{},
@@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
c <- wexec{
payload: rsp,
@@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
}
-func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
@@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
frameR := frame.NewFrame()
- err = tw.w.Relay().Receive(frameR)
+ err = tw.process.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return pl, nil
}
-func (tw *syncWorker) String() string {
- return tw.w.String()
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
}
-func (tw *syncWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
}
-func (tw *syncWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
}
-func (tw *syncWorker) State() internal.State {
- return tw.w.State()
+func (tw *SyncWorkerImpl) State() internal.State {
+ return tw.process.State()
}
-func (tw *syncWorker) Start() error {
- return tw.w.Start()
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
}
-func (tw *syncWorker) Wait() error {
- return tw.w.Wait()
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
}
-func (tw *syncWorker) Stop() error {
- return tw.w.Stop()
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
}
-func (tw *syncWorker) Kill() error {
- return tw.w.Kill()
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
}
-func (tw *syncWorker) Relay() relay.Relay {
- return tw.w.Relay()
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
}
-func (tw *syncWorker) AttachRelay(rl relay.Relay) {
- tw.w.AttachRelay(rl)
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index 40988b06..df556e93 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) {
w, _ := InitBaseWorker(cmd)
- syncWorker, err := From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index bf70d646..8fd71cca 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,10 +13,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -78,14 +76,14 @@ type Process struct {
}
// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) {
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
const op = errors.Op("init_base_worker")
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
w := &Process{
created: time.Now(),
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
@@ -198,7 +196,7 @@ func (w *Process) Wait() error {
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
- // and then w.cmd.Wait return an error
+ // and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(internal.StateErrored)