summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
committerValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
commit9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (patch)
treee49c46b03d8facc73e96f1b6247d83367cc65398 /pkg/worker
parent1033c25b6bfc752d6059e446510f651e22cbf49b (diff)
huge refactor
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/events.go1
-rwxr-xr-xpkg/worker/sync_worker.go226
-rwxr-xr-xpkg/worker/sync_worker_test.go37
-rwxr-xr-xpkg/worker/worker.go302
-rwxr-xr-xpkg/worker/worker_test.go19
5 files changed, 585 insertions, 0 deletions
diff --git a/pkg/worker/events.go b/pkg/worker/events.go
new file mode 100644
index 00000000..4df0094f
--- /dev/null
+++ b/pkg/worker/events.go
@@ -0,0 +1 @@
+package worker
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
new file mode 100755
index 00000000..0fcde2c3
--- /dev/null
+++ b/pkg/worker/sync_worker.go
@@ -0,0 +1,226 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ "go.uber.org/multierr"
+
+ "github.com/spiral/goridge/v3"
+)
+
+type syncWorker struct {
+ w worker.BaseProcess
+}
+
+// From creates SyncWorker from WorkerBasa
+func From(w worker.BaseProcess) (worker.SyncWorker, error) {
+ return &syncWorker{
+ w: w,
+ }, nil
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("sync worker Exec")
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ }
+
+ if tw.w.State().Value() != internal.StateReady {
+ return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(internal.StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ tw.w.State().Set(internal.StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ return internal.Payload{}, err
+ }
+
+ tw.w.State().Set(internal.StateReady)
+ tw.w.State().RegisterExec()
+
+ return rsp, nil
+}
+
+type wexec struct {
+ payload internal.Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("ExecWithContext")
+ c := make(chan wexec, 1)
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: internal.Payload{},
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.w.State().Value() != internal.StateReady {
+ c <- wexec{
+ payload: internal.Payload{},
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(internal.StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ tw.w.State().Set(internal.StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: internal.Payload{},
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.w.State().Set(internal.StateReady)
+ tw.w.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ return internal.Payload{}, multierr.Append(err, ctx.Err())
+ }
+ return internal.Payload{}, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return internal.Payload{}, res.err
+ }
+ return res.payload, nil
+ }
+}
+
+func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("exec payload")
+
+ frame := goridge.NewFrame()
+ frame.WriteVersion(goridge.VERSION_1)
+ // can be 0 here
+
+ buf := new(bytes.Buffer)
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // Context offset
+ frame.WriteOptions(uint32(len(p.Context)))
+ frame.WritePayloadLen(uint32(buf.Len()))
+ frame.WritePayload(buf.Bytes())
+
+ frame.WriteCRC()
+
+ // empty and free the buffer
+ buf.Truncate(0)
+
+ err := tw.Relay().Send(frame)
+ if err != nil {
+ return internal.Payload{}, err
+ }
+
+ frameR := goridge.NewFrame()
+
+ err = tw.w.Relay().Receive(frameR)
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+ if frameR == nil {
+ return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ }
+
+ if !frameR.VerifyCRC() {
+ return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&byte(goridge.ERROR) != byte(0) {
+ return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ }
+
+ options := frameR.ReadOptions()
+ if len(options) != 1 {
+ return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ }
+
+ payload := internal.Payload{}
+ payload.Context = frameR.Payload()[:options[0]]
+ payload.Body = frameR.Payload()[options[0]:]
+
+ return payload, nil
+}
+
+func (tw *syncWorker) String() string {
+ return tw.w.String()
+}
+
+func (tw *syncWorker) Pid() int64 {
+ return tw.w.Pid()
+}
+
+func (tw *syncWorker) Created() time.Time {
+ return tw.w.Created()
+}
+
+func (tw *syncWorker) AddListener(listener worker.EventListener) {
+ tw.w.AddListener(listener)
+}
+
+func (tw *syncWorker) State() internal.State {
+ return tw.w.State()
+}
+
+func (tw *syncWorker) Start() error {
+ return tw.w.Start()
+}
+
+func (tw *syncWorker) Wait() error {
+ return tw.w.Wait()
+}
+
+func (tw *syncWorker) Stop(ctx context.Context) error {
+ return tw.w.Stop(ctx)
+}
+
+func (tw *syncWorker) Kill() error {
+ return tw.w.Kill()
+}
+
+func (tw *syncWorker) Relay() goridge.Relay {
+ return tw.w.Relay()
+}
+
+func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
+ tw.w.AttachRelay(rl)
+}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
new file mode 100755
index 00000000..e224e105
--- /dev/null
+++ b/pkg/worker/sync_worker_test.go
@@ -0,0 +1,37 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NotStarted_String(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "inactive")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_NotStarted_Exec(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+
+ syncWorker, err := From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "Process is not ready (inactive)")
+}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
new file mode 100755
index 00000000..d2b4374b
--- /dev/null
+++ b/pkg/worker/worker.go
@@ -0,0 +1,302 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/util"
+
+ "github.com/spiral/goridge/v3"
+ "go.uber.org/multierr"
+)
+
+const (
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages
+ // before merging output together since lastError update (required to keep error update together).
+ WaitDuration = 25 * time.Millisecond
+
+ // ReadBufSize used to make a slice with specified length to read from stderr
+ ReadBufSize = 10240 // Kb
+)
+
+var syncPool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+}
+
+// Process - supervised process with api over goridge.Relay.
+type Process struct {
+ // created indicates at what time Process has been created.
+ created time.Time
+
+ // updates parent supervisor or pool about Process events
+ events worker.EventsHandler
+
+ // state holds information about current Process state,
+ // number of Process executions, buf status change time.
+ // publicly this object is receive-only and protected using Mutex
+ // and atomic counter.
+ state *internal.WorkerState
+
+ // underlying command with associated process, command must be
+ // provided to Process from outside in non-started form. CmdSource
+ // stdErr direction will be handled by Process to aggregate error message.
+ cmd *exec.Cmd
+
+ // pid of the process, points to pid of underlying process and
+ // can be nil while process is not started.
+ pid int
+
+ // stderr aggregates stderr output from underlying process. Value can be
+ // receive only once command is completed and all pipes are closed.
+ stderr *bytes.Buffer
+
+ // channel is being closed once command is complete.
+ // waitDone chan interface{}
+
+ // contains information about resulted process state.
+ endState *os.ProcessState
+
+ // ensures than only one execution can be run at once.
+ mu sync.RWMutex
+
+ // communication bus with underlying process.
+ relay goridge.Relay
+ // rd in a second part of pipe to read from stderr
+ rd io.Reader
+ // stop signal terminates io.Pipe from reading from stderr
+ stop chan struct{}
+}
+
+// InitBaseWorker creates new Process over given exec.cmd.
+func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
+ }
+ w := &Process{
+ created: time.Now(),
+ events: util.NewEventsHandler(),
+ cmd: cmd,
+ state: internal.NewWorkerState(internal.StateInactive),
+ stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
+ }
+
+ w.rd, w.cmd.Stderr = io.Pipe()
+
+ // small buffer optimization
+ // at this point we know, that stderr will contain huge messages
+ w.stderr.Grow(ReadBufSize)
+
+ go func() {
+ w.watch()
+ }()
+
+ return w, nil
+}
+
+// Pid returns worker pid.
+func (w *Process) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
+func (w *Process) Created() time.Time {
+ return w.created
+}
+
+// AddListener registers new worker event listener.
+func (w *Process) AddListener(listener worker.EventListener) {
+ w.events.AddListener(listener)
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) State() internal.State {
+ return w.state
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) AttachRelay(rl goridge.Relay) {
+ w.relay = rl
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) Relay() goridge.Relay {
+ return w.relay
+}
+
+// String returns Process description. fmt.Stringer interface
+func (w *Process) String() string {
+ st := w.state.String()
+ // we can safely compare pid to 0
+ if w.pid != 0 {
+ st = st + ", pid:" + strconv.Itoa(w.pid)
+ }
+
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ st,
+ w.state.NumExecs(),
+ )
+}
+
+func (w *Process) Start() error {
+ err := w.cmd.Start()
+ if err != nil {
+ return err
+ }
+ w.pid = w.cmd.Process.Pid
+ return nil
+}
+
+// Wait must be called once for each Process, call will be released once Process is
+// complete and will return process error (if any), if stderr is presented it's value
+// will be wrapped as WorkerError. Method will return error code if php process fails
+// to find or Start the script.
+func (w *Process) Wait() error {
+ const op = errors.Op("worker process wait")
+ err := multierr.Combine(w.cmd.Wait())
+
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then w.cmd.Wait return an error
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+
+ w.mu.RLock()
+ // if process return code > 0, here will be an error from stderr (if presents)
+ if w.stderr.Len() > 0 {
+ err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
+ }
+ w.mu.RUnlock()
+
+ return multierr.Append(err, w.closeRelay())
+ }
+
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+ return err
+ }
+
+ if w.endState.Success() {
+ w.state.Set(internal.StateStopped)
+ }
+
+ return nil
+}
+
+func (w *Process) closeRelay() error {
+ if w.relay != nil {
+ err := w.relay.Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Stop sends soft termination command to the Process and waits for process completion.
+func (w *Process) Stop(ctx context.Context) error {
+ c := make(chan error)
+
+ go func() {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ c <- multierr.Append(err, w.cmd.Process.Kill())
+ }
+ w.state.Set(internal.StateStopped)
+ c <- nil
+ }()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-c:
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+}
+
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr. Does not waits for process completion!
+func (w *Process) Kill() error {
+ w.state.Set(internal.StateKilling)
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ w.state.Set(internal.StateStopped)
+ return nil
+}
+
+// put the pointer, to not allocate new slice
+// but erase it len and then return back
+func (w *Process) put(data *[]byte) {
+ *data = (*data)[:0]
+ *data = (*data)[:cap(*data)]
+
+ syncPool.Put(data)
+}
+
+// get pointer to the byte slice
+func (w *Process) get() *[]byte {
+ return syncPool.Get().(*[]byte)
+}
+
+// Write appends the contents of pool to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of pool; errBuffer is always nil.
+func (w *Process) watch() {
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ buf := w.get()
+ // read the last data
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ return
+ default:
+ // read the max 10kb of stderr per one read
+ buf := w.get()
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ }
+ }
+ }()
+}
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
new file mode 100755
index 00000000..805f66b5
--- /dev/null
+++ b/pkg/worker/worker_test.go
@@ -0,0 +1,19 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := InitBaseWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}