From 9d5fe4f6a98b30fd73be8259f84fa595ac994a71 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 17 Dec 2020 02:34:44 +0300 Subject: huge refactor --- pkg/worker/events.go | 1 + pkg/worker/sync_worker.go | 226 ++++++++++++++++++++++++++++++ pkg/worker/sync_worker_test.go | 37 +++++ pkg/worker/worker.go | 302 +++++++++++++++++++++++++++++++++++++++++ pkg/worker/worker_test.go | 19 +++ 5 files changed, 585 insertions(+) create mode 100644 pkg/worker/events.go create mode 100755 pkg/worker/sync_worker.go create mode 100755 pkg/worker/sync_worker_test.go create mode 100755 pkg/worker/worker.go create mode 100755 pkg/worker/worker_test.go (limited to 'pkg/worker') diff --git a/pkg/worker/events.go b/pkg/worker/events.go new file mode 100644 index 00000000..4df0094f --- /dev/null +++ b/pkg/worker/events.go @@ -0,0 +1 @@ +package worker diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go new file mode 100755 index 00000000..0fcde2c3 --- /dev/null +++ b/pkg/worker/sync_worker.go @@ -0,0 +1,226 @@ +package worker + +import ( + "bytes" + "context" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + "go.uber.org/multierr" + + "github.com/spiral/goridge/v3" +) + +type syncWorker struct { + w worker.BaseProcess +} + +// From creates SyncWorker from WorkerBasa +func From(w worker.BaseProcess) (worker.SyncWorker, error) { + return &syncWorker{ + w: w, + }, nil +} + +// Exec payload without TTL timeout. +func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { + const op = errors.Op("sync worker Exec") + if len(p.Body) == 0 && len(p.Context) == 0 { + return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + } + + if tw.w.State().Value() != internal.StateReady { + return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(internal.StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if errors.Is(errors.ErrSoftJob, err) == false { + tw.w.State().Set(internal.StateErrored) + tw.w.State().RegisterExec() + } + return internal.Payload{}, err + } + + tw.w.State().Set(internal.StateReady) + tw.w.State().RegisterExec() + + return rsp, nil +} + +type wexec struct { + payload internal.Payload + err error +} + +// Exec payload without TTL timeout. +func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) { + const op = errors.Op("ExecWithContext") + c := make(chan wexec, 1) + go func() { + if len(p.Body) == 0 && len(p.Context) == 0 { + c <- wexec{ + payload: internal.Payload{}, + err: errors.E(op, errors.Str("payload can not be empty")), + } + return + } + + if tw.w.State().Value() != internal.StateReady { + c <- wexec{ + payload: internal.Payload{}, + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + } + return + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(internal.StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if errors.Is(errors.ErrSoftJob, err) == false { + tw.w.State().Set(internal.StateErrored) + tw.w.State().RegisterExec() + } + c <- wexec{ + payload: internal.Payload{}, + err: errors.E(op, err), + } + return + } + + tw.w.State().Set(internal.StateReady) + tw.w.State().RegisterExec() + + c <- wexec{ + payload: rsp, + err: nil, + } + }() + + select { + case <-ctx.Done(): + err := multierr.Combine(tw.Kill()) + if err != nil { + return internal.Payload{}, multierr.Append(err, ctx.Err()) + } + return internal.Payload{}, ctx.Err() + case res := <-c: + if res.err != nil { + return internal.Payload{}, res.err + } + return res.payload, nil + } +} + +func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) { + const op = errors.Op("exec payload") + + frame := goridge.NewFrame() + frame.WriteVersion(goridge.VERSION_1) + // can be 0 here + + buf := new(bytes.Buffer) + buf.Write(p.Context) + buf.Write(p.Body) + + // Context offset + frame.WriteOptions(uint32(len(p.Context))) + frame.WritePayloadLen(uint32(buf.Len())) + frame.WritePayload(buf.Bytes()) + + frame.WriteCRC() + + // empty and free the buffer + buf.Truncate(0) + + err := tw.Relay().Send(frame) + if err != nil { + return internal.Payload{}, err + } + + frameR := goridge.NewFrame() + + err = tw.w.Relay().Receive(frameR) + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + if frameR == nil { + return internal.Payload{}, errors.E(op, errors.Str("nil frame received")) + } + + if !frameR.VerifyCRC() { + return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + } + + flags := frameR.ReadFlags() + + if flags&byte(goridge.ERROR) != byte(0) { + return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + } + + options := frameR.ReadOptions() + if len(options) != 1 { + return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + } + + payload := internal.Payload{} + payload.Context = frameR.Payload()[:options[0]] + payload.Body = frameR.Payload()[options[0]:] + + return payload, nil +} + +func (tw *syncWorker) String() string { + return tw.w.String() +} + +func (tw *syncWorker) Pid() int64 { + return tw.w.Pid() +} + +func (tw *syncWorker) Created() time.Time { + return tw.w.Created() +} + +func (tw *syncWorker) AddListener(listener worker.EventListener) { + tw.w.AddListener(listener) +} + +func (tw *syncWorker) State() internal.State { + return tw.w.State() +} + +func (tw *syncWorker) Start() error { + return tw.w.Start() +} + +func (tw *syncWorker) Wait() error { + return tw.w.Wait() +} + +func (tw *syncWorker) Stop(ctx context.Context) error { + return tw.w.Stop(ctx) +} + +func (tw *syncWorker) Kill() error { + return tw.w.Kill() +} + +func (tw *syncWorker) Relay() goridge.Relay { + return tw.w.Relay() +} + +func (tw *syncWorker) AttachRelay(rl goridge.Relay) { + tw.w.AttachRelay(rl) +} diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go new file mode 100755 index 00000000..e224e105 --- /dev/null +++ b/pkg/worker/sync_worker_test.go @@ -0,0 +1,37 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/spiral/roadrunner/v2/internal" + "github.com/stretchr/testify/assert" +) + +func Test_NotStarted_String(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "inactive") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_NotStarted_Exec(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + + syncWorker, err := From(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "Process is not ready (inactive)") +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100755 index 00000000..d2b4374b --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,302 @@ +package worker + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/util" + + "github.com/spiral/goridge/v3" + "go.uber.org/multierr" +) + +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond + + // ReadBufSize used to make a slice with specified length to read from stderr + ReadBufSize = 10240 // Kb +) + +var syncPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, +} + +// Process - supervised process with api over goridge.Relay. +type Process struct { + // created indicates at what time Process has been created. + created time.Time + + // updates parent supervisor or pool about Process events + events worker.EventsHandler + + // state holds information about current Process state, + // number of Process executions, buf status change time. + // publicly this object is receive-only and protected using Mutex + // and atomic counter. + state *internal.WorkerState + + // underlying command with associated process, command must be + // provided to Process from outside in non-started form. CmdSource + // stdErr direction will be handled by Process to aggregate error message. + cmd *exec.Cmd + + // pid of the process, points to pid of underlying process and + // can be nil while process is not started. + pid int + + // stderr aggregates stderr output from underlying process. Value can be + // receive only once command is completed and all pipes are closed. + stderr *bytes.Buffer + + // channel is being closed once command is complete. + // waitDone chan interface{} + + // contains information about resulted process state. + endState *os.ProcessState + + // ensures than only one execution can be run at once. + mu sync.RWMutex + + // communication bus with underlying process. + relay goridge.Relay + // rd in a second part of pipe to read from stderr + rd io.Reader + // stop signal terminates io.Pipe from reading from stderr + stop chan struct{} +} + +// InitBaseWorker creates new Process over given exec.cmd. +func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + w := &Process{ + created: time.Now(), + events: util.NewEventsHandler(), + cmd: cmd, + state: internal.NewWorkerState(internal.StateInactive), + stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), + } + + w.rd, w.cmd.Stderr = io.Pipe() + + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(ReadBufSize) + + go func() { + w.watch() + }() + + return w, nil +} + +// Pid returns worker pid. +func (w *Process) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. +func (w *Process) Created() time.Time { + return w.created +} + +// AddListener registers new worker event listener. +func (w *Process) AddListener(listener worker.EventListener) { + w.events.AddListener(listener) +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) State() internal.State { + return w.state +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) AttachRelay(rl goridge.Relay) { + w.relay = rl +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) Relay() goridge.Relay { + return w.relay +} + +// String returns Process description. fmt.Stringer interface +func (w *Process) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) + } + + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + st, + w.state.NumExecs(), + ) +} + +func (w *Process) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + w.pid = w.cmd.Process.Pid + return nil +} + +// Wait must be called once for each Process, call will be released once Process is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or Start the script. +func (w *Process) Wait() error { + const op = errors.Op("worker process wait") + err := multierr.Combine(w.cmd.Wait()) + + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then w.cmd.Wait return an error + w.endState = w.cmd.ProcessState + if err != nil { + w.state.Set(internal.StateErrored) + + w.mu.RLock() + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} + } + w.mu.RUnlock() + + return multierr.Append(err, w.closeRelay()) + } + + err = multierr.Append(err, w.closeRelay()) + if err != nil { + w.state.Set(internal.StateErrored) + return err + } + + if w.endState.Success() { + w.state.Set(internal.StateStopped) + } + + return nil +} + +func (w *Process) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } + } + return nil +} + +// Stop sends soft termination command to the Process and waits for process completion. +func (w *Process) Stop(ctx context.Context) error { + c := make(chan error) + + go func() { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + c <- multierr.Append(err, w.cmd.Process.Kill()) + } + w.state.Set(internal.StateStopped) + c <- nil + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-c: + if err != nil { + return err + } + return nil + } +} + +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not waits for process completion! +func (w *Process) Kill() error { + w.state.Set(internal.StateKilling) + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + w.state.Set(internal.StateStopped) + return nil +} + +// put the pointer, to not allocate new slice +// but erase it len and then return back +func (w *Process) put(data *[]byte) { + *data = (*data)[:0] + *data = (*data)[:cap(*data)] + + syncPool.Put(data) +} + +// get pointer to the byte slice +func (w *Process) get() *[]byte { + return syncPool.Get().(*[]byte) +} + +// Write appends the contents of pool to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of pool; errBuffer is always nil. +func (w *Process) watch() { + go func() { + for { + select { + case <-w.stop: + buf := w.get() + // read the last data + n, _ := w.rd.Read(*buf) + w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + return + default: + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(*buf) + w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + } + } + }() +} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go new file mode 100755 index 00000000..805f66b5 --- /dev/null +++ b/pkg/worker/worker_test.go @@ -0,0 +1,19 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_OnStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + assert.Nil(t, cmd.Start()) + + w, err := InitBaseWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} -- cgit v1.2.3