summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/interface.go74
-rwxr-xr-xpkg/worker/state.go111
-rwxr-xr-xpkg/worker/state_test.go27
-rwxr-xr-xpkg/worker/sync_worker.go283
-rwxr-xr-xpkg/worker/sync_worker_test.go33
-rwxr-xr-xpkg/worker/worker.go220
-rwxr-xr-xpkg/worker/worker_test.go19
7 files changed, 0 insertions, 767 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
deleted file mode 100644
index ed8704bb..00000000
--- a/pkg/worker/interface.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package worker
-
-import (
- "context"
- "fmt"
- "time"
-
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/pkg/payload"
-)
-
-// State represents WorkerProcess status and updated time.
-type State interface {
- fmt.Stringer
- // Value returns StateImpl value
- Value() int64
- // Set sets the StateImpl
- Set(value int64)
- // NumExecs shows how many times WorkerProcess was invoked
- NumExecs() uint64
- // IsActive returns true if WorkerProcess not Inactive or Stopped
- IsActive() bool
- // RegisterExec using to registering php executions
- RegisterExec()
- // SetLastUsed sets worker last used time
- SetLastUsed(lu uint64)
- // LastUsed return worker last used time
- LastUsed() uint64
-}
-
-type BaseProcess interface {
- fmt.Stringer
-
- // Pid returns worker pid.
- Pid() int64
-
- // Created returns time worker was created at.
- Created() time.Time
-
- // State return receive-only WorkerProcess state object, state can be used to safely access
- // WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() State
-
- // Start used to run Cmd and immediately return
- Start() error
-
- // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
- // complete and will return process error (if any), if stderr is presented it's value
- // will be wrapped as WorkerError. Method will return error code if php process fails
- // to find or Start the script.
- Wait() error
-
- // Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop() error
-
- // Kill kills underlying process, make sure to call Wait() func to gather
- // error log from the stderr. Does not waits for process completion!
- Kill() error
-
- // Relay returns attached to worker goridge relay
- Relay() relay.Relay
-
- // AttachRelay used to attach goridge relay to the worker process
- AttachRelay(rl relay.Relay)
-}
-
-type SyncWorker interface {
- // BaseProcess provides basic functionality for the SyncWorker
- BaseProcess
- // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs *payload.Payload) (*payload.Payload, error)
- // ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
-}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
deleted file mode 100755
index bf152e8b..00000000
--- a/pkg/worker/state.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package worker
-
-import (
- "sync/atomic"
-)
-
-// SYNC WITH worker_watcher.GET
-const (
- // StateInactive - no associated process
- StateInactive int64 = iota
-
- // StateReady - ready for job.
- StateReady
-
- // StateWorking - working on given payload.
- StateWorking
-
- // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
- StateInvalid
-
- // StateStopping - process is being softly stopped.
- StateStopping
-
- // StateKilling - process is being forcibly stopped
- StateKilling
-
- // StateDestroyed State of worker, when no need to allocate new one
- StateDestroyed
-
- // StateMaxJobsReached State of worker, when it reached executions limit
- StateMaxJobsReached
-
- // StateStopped - process has been terminated.
- StateStopped
-
- // StateErrored - error StateImpl (can't be used).
- StateErrored
-)
-
-type StateImpl struct {
- value int64
- numExecs uint64
- // to be lightweight, use UnixNano
- lastUsed uint64
-}
-
-// NewWorkerState initializes a state for the sync.Worker
-func NewWorkerState(value int64) *StateImpl {
- return &StateImpl{value: value}
-}
-
-// String returns current StateImpl as string.
-func (s *StateImpl) String() string {
- switch s.Value() {
- case StateInactive:
- return "inactive"
- case StateReady:
- return "ready"
- case StateWorking:
- return "working"
- case StateInvalid:
- return "invalid"
- case StateStopping:
- return "stopping"
- case StateStopped:
- return "stopped"
- case StateKilling:
- return "killing"
- case StateErrored:
- return "errored"
- case StateDestroyed:
- return "destroyed"
- }
-
- return "undefined"
-}
-
-// NumExecs returns number of registered WorkerProcess execs.
-func (s *StateImpl) NumExecs() uint64 {
- return atomic.LoadUint64(&s.numExecs)
-}
-
-// Value StateImpl returns StateImpl value
-func (s *StateImpl) Value() int64 {
- return atomic.LoadInt64(&s.value)
-}
-
-// IsActive returns true if WorkerProcess not Inactive or Stopped
-func (s *StateImpl) IsActive() bool {
- val := s.Value()
- return val == StateWorking || val == StateReady
-}
-
-// Set change StateImpl value (status)
-func (s *StateImpl) Set(value int64) {
- atomic.StoreInt64(&s.value, value)
-}
-
-// RegisterExec register new execution atomically
-func (s *StateImpl) RegisterExec() {
- atomic.AddUint64(&s.numExecs, 1)
-}
-
-// SetLastUsed Update last used time
-func (s *StateImpl) SetLastUsed(lu uint64) {
- atomic.StoreUint64(&s.lastUsed, lu)
-}
-
-func (s *StateImpl) LastUsed() uint64 {
- return atomic.LoadUint64(&s.lastUsed)
-}
diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go
deleted file mode 100755
index c67182d6..00000000
--- a/pkg/worker/state_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package worker
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NewState(t *testing.T) {
- st := NewWorkerState(StateErrored)
-
- assert.Equal(t, "errored", st.String())
-
- assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
- assert.Equal(t, "ready", NewWorkerState(StateReady).String())
- assert.Equal(t, "working", NewWorkerState(StateWorking).String())
- assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
- assert.Equal(t, "undefined", NewWorkerState(1000).String())
-}
-
-func Test_IsActive(t *testing.T) {
- assert.False(t, NewWorkerState(StateInactive).IsActive())
- assert.True(t, NewWorkerState(StateReady).IsActive())
- assert.True(t, NewWorkerState(StateWorking).IsActive())
- assert.False(t, NewWorkerState(StateStopped).IsActive())
- assert.False(t, NewWorkerState(StateErrored).IsActive())
-}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
deleted file mode 100755
index 74e29b71..00000000
--- a/pkg/worker/sync_worker.go
+++ /dev/null
@@ -1,283 +0,0 @@
-package worker
-
-import (
- "bytes"
- "context"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "go.uber.org/multierr"
-)
-
-// Allocator is responsible for worker allocation in the pool
-type Allocator func() (SyncWorker, error)
-
-type SyncWorkerImpl struct {
- process *Process
- fPool sync.Pool
- bPool sync.Pool
-}
-
-// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
- return &SyncWorkerImpl{
- process: process,
- fPool: sync.Pool{New: func() interface{} {
- return frame.NewFrame()
- }},
- bPool: sync.Pool{New: func() interface{} {
- return new(bytes.Buffer)
- }},
- }
-}
-
-// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec")
- if len(p.Body) == 0 && len(p.Context) == 0 {
- return nil, errors.E(op, errors.Str("payload can not be empty"))
- }
-
- if tw.process.State().Value() != StateReady {
- return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if !errors.Is(errors.SoftJob, err) {
- tw.process.State().Set(StateErrored)
- tw.process.State().RegisterExec()
- }
- return nil, errors.E(op, err)
- }
-
- // supervisor may set state of the worker during the work
- // in this case we should not re-write the worker state
- if tw.process.State().Value() != StateWorking {
- tw.process.State().RegisterExec()
- return rsp, nil
- }
-
- tw.process.State().Set(StateReady)
- tw.process.State().RegisterExec()
-
- return rsp, nil
-}
-
-type wexec struct {
- payload *payload.Payload
- err error
-}
-
-// ExecWithTTL executes payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_worker_with_timeout")
- c := make(chan wexec, 1)
-
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != StateReady {
- c <- wexec{
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
- tw.process.State().Set(StateErrored)
- tw.process.State().RegisterExec()
- }
- c <- wexec{
- err: errors.E(op, err),
- }
- return
- }
-
- if tw.process.State().Value() != StateWorking {
- tw.process.State().RegisterExec()
- c <- wexec{
- payload: rsp,
- err: nil,
- }
- return
- }
-
- tw.process.State().Set(StateReady)
- tw.process.State().RegisterExec()
-
- c <- wexec{
- payload: rsp,
- err: nil,
- }
- }()
-
- select {
- // exec TTL reached
- case <-ctx.Done():
- err := multierr.Combine(tw.Kill())
- if err != nil {
- // append timeout error
- err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return nil, multierr.Append(err, ctx.Err())
- }
- return nil, errors.E(op, errors.ExecTTL, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return nil, res.err
- }
- return res.payload, nil
- }
-}
-
-func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_payload")
-
- // get a frame
- fr := tw.getFrame()
- defer tw.putFrame(fr)
-
- // can be 0 here
- fr.WriteVersion(fr.Header(), frame.VERSION_1)
-
- // obtain a buffer
- buf := tw.get()
-
- buf.Write(p.Context)
- buf.Write(p.Body)
-
- // Context offset
- fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
- fr.WritePayload(buf.Bytes())
-
- fr.WriteCRC(fr.Header())
-
- // return buffer
- tw.put(buf)
-
- err := tw.Relay().Send(fr)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
-
- frameR := tw.getFrame()
- defer tw.putFrame(frameR)
-
- err = tw.process.Relay().Receive(frameR)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
- if frameR == nil {
- return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
- }
-
- if !frameR.VerifyCRC(frameR.Header()) {
- return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
- }
-
- flags := frameR.ReadFlags()
-
- if flags&frame.ERROR != byte(0) {
- return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
- }
-
- options := frameR.ReadOptions(frameR.Header())
- if len(options) != 1 {
- return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
- }
-
- pld := &payload.Payload{
- Body: make([]byte, len(frameR.Payload()[options[0]:])),
- Context: make([]byte, len(frameR.Payload()[:options[0]])),
- }
-
- // by copying we free frame's payload slice
- // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
- // https://blog.golang.org/slices-intro#TOC_6.
- copy(pld.Body, frameR.Payload()[options[0]:])
- copy(pld.Context, frameR.Payload()[:options[0]])
-
- return pld, nil
-}
-
-func (tw *SyncWorkerImpl) String() string {
- return tw.process.String()
-}
-
-func (tw *SyncWorkerImpl) Pid() int64 {
- return tw.process.Pid()
-}
-
-func (tw *SyncWorkerImpl) Created() time.Time {
- return tw.process.Created()
-}
-
-func (tw *SyncWorkerImpl) State() State {
- return tw.process.State()
-}
-
-func (tw *SyncWorkerImpl) Start() error {
- return tw.process.Start()
-}
-
-func (tw *SyncWorkerImpl) Wait() error {
- return tw.process.Wait()
-}
-
-func (tw *SyncWorkerImpl) Stop() error {
- return tw.process.Stop()
-}
-
-func (tw *SyncWorkerImpl) Kill() error {
- return tw.process.Kill()
-}
-
-func (tw *SyncWorkerImpl) Relay() relay.Relay {
- return tw.process.Relay()
-}
-
-func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
- tw.process.AttachRelay(rl)
-}
-
-// Private
-
-func (tw *SyncWorkerImpl) get() *bytes.Buffer {
- return tw.bPool.Get().(*bytes.Buffer)
-}
-
-func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
- b.Reset()
- tw.bPool.Put(b)
-}
-
-func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
- return tw.fPool.Get().(*frame.Frame)
-}
-
-func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
- f.Reset()
- tw.fPool.Put(f)
-}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
deleted file mode 100755
index 64580f9f..00000000
--- a/pkg/worker/sync_worker_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package worker
-
-import (
- "os/exec"
- "testing"
-
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NotStarted_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "inactive")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_NotStarted_Exec(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
-
- sw := From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-
- assert.Contains(t, err.Error(), "Process is not ready (inactive)")
-}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
deleted file mode 100755
index fa74e7b5..00000000
--- a/pkg/worker/worker.go
+++ /dev/null
@@ -1,220 +0,0 @@
-package worker
-
-import (
- "fmt"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "go.uber.org/multierr"
-)
-
-type Options func(p *Process)
-
-// Process - supervised process with api over goridge.Relay.
-type Process struct {
- // created indicates at what time Process has been created.
- created time.Time
-
- // updates parent supervisor or pool about Process events
- events events.Handler
-
- // state holds information about current Process state,
- // number of Process executions, buf status change time.
- // publicly this object is receive-only and protected using Mutex
- // and atomic counter.
- state *StateImpl
-
- // underlying command with associated process, command must be
- // provided to Process from outside in non-started form. CmdSource
- // stdErr direction will be handled by Process to aggregate error message.
- cmd *exec.Cmd
-
- // pid of the process, points to pid of underlying process and
- // can be nil while process is not started.
- pid int
-
- // communication bus with underlying process.
- relay relay.Relay
-}
-
-// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
- if cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
- }
- w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
- }
-
- // set self as stderr implementation (Writer interface)
- w.cmd.Stderr = w
-
- // add options
- for i := 0; i < len(options); i++ {
- options[i](w)
- }
-
- return w, nil
-}
-
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// Pid returns worker pid.
-func (w *Process) Pid() int64 {
- return int64(w.pid)
-}
-
-// Created returns time worker was created at.
-func (w *Process) Created() time.Time {
- return w.created
-}
-
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
-func (w *Process) State() State {
- return w.state
-}
-
-// AttachRelay attaches relay to the worker
-func (w *Process) AttachRelay(rl relay.Relay) {
- w.relay = rl
-}
-
-// Relay returns relay attached to the worker
-func (w *Process) Relay() relay.Relay {
- return w.relay
-}
-
-// String returns Process description. fmt.Stringer interface
-func (w *Process) String() string {
- st := w.state.String()
- // we can safely compare pid to 0
- if w.pid != 0 {
- st = st + ", pid:" + strconv.Itoa(w.pid)
- }
-
- return fmt.Sprintf(
- "(`%s` [%s], numExecs: %v)",
- strings.Join(w.cmd.Args, " "),
- st,
- w.state.NumExecs(),
- )
-}
-
-func (w *Process) Start() error {
- err := w.cmd.Start()
- if err != nil {
- return err
- }
- w.pid = w.cmd.Process.Pid
- return nil
-}
-
-// Wait must be called once for each Process, call will be released once Process is
-// complete and will return process error (if any), if stderr is presented it's value
-// will be wrapped as WorkerError. Method will return error code if php process fails
-// to find or Start the script.
-func (w *Process) Wait() error {
- const op = errors.Op("process_wait")
- var err error
- err = w.cmd.Wait()
-
- // If worker was destroyed, just exit
- if w.State().Value() == StateDestroyed {
- return nil
- }
-
- // If state is different, and err is not nil, append it to the errors
- if err != nil {
- w.State().Set(StateErrored)
- err = multierr.Combine(err, errors.E(op, err))
- }
-
- // closeRelay
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then process.cmd.Wait return an error
- err2 := w.closeRelay()
- if err2 != nil {
- w.State().Set(StateErrored)
- return multierr.Append(err, errors.E(op, err2))
- }
-
- if w.cmd.ProcessState.Success() {
- w.State().Set(StateStopped)
- return nil
- }
-
- return err
-}
-
-func (w *Process) closeRelay() error {
- if w.relay != nil {
- err := w.relay.Close()
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop() error {
- const op = errors.Op("process_stop")
- w.state.Set(StateStopping)
- err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
- if err != nil {
- w.state.Set(StateKilling)
- _ = w.cmd.Process.Signal(os.Kill)
- return errors.E(op, errors.Network, err)
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Does not wait for process completion!
-func (w *Process) Kill() error {
- if w.State().Value() == StateDestroyed {
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- return nil
- }
-
- w.state.Set(StateKilling)
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// Worker stderr
-func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
- return len(p), nil
-}
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
deleted file mode 100755
index 805f66b5..00000000
--- a/pkg/worker/worker_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package worker
-
-import (
- "os/exec"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_OnStarted(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- assert.Nil(t, cmd.Start())
-
- w, err := InitBaseWorker(cmd)
- assert.Nil(t, w)
- assert.NotNil(t, err)
-
- assert.Equal(t, "can't attach to running process", err.Error())
-}