summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go370
1 files changed, 0 insertions, 370 deletions
diff --git a/worker.go b/worker.go
deleted file mode 100755
index 07c1e5c8..00000000
--- a/worker.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package roadrunner
-
-import (
- "bytes"
- "context"
- "fmt"
- "io"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
-
- "github.com/spiral/goridge/v3"
- "go.uber.org/multierr"
-)
-
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 25 * time.Millisecond
-
- // ReadBufSize used to make a slice with specified length to read from stderr
- ReadBufSize = 10240 // Kb
-)
-
-// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError Event = iota + 200
-
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
-)
-
-type Event int64
-
-func (ev Event) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- }
- return "Unknown event type"
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event Event
-
- // Worker triggered the event.
- Worker WorkerBase
-
- // Event specific payload.
- Payload interface{}
-}
-
-var pool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
-type WorkerBase interface {
- fmt.Stringer
-
- // Pid returns worker pid.
- Pid() int64
-
- // Created returns time worker was created at.
- Created() time.Time
-
- // AddListener attaches listener to consume worker events.
- AddListener(listener util.EventListener)
-
- // State return receive-only WorkerProcess state object, state can be used to safely access
- // WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() State
-
- // Start used to run Cmd and immediately return
- Start() error
-
- // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
- // complete and will return process error (if any), if stderr is presented it's value
- // will be wrapped as WorkerError. Method will return error code if php process fails
- // to find or Start the script.
- Wait() error
-
- // Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop(ctx context.Context) error
-
- // Kill kills underlying process, make sure to call Wait() func to gather
- // error log from the stderr. Does not waits for process completion!
- Kill() error
-
- // Relay returns attached to worker goridge relay
- Relay() goridge.Relay
-
- // AttachRelay used to attach goridge relay to the worker process
- AttachRelay(rl goridge.Relay)
-}
-
-// WorkerProcess - supervised process with api over goridge.Relay.
-type WorkerProcess struct {
- // created indicates at what time WorkerProcess has been created.
- created time.Time
-
- // updates parent supervisor or pool about WorkerProcess events
- events util.EventsHandler
-
- // state holds information about current WorkerProcess state,
- // number of WorkerProcess executions, buf status change time.
- // publicly this object is receive-only and protected using Mutex
- // and atomic counter.
- state *state
-
- // underlying command with associated process, command must be
- // provided to WorkerProcess from outside in non-started form. CmdSource
- // stdErr direction will be handled by WorkerProcess to aggregate error message.
- cmd *exec.Cmd
-
- // pid of the process, points to pid of underlying process and
- // can be nil while process is not started.
- pid int
-
- // stderr aggregates stderr output from underlying process. Value can be
- // receive only once command is completed and all pipes are closed.
- stderr *bytes.Buffer
-
- // channel is being closed once command is complete.
- // waitDone chan interface{}
-
- // contains information about resulted process state.
- endState *os.ProcessState
-
- // ensures than only one execution can be run at once.
- mu sync.RWMutex
-
- // communication bus with underlying process.
- relay goridge.Relay
- rd io.Reader
- stop chan struct{}
-}
-
-// InitBaseWorker creates new WorkerProcess over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
- if cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
- }
- w := &WorkerProcess{
- created: time.Now(),
- events: &util.EventHandler{},
- cmd: cmd,
- state: newState(StateInactive),
- stderr: new(bytes.Buffer),
- stop: make(chan struct{}, 1),
- }
-
- w.rd, w.cmd.Stderr = io.Pipe()
-
- // small buffer optimization
- // at this point we know, that stderr will contain huge messages
- w.stderr.Grow(ReadBufSize)
-
- go func() {
- w.watch()
- }()
-
- return w, nil
-}
-
-// Pid returns worker pid.
-func (w *WorkerProcess) Pid() int64 {
- return int64(w.pid)
-}
-
-// Created returns time worker was created at.
-func (w *WorkerProcess) Created() time.Time {
- return w.created
-}
-
-// AddListener registers new worker event listener.
-func (w *WorkerProcess) AddListener(listener util.EventListener) {
- w.events.AddListener(listener)
-}
-
-// State return receive-only WorkerProcess state object, state can be used to safely access
-// WorkerProcess status, time when status changed and number of WorkerProcess executions.
-func (w *WorkerProcess) State() State {
- return w.state
-}
-
-// State return receive-only WorkerProcess state object, state can be used to safely access
-// WorkerProcess status, time when status changed and number of WorkerProcess executions.
-func (w *WorkerProcess) AttachRelay(rl goridge.Relay) {
- w.relay = rl
-}
-
-// State return receive-only WorkerProcess state object, state can be used to safely access
-// WorkerProcess status, time when status changed and number of WorkerProcess executions.
-func (w *WorkerProcess) Relay() goridge.Relay {
- return w.relay
-}
-
-// String returns WorkerProcess description. fmt.Stringer interface
-func (w *WorkerProcess) String() string {
- st := w.state.String()
- // we can safely compare pid to 0
- if w.pid != 0 {
- st = st + ", pid:" + strconv.Itoa(w.pid)
- }
-
- return fmt.Sprintf(
- "(`%s` [%s], numExecs: %v)",
- strings.Join(w.cmd.Args, " "),
- st,
- w.state.NumExecs(),
- )
-}
-
-func (w *WorkerProcess) Start() error {
- err := w.cmd.Start()
- if err != nil {
- return err
- }
- w.pid = w.cmd.Process.Pid
- return nil
-}
-
-// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
-// complete and will return process error (if any), if stderr is presented it's value
-// will be wrapped as WorkerError. Method will return error code if php process fails
-// to find or Start the script.
-func (w *WorkerProcess) Wait() error {
- const op = errors.Op("worker process wait")
- err := multierr.Combine(w.cmd.Wait())
-
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then w.cmd.Wait return an error
- w.endState = w.cmd.ProcessState
- if err != nil {
- w.state.Set(StateErrored)
-
- w.mu.RLock()
- // if process return code > 0, here will be an error from stderr (if presents)
- if w.stderr.Len() > 0 {
- err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
- // stop the stderr buffer
- w.stop <- struct{}{}
- }
- w.mu.RUnlock()
-
- return multierr.Append(err, w.closeRelay())
- }
-
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(StateErrored)
- return err
- }
-
- if w.endState.Success() {
- w.state.Set(StateStopped)
- }
-
- return nil
-}
-
-func (w *WorkerProcess) closeRelay() error {
- if w.relay != nil {
- err := w.relay.Close()
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// Stop sends soft termination command to the WorkerProcess and waits for process completion.
-func (w *WorkerProcess) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(StateStopping)
- err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
- if err != nil {
- w.state.Set(StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
- }
-}
-
-// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Does not waits for process completion!
-func (w *WorkerProcess) Kill() error {
- w.state.Set(StateKilling)
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// put the pointer, to not allocate new slice
-// but erase it len and then return back
-func (w *WorkerProcess) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- pool.Put(data)
-}
-
-// get pointer to the byte slice
-func (w *WorkerProcess) get() *[]byte {
- return pool.Get().(*[]byte)
-}
-
-// Write appends the contents of pool to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *WorkerProcess) watch() {
- go func() {
- for {
- select {
- case <-w.stop:
- buf := w.get()
- // read the last data
- n, _ := w.rd.Read(*buf)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- return
- default:
- // read the max 10kb of stderr per one read
- buf := w.get()
- n, _ := w.rd.Read(*buf)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- }
- }
- }()
-}