summaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
Diffstat (limited to 'worker')
-rw-r--r--worker/interface.go74
-rwxr-xr-xworker/state.go111
-rwxr-xr-xworker/state_test.go27
-rwxr-xr-xworker/sync_worker.go283
-rwxr-xr-xworker/sync_worker_test.go33
-rwxr-xr-xworker/worker.go220
-rwxr-xr-xworker/worker_test.go19
7 files changed, 767 insertions, 0 deletions
diff --git a/worker/interface.go b/worker/interface.go
new file mode 100644
index 00000000..25e98f0a
--- /dev/null
+++ b/worker/interface.go
@@ -0,0 +1,74 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3/pkg/relay"
+ "github.com/spiral/roadrunner/v2/payload"
+)
+
+// State represents WorkerProcess status and updated time.
+type State interface {
+ fmt.Stringer
+ // Value returns StateImpl value
+ Value() int64
+ // Set sets the StateImpl
+ Set(value int64)
+ // NumExecs shows how many times WorkerProcess was invoked
+ NumExecs() uint64
+ // IsActive returns true if WorkerProcess not Inactive or Stopped
+ IsActive() bool
+ // RegisterExec using to registering php executions
+ RegisterExec()
+ // SetLastUsed sets worker last used time
+ SetLastUsed(lu uint64)
+ // LastUsed return worker last used time
+ LastUsed() uint64
+}
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop() error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() relay.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl relay.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
+ // ExecWithTTL used to handle Exec with TTL
+ ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
+}
diff --git a/worker/state.go b/worker/state.go
new file mode 100755
index 00000000..bf152e8b
--- /dev/null
+++ b/worker/state.go
@@ -0,0 +1,111 @@
+package worker
+
+import (
+ "sync/atomic"
+)
+
+// SYNC WITH worker_watcher.GET
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateKilling - process is being forcibly stopped
+ StateKilling
+
+ // StateDestroyed State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateMaxJobsReached State of worker, when it reached executions limit
+ StateMaxJobsReached
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error StateImpl (can't be used).
+ StateErrored
+)
+
+type StateImpl struct {
+ value int64
+ numExecs uint64
+ // to be lightweight, use UnixNano
+ lastUsed uint64
+}
+
+// NewWorkerState initializes a state for the sync.Worker
+func NewWorkerState(value int64) *StateImpl {
+ return &StateImpl{value: value}
+}
+
+// String returns current StateImpl as string.
+func (s *StateImpl) String() string {
+ switch s.Value() {
+ case StateInactive:
+ return "inactive"
+ case StateReady:
+ return "ready"
+ case StateWorking:
+ return "working"
+ case StateInvalid:
+ return "invalid"
+ case StateStopping:
+ return "stopping"
+ case StateStopped:
+ return "stopped"
+ case StateKilling:
+ return "killing"
+ case StateErrored:
+ return "errored"
+ case StateDestroyed:
+ return "destroyed"
+ }
+
+ return "undefined"
+}
+
+// NumExecs returns number of registered WorkerProcess execs.
+func (s *StateImpl) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
+}
+
+// Value StateImpl returns StateImpl value
+func (s *StateImpl) Value() int64 {
+ return atomic.LoadInt64(&s.value)
+}
+
+// IsActive returns true if WorkerProcess not Inactive or Stopped
+func (s *StateImpl) IsActive() bool {
+ val := s.Value()
+ return val == StateWorking || val == StateReady
+}
+
+// Set change StateImpl value (status)
+func (s *StateImpl) Set(value int64) {
+ atomic.StoreInt64(&s.value, value)
+}
+
+// RegisterExec register new execution atomically
+func (s *StateImpl) RegisterExec() {
+ atomic.AddUint64(&s.numExecs, 1)
+}
+
+// SetLastUsed Update last used time
+func (s *StateImpl) SetLastUsed(lu uint64) {
+ atomic.StoreUint64(&s.lastUsed, lu)
+}
+
+func (s *StateImpl) LastUsed() uint64 {
+ return atomic.LoadUint64(&s.lastUsed)
+}
diff --git a/worker/state_test.go b/worker/state_test.go
new file mode 100755
index 00000000..c67182d6
--- /dev/null
+++ b/worker/state_test.go
@@ -0,0 +1,27 @@
+package worker
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
new file mode 100755
index 00000000..deea8cb1
--- /dev/null
+++ b/worker/sync_worker.go
@@ -0,0 +1,283 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/pkg/frame"
+ "github.com/spiral/goridge/v3/pkg/relay"
+ "github.com/spiral/roadrunner/v2/payload"
+ "go.uber.org/multierr"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (SyncWorker, error)
+
+type SyncWorkerImpl struct {
+ process *Process
+ fPool sync.Pool
+ bPool sync.Pool
+}
+
+// From creates SyncWorker from BaseProcess
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ fPool: sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+ }},
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ }
+}
+
+// Exec payload without TTL timeout.
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec")
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+
+ if tw.process.State().Value() != StateReady {
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if !errors.Is(errors.SoftJob, err) {
+ tw.process.State().Set(StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ return nil, errors.E(op, err)
+ }
+
+ // supervisor may set state of the worker during the work
+ // in this case we should not re-write the worker state
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ return rsp, nil
+ }
+
+ tw.process.State().Set(StateReady)
+ tw.process.State().RegisterExec()
+
+ return rsp, nil
+}
+
+type wexec struct {
+ payload *payload.Payload
+ err error
+}
+
+// ExecWithTTL executes payload without TTL timeout.
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_worker_with_timeout")
+ c := make(chan wexec, 1)
+
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.process.State().Value() != StateReady {
+ c <- wexec{
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
+ tw.process.State().Set(StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ c <- wexec{
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ return
+ }
+
+ tw.process.State().Set(StateReady)
+ tw.process.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ // exec TTL reached
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ // append timeout error
+ err = multierr.Append(err, errors.E(op, errors.ExecTTL))
+ return nil, multierr.Append(err, ctx.Err())
+ }
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return nil, res.err
+ }
+ return res.payload, nil
+ }
+}
+
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_payload")
+
+ // get a frame
+ fr := tw.getFrame()
+ defer tw.putFrame(fr)
+
+ // can be 0 here
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
+
+ // obtain a buffer
+ buf := tw.get()
+
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // Context offset
+ fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
+ fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
+
+ fr.WriteCRC(fr.Header())
+
+ // return buffer
+ tw.put(buf)
+
+ err := tw.Relay().Send(fr)
+ if err != nil {
+ return nil, errors.E(op, errors.Network, err)
+ }
+
+ frameR := tw.getFrame()
+ defer tw.putFrame(frameR)
+
+ err = tw.process.Relay().Receive(frameR)
+ if err != nil {
+ return nil, errors.E(op, errors.Network, err)
+ }
+ if frameR == nil {
+ return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
+ }
+
+ if !frameR.VerifyCRC(frameR.Header()) {
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&frame.ERROR != byte(0) {
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ }
+
+ options := frameR.ReadOptions(frameR.Header())
+ if len(options) != 1 {
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
+ }
+
+ pld := &payload.Payload{
+ Body: make([]byte, len(frameR.Payload()[options[0]:])),
+ Context: make([]byte, len(frameR.Payload()[:options[0]])),
+ }
+
+ // by copying we free frame's payload slice
+ // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
+ // https://blog.golang.org/slices-intro#TOC_6.
+ copy(pld.Body, frameR.Payload()[options[0]:])
+ copy(pld.Context, frameR.Payload()[:options[0]])
+
+ return pld, nil
+}
+
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
+}
+
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
+}
+
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
+}
+
+func (tw *SyncWorkerImpl) State() State {
+ return tw.process.State()
+}
+
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
+}
+
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
+}
+
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
+}
+
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
+}
+
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
+}
+
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
+}
+
+// Private
+
+func (tw *SyncWorkerImpl) get() *bytes.Buffer {
+ return tw.bPool.Get().(*bytes.Buffer)
+}
+
+func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
+ b.Reset()
+ tw.bPool.Put(b)
+}
+
+func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
+ return tw.fPool.Get().(*frame.Frame)
+}
+
+func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
+ f.Reset()
+ tw.fPool.Put(f)
+}
diff --git a/worker/sync_worker_test.go b/worker/sync_worker_test.go
new file mode 100755
index 00000000..41c0c92b
--- /dev/null
+++ b/worker/sync_worker_test.go
@@ -0,0 +1,33 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NotStarted_String(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "inactive")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_NotStarted_Exec(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+
+ sw := From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ assert.Contains(t, err.Error(), "Process is not ready (inactive)")
+}
diff --git a/worker/worker.go b/worker/worker.go
new file mode 100755
index 00000000..38a1e9ac
--- /dev/null
+++ b/worker/worker.go
@@ -0,0 +1,220 @@
+package worker
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/pkg/relay"
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/internal"
+ "go.uber.org/multierr"
+)
+
+type Options func(p *Process)
+
+// Process - supervised process with api over goridge.Relay.
+type Process struct {
+ // created indicates at what time Process has been created.
+ created time.Time
+
+ // updates parent supervisor or pool about Process events
+ events events.Handler
+
+ // state holds information about current Process state,
+ // number of Process executions, buf status change time.
+ // publicly this object is receive-only and protected using Mutex
+ // and atomic counter.
+ state *StateImpl
+
+ // underlying command with associated process, command must be
+ // provided to Process from outside in non-started form. CmdSource
+ // stdErr direction will be handled by Process to aggregate error message.
+ cmd *exec.Cmd
+
+ // pid of the process, points to pid of underlying process and
+ // can be nil while process is not started.
+ pid int
+
+ // communication bus with underlying process.
+ relay relay.Relay
+}
+
+// InitBaseWorker creates new Process over given exec.cmd.
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
+ }
+ w := &Process{
+ created: time.Now(),
+ events: events.NewEventsHandler(),
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
+ }
+
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
+
+ // add options
+ for i := 0; i < len(options); i++ {
+ options[i](w)
+ }
+
+ return w, nil
+}
+
+func AddListeners(listeners ...events.Listener) Options {
+ return func(p *Process) {
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
+// Pid returns worker pid.
+func (w *Process) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
+func (w *Process) Created() time.Time {
+ return w.created
+}
+
+// AddListener registers new worker event listener.
+func (w *Process) addListener(listener events.Listener) {
+ w.events.AddListener(listener)
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) State() State {
+ return w.state
+}
+
+// AttachRelay attaches relay to the worker
+func (w *Process) AttachRelay(rl relay.Relay) {
+ w.relay = rl
+}
+
+// Relay returns relay attached to the worker
+func (w *Process) Relay() relay.Relay {
+ return w.relay
+}
+
+// String returns Process description. fmt.Stringer interface
+func (w *Process) String() string {
+ st := w.state.String()
+ // we can safely compare pid to 0
+ if w.pid != 0 {
+ st = st + ", pid:" + strconv.Itoa(w.pid)
+ }
+
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ st,
+ w.state.NumExecs(),
+ )
+}
+
+func (w *Process) Start() error {
+ err := w.cmd.Start()
+ if err != nil {
+ return err
+ }
+ w.pid = w.cmd.Process.Pid
+ return nil
+}
+
+// Wait must be called once for each Process, call will be released once Process is
+// complete and will return process error (if any), if stderr is presented it's value
+// will be wrapped as WorkerError. Method will return error code if php process fails
+// to find or Start the script.
+func (w *Process) Wait() error {
+ const op = errors.Op("process_wait")
+ var err error
+ err = w.cmd.Wait()
+
+ // If worker was destroyed, just exit
+ if w.State().Value() == StateDestroyed {
+ return nil
+ }
+
+ // If state is different, and err is not nil, append it to the errors
+ if err != nil {
+ w.State().Set(StateErrored)
+ err = multierr.Combine(err, errors.E(op, err))
+ }
+
+ // closeRelay
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then process.cmd.Wait return an error
+ err2 := w.closeRelay()
+ if err2 != nil {
+ w.State().Set(StateErrored)
+ return multierr.Append(err, errors.E(op, err2))
+ }
+
+ if w.cmd.ProcessState.Success() {
+ w.State().Set(StateStopped)
+ return nil
+ }
+
+ return err
+}
+
+func (w *Process) closeRelay() error {
+ if w.relay != nil {
+ err := w.relay.Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Stop sends soft termination command to the Process and waits for process completion.
+func (w *Process) Stop() error {
+ const op = errors.Op("process_stop")
+ w.state.Set(StateStopping)
+ err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
+ if err != nil {
+ w.state.Set(StateKilling)
+ _ = w.cmd.Process.Signal(os.Kill)
+ return errors.E(op, errors.Network, err)
+ }
+ w.state.Set(StateStopped)
+ return nil
+}
+
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr. Does not wait for process completion!
+func (w *Process) Kill() error {
+ if w.State().Value() == StateDestroyed {
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
+ w.state.Set(StateKilling)
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ w.state.Set(StateStopped)
+ return nil
+}
+
+// Worker stderr
+func (w *Process) Write(p []byte) (n int, err error) {
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ return len(p), nil
+}
diff --git a/worker/worker_test.go b/worker/worker_test.go
new file mode 100755
index 00000000..805f66b5
--- /dev/null
+++ b/worker/worker_test.go
@@ -0,0 +1,19 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := InitBaseWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}