diff options
Diffstat (limited to 'worker')
-rw-r--r-- | worker/interface.go | 74 | ||||
-rwxr-xr-x | worker/state.go | 111 | ||||
-rwxr-xr-x | worker/state_test.go | 27 | ||||
-rwxr-xr-x | worker/sync_worker.go | 297 | ||||
-rwxr-xr-x | worker/sync_worker_test.go | 33 | ||||
-rwxr-xr-x | worker/worker.go | 235 | ||||
-rwxr-xr-x | worker/worker_test.go | 19 |
7 files changed, 0 insertions, 796 deletions
diff --git a/worker/interface.go b/worker/interface.go deleted file mode 100644 index 25e98f0a..00000000 --- a/worker/interface.go +++ /dev/null @@ -1,74 +0,0 @@ -package worker - -import ( - "context" - "fmt" - "time" - - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/payload" -) - -// State represents WorkerProcess status and updated time. -type State interface { - fmt.Stringer - // Value returns StateImpl value - Value() int64 - // Set sets the StateImpl - Set(value int64) - // NumExecs shows how many times WorkerProcess was invoked - NumExecs() uint64 - // IsActive returns true if WorkerProcess not Inactive or Stopped - IsActive() bool - // RegisterExec using to registering php executions - RegisterExec() - // SetLastUsed sets worker last used time - SetLastUsed(lu uint64) - // LastUsed return worker last used time - LastUsed() uint64 -} - -type BaseProcess interface { - fmt.Stringer - - // Pid returns worker pid. - Pid() int64 - - // Created returns time worker was created at. - Created() time.Time - - // State return receive-only WorkerProcess state object, state can be used to safely access - // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() State - - // Start used to run Cmd and immediately return - Start() error - - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is - // complete and will return process error (if any), if stderr is presented it's value - // will be wrapped as WorkerError. Method will return error code if php process fails - // to find or Start the script. - Wait() error - - // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop() error - - // Kill kills underlying process, make sure to call Wait() func to gather - // error log from the stderr. Does not waits for process completion! - Kill() error - - // Relay returns attached to worker goridge relay - Relay() relay.Relay - - // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl relay.Relay) -} - -type SyncWorker interface { - // BaseProcess provides basic functionality for the SyncWorker - BaseProcess - // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs *payload.Payload) (*payload.Payload, error) - // ExecWithTTL used to handle Exec with TTL - ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) -} diff --git a/worker/state.go b/worker/state.go deleted file mode 100755 index bf152e8b..00000000 --- a/worker/state.go +++ /dev/null @@ -1,111 +0,0 @@ -package worker - -import ( - "sync/atomic" -) - -// SYNC WITH worker_watcher.GET -const ( - // StateInactive - no associated process - StateInactive int64 = iota - - // StateReady - ready for job. - StateReady - - // StateWorking - working on given payload. - StateWorking - - // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. - StateInvalid - - // StateStopping - process is being softly stopped. - StateStopping - - // StateKilling - process is being forcibly stopped - StateKilling - - // StateDestroyed State of worker, when no need to allocate new one - StateDestroyed - - // StateMaxJobsReached State of worker, when it reached executions limit - StateMaxJobsReached - - // StateStopped - process has been terminated. - StateStopped - - // StateErrored - error StateImpl (can't be used). - StateErrored -) - -type StateImpl struct { - value int64 - numExecs uint64 - // to be lightweight, use UnixNano - lastUsed uint64 -} - -// NewWorkerState initializes a state for the sync.Worker -func NewWorkerState(value int64) *StateImpl { - return &StateImpl{value: value} -} - -// String returns current StateImpl as string. -func (s *StateImpl) String() string { - switch s.Value() { - case StateInactive: - return "inactive" - case StateReady: - return "ready" - case StateWorking: - return "working" - case StateInvalid: - return "invalid" - case StateStopping: - return "stopping" - case StateStopped: - return "stopped" - case StateKilling: - return "killing" - case StateErrored: - return "errored" - case StateDestroyed: - return "destroyed" - } - - return "undefined" -} - -// NumExecs returns number of registered WorkerProcess execs. -func (s *StateImpl) NumExecs() uint64 { - return atomic.LoadUint64(&s.numExecs) -} - -// Value StateImpl returns StateImpl value -func (s *StateImpl) Value() int64 { - return atomic.LoadInt64(&s.value) -} - -// IsActive returns true if WorkerProcess not Inactive or Stopped -func (s *StateImpl) IsActive() bool { - val := s.Value() - return val == StateWorking || val == StateReady -} - -// Set change StateImpl value (status) -func (s *StateImpl) Set(value int64) { - atomic.StoreInt64(&s.value, value) -} - -// RegisterExec register new execution atomically -func (s *StateImpl) RegisterExec() { - atomic.AddUint64(&s.numExecs, 1) -} - -// SetLastUsed Update last used time -func (s *StateImpl) SetLastUsed(lu uint64) { - atomic.StoreUint64(&s.lastUsed, lu) -} - -func (s *StateImpl) LastUsed() uint64 { - return atomic.LoadUint64(&s.lastUsed) -} diff --git a/worker/state_test.go b/worker/state_test.go deleted file mode 100755 index c67182d6..00000000 --- a/worker/state_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package worker - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_NewState(t *testing.T) { - st := NewWorkerState(StateErrored) - - assert.Equal(t, "errored", st.String()) - - assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) - assert.Equal(t, "ready", NewWorkerState(StateReady).String()) - assert.Equal(t, "working", NewWorkerState(StateWorking).String()) - assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) - assert.Equal(t, "undefined", NewWorkerState(1000).String()) -} - -func Test_IsActive(t *testing.T) { - assert.False(t, NewWorkerState(StateInactive).IsActive()) - assert.True(t, NewWorkerState(StateReady).IsActive()) - assert.True(t, NewWorkerState(StateWorking).IsActive()) - assert.False(t, NewWorkerState(StateStopped).IsActive()) - assert.False(t, NewWorkerState(StateErrored).IsActive()) -} diff --git a/worker/sync_worker.go b/worker/sync_worker.go deleted file mode 100755 index 6ece0ad7..00000000 --- a/worker/sync_worker.go +++ /dev/null @@ -1,297 +0,0 @@ -package worker - -import ( - "bytes" - "context" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/payload" - "go.uber.org/multierr" -) - -// Allocator is responsible for worker allocation in the pool -type Allocator func() (SyncWorker, error) - -type SyncWorkerImpl struct { - process *Process - fPool sync.Pool - bPool sync.Pool - chPool sync.Pool -} - -// From creates SyncWorker from BaseProcess -func From(process *Process) *SyncWorkerImpl { - return &SyncWorkerImpl{ - process: process, - fPool: sync.Pool{New: func() interface{} { - return frame.NewFrame() - }}, - bPool: sync.Pool{New: func() interface{} { - return new(bytes.Buffer) - }}, - - chPool: sync.Pool{New: func() interface{} { - return make(chan wexec, 1) - }}, - } -} - -// Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec") - - if len(p.Body) == 0 && len(p.Context) == 0 { - return nil, errors.E(op, errors.Str("payload can not be empty")) - } - - if tw.process.State().Value() != StateReady { - return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) - } - - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) - - rsp, err := tw.execPayload(p) - if err != nil { - // just to be more verbose - if !errors.Is(errors.SoftJob, err) { - tw.process.State().Set(StateErrored) - tw.process.State().RegisterExec() - } - return nil, errors.E(op, err) - } - - // supervisor may set state of the worker during the work - // in this case we should not re-write the worker state - if tw.process.State().Value() != StateWorking { - tw.process.State().RegisterExec() - return rsp, nil - } - - tw.process.State().Set(StateReady) - tw.process.State().RegisterExec() - - return rsp, nil -} - -type wexec struct { - payload *payload.Payload - err error -} - -// ExecWithTTL executes payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec_worker_with_timeout") - - if len(p.Body) == 0 && len(p.Context) == 0 { - return nil, errors.E(op, errors.Str("payload can not be empty")) - } - - c := tw.getCh() - defer tw.putCh(c) - - // worker was killed before it started to work (supervisor) - if tw.process.State().Value() != StateReady { - return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) - } - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) - - go func() { - rsp, err := tw.execPayload(p) - if err != nil { - // just to be more verbose - if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple - tw.process.State().Set(StateErrored) - tw.process.State().RegisterExec() - } - c <- wexec{ - err: errors.E(op, err), - } - return - } - - if tw.process.State().Value() != StateWorking { - tw.process.State().RegisterExec() - c <- wexec{ - payload: rsp, - err: nil, - } - return - } - - tw.process.State().Set(StateReady) - tw.process.State().RegisterExec() - - c <- wexec{ - payload: rsp, - err: nil, - } - }() - - select { - // exec TTL reached - case <-ctx.Done(): - err := multierr.Combine(tw.Kill()) - if err != nil { - // append timeout error - err = multierr.Append(err, errors.E(op, errors.ExecTTL)) - return nil, multierr.Append(err, ctx.Err()) - } - return nil, errors.E(op, errors.ExecTTL, ctx.Err()) - case res := <-c: - if res.err != nil { - return nil, res.err - } - return res.payload, nil - } -} - -func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec_payload") - - // get a frame - fr := tw.getFrame() - defer tw.putFrame(fr) - - // can be 0 here - fr.WriteVersion(fr.Header(), frame.VERSION_1) - fr.WriteFlags(fr.Header(), p.Codec) - - // obtain a buffer - buf := tw.get() - - buf.Write(p.Context) - buf.Write(p.Body) - - // Context offset - fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) - fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) - fr.WritePayload(buf.Bytes()) - - fr.WriteCRC(fr.Header()) - - // return buffer - tw.put(buf) - - err := tw.Relay().Send(fr) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - frameR := tw.getFrame() - defer tw.putFrame(frameR) - - err = tw.process.Relay().Receive(frameR) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - if frameR == nil { - return nil, errors.E(op, errors.Network, errors.Str("nil frame received")) - } - - flags := frameR.ReadFlags() - - if flags&frame.ERROR != byte(0) { - return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) - } - - options := frameR.ReadOptions(frameR.Header()) - if len(options) != 1 { - return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) - } - - pld := &payload.Payload{ - Codec: flags, - Body: make([]byte, len(frameR.Payload()[options[0]:])), - Context: make([]byte, len(frameR.Payload()[:options[0]])), - } - - // by copying we free frame's payload slice - // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) - // https://blog.golang.org/slices-intro#TOC_6. - copy(pld.Body, frameR.Payload()[options[0]:]) - copy(pld.Context, frameR.Payload()[:options[0]]) - - return pld, nil -} - -func (tw *SyncWorkerImpl) String() string { - return tw.process.String() -} - -func (tw *SyncWorkerImpl) Pid() int64 { - return tw.process.Pid() -} - -func (tw *SyncWorkerImpl) Created() time.Time { - return tw.process.Created() -} - -func (tw *SyncWorkerImpl) State() State { - return tw.process.State() -} - -func (tw *SyncWorkerImpl) Start() error { - return tw.process.Start() -} - -func (tw *SyncWorkerImpl) Wait() error { - return tw.process.Wait() -} - -func (tw *SyncWorkerImpl) Stop() error { - return tw.process.Stop() -} - -func (tw *SyncWorkerImpl) Kill() error { - return tw.process.Kill() -} - -func (tw *SyncWorkerImpl) Relay() relay.Relay { - return tw.process.Relay() -} - -func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { - tw.process.AttachRelay(rl) -} - -// Private - -func (tw *SyncWorkerImpl) get() *bytes.Buffer { - return tw.bPool.Get().(*bytes.Buffer) -} - -func (tw *SyncWorkerImpl) put(b *bytes.Buffer) { - b.Reset() - tw.bPool.Put(b) -} - -func (tw *SyncWorkerImpl) getFrame() *frame.Frame { - return tw.fPool.Get().(*frame.Frame) -} - -func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { - f.Reset() - tw.fPool.Put(f) -} - -func (tw *SyncWorkerImpl) getCh() chan wexec { - return tw.chPool.Get().(chan wexec) -} - -func (tw *SyncWorkerImpl) putCh(ch chan wexec) { - // just check if the chan is not empty - select { - case <-ch: - tw.chPool.Put(ch) - default: - tw.chPool.Put(ch) - } -} diff --git a/worker/sync_worker_test.go b/worker/sync_worker_test.go deleted file mode 100755 index 288cbd45..00000000 --- a/worker/sync_worker_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package worker - -import ( - "os/exec" - "testing" - - "github.com/spiral/roadrunner/v2/payload" - "github.com/stretchr/testify/assert" -) - -func Test_NotStarted_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "inactive") - assert.Contains(t, w.String(), "num_execs: 0") -} - -func Test_NotStarted_Exec(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - - sw := From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "Process is not ready (inactive)") -} diff --git a/worker/worker.go b/worker/worker.go deleted file mode 100755 index 52bdbacb..00000000 --- a/worker/worker.go +++ /dev/null @@ -1,235 +0,0 @@ -package worker - -import ( - "fmt" - "os" - "os/exec" - "strconv" - "strings" - "time" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/internal" - "go.uber.org/multierr" - "go.uber.org/zap" -) - -type Options func(p *Process) - -// Process - supervised process with api over goridge.Relay. -type Process struct { - // created indicates at what time Process has been created. - created time.Time - log *zap.Logger - - // state holds information about current Process state, - // number of Process executions, buf status change time. - // publicly this object is receive-only and protected using Mutex - // and atomic counter. - state *StateImpl - - // underlying command with associated process, command must be - // provided to Process from outside in non-started form. CmdSource - // stdErr direction will be handled by Process to aggregate error message. - cmd *exec.Cmd - - // pid of the process, points to pid of underlying process and - // can be nil while process is not started. - pid int - doneCh chan struct{} - - // communication bus with underlying process. - relay relay.Relay -} - -// InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { - if cmd.Process != nil { - return nil, fmt.Errorf("can't attach to running process") - } - - w := &Process{ - created: time.Now(), - cmd: cmd, - state: NewWorkerState(StateInactive), - doneCh: make(chan struct{}, 1), - } - - // add options - for i := 0; i < len(options); i++ { - options[i](w) - } - - if w.log == nil { - z, err := zap.NewProduction() - if err != nil { - return nil, err - } - - w.log = z - } - - // set self as stderr implementation (Writer interface) - w.cmd.Stderr = w - - return w, nil -} - -func WithLog(z *zap.Logger) Options { - return func(p *Process) { - p.log = z - } -} - -// Pid returns worker pid. -func (w *Process) Pid() int64 { - return int64(w.pid) -} - -// Created returns time worker was created at. -func (w *Process) Created() time.Time { - return w.created -} - -// State return receive-only Process state object, state can be used to safely access -// Process status, time when status changed and number of Process executions. -func (w *Process) State() State { - return w.state -} - -// AttachRelay attaches relay to the worker -func (w *Process) AttachRelay(rl relay.Relay) { - w.relay = rl -} - -// Relay returns relay attached to the worker -func (w *Process) Relay() relay.Relay { - return w.relay -} - -// String returns Process description. fmt.Stringer interface -func (w *Process) String() string { - st := w.state.String() - // we can safely compare pid to 0 - if w.pid != 0 { - st = st + ", pid:" + strconv.Itoa(w.pid) - } - - return fmt.Sprintf( - "(`%s` [%s], num_execs: %v)", - strings.Join(w.cmd.Args, " "), - st, - w.state.NumExecs(), - ) -} - -func (w *Process) Start() error { - err := w.cmd.Start() - if err != nil { - return err - } - w.pid = w.cmd.Process.Pid - return nil -} - -// Wait must be called once for each Process, call will be released once Process is -// complete and will return process error (if any), if stderr is presented it's value -// will be wrapped as WorkerError. Method will return error code if php process fails -// to find or Start the script. -func (w *Process) Wait() error { - const op = errors.Op("process_wait") - var err error - err = w.cmd.Wait() - w.doneCh <- struct{}{} - - // If worker was destroyed, just exit - if w.State().Value() == StateDestroyed { - return nil - } - - // If state is different, and err is not nil, append it to the errors - if err != nil { - w.State().Set(StateErrored) - err = multierr.Combine(err, errors.E(op, err)) - } - - // closeRelay - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then process.cmd.Wait return an error - err2 := w.closeRelay() - if err2 != nil { - w.State().Set(StateErrored) - return multierr.Append(err, errors.E(op, err2)) - } - - if w.cmd.ProcessState.Success() { - w.State().Set(StateStopped) - return nil - } - - return err -} - -func (w *Process) closeRelay() error { - if w.relay != nil { - err := w.relay.Close() - if err != nil { - return err - } - } - return nil -} - -// Stop sends soft termination command to the Process and waits for process completion. -func (w *Process) Stop() error { - const op = errors.Op("process_stop") - - select { - // finished - case <-w.doneCh: - return nil - default: - w.state.Set(StateStopping) - err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) - if err != nil { - w.state.Set(StateKilling) - _ = w.cmd.Process.Signal(os.Kill) - - return errors.E(op, errors.Network, err) - } - - <-w.doneCh - w.state.Set(StateStopped) - return nil - } -} - -// Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not wait for process completion! -func (w *Process) Kill() error { - if w.State().Value() == StateDestroyed { - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - - return nil - } - - w.state.Set(StateKilling) - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - w.state.Set(StateStopped) - return nil -} - -// Worker stderr -func (w *Process) Write(p []byte) (n int, err error) { - // unsafe to use utils.AsString - w.log.Info(string(p)) - return len(p), nil -} diff --git a/worker/worker_test.go b/worker/worker_test.go deleted file mode 100755 index 805f66b5..00000000 --- a/worker/worker_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package worker - -import ( - "os/exec" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_OnStarted(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - assert.Nil(t, cmd.Start()) - - w, err := InitBaseWorker(cmd) - assert.Nil(t, w) - assert.NotNil(t, err) - - assert.Equal(t, "can't attach to running process", err.Error()) -} |