diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
commit | f3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch) | |
tree | 32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /worker | |
parent | 5d2cd55ab522d4f1e65a833f91146444465a32ac (diff) |
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rw-r--r-- | worker/interface.go | 74 | ||||
-rwxr-xr-x | worker/state.go | 111 | ||||
-rwxr-xr-x | worker/state_test.go | 27 | ||||
-rwxr-xr-x | worker/sync_worker.go | 283 | ||||
-rwxr-xr-x | worker/sync_worker_test.go | 33 | ||||
-rwxr-xr-x | worker/worker.go | 220 | ||||
-rwxr-xr-x | worker/worker_test.go | 19 |
7 files changed, 767 insertions, 0 deletions
diff --git a/worker/interface.go b/worker/interface.go new file mode 100644 index 00000000..25e98f0a --- /dev/null +++ b/worker/interface.go @@ -0,0 +1,74 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/roadrunner/v2/payload" +) + +// State represents WorkerProcess status and updated time. +type State interface { + fmt.Stringer + // Value returns StateImpl value + Value() int64 + // Set sets the StateImpl + Set(value int64) + // NumExecs shows how many times WorkerProcess was invoked + NumExecs() uint64 + // IsActive returns true if WorkerProcess not Inactive or Stopped + IsActive() bool + // RegisterExec using to registering php executions + RegisterExec() + // SetLastUsed sets worker last used time + SetLastUsed(lu uint64) + // LastUsed return worker last used time + LastUsed() uint64 +} + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop() error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() relay.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl relay.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs *payload.Payload) (*payload.Payload, error) + // ExecWithTTL used to handle Exec with TTL + ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) +} diff --git a/worker/state.go b/worker/state.go new file mode 100755 index 00000000..bf152e8b --- /dev/null +++ b/worker/state.go @@ -0,0 +1,111 @@ +package worker + +import ( + "sync/atomic" +) + +// SYNC WITH worker_watcher.GET +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + // StateKilling - process is being forcibly stopped + StateKilling + + // StateDestroyed State of worker, when no need to allocate new one + StateDestroyed + + // StateMaxJobsReached State of worker, when it reached executions limit + StateMaxJobsReached + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error StateImpl (can't be used). + StateErrored +) + +type StateImpl struct { + value int64 + numExecs uint64 + // to be lightweight, use UnixNano + lastUsed uint64 +} + +// NewWorkerState initializes a state for the sync.Worker +func NewWorkerState(value int64) *StateImpl { + return &StateImpl{value: value} +} + +// String returns current StateImpl as string. +func (s *StateImpl) String() string { + switch s.Value() { + case StateInactive: + return "inactive" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateInvalid: + return "invalid" + case StateStopping: + return "stopping" + case StateStopped: + return "stopped" + case StateKilling: + return "killing" + case StateErrored: + return "errored" + case StateDestroyed: + return "destroyed" + } + + return "undefined" +} + +// NumExecs returns number of registered WorkerProcess execs. +func (s *StateImpl) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) +} + +// Value StateImpl returns StateImpl value +func (s *StateImpl) Value() int64 { + return atomic.LoadInt64(&s.value) +} + +// IsActive returns true if WorkerProcess not Inactive or Stopped +func (s *StateImpl) IsActive() bool { + val := s.Value() + return val == StateWorking || val == StateReady +} + +// Set change StateImpl value (status) +func (s *StateImpl) Set(value int64) { + atomic.StoreInt64(&s.value, value) +} + +// RegisterExec register new execution atomically +func (s *StateImpl) RegisterExec() { + atomic.AddUint64(&s.numExecs, 1) +} + +// SetLastUsed Update last used time +func (s *StateImpl) SetLastUsed(lu uint64) { + atomic.StoreUint64(&s.lastUsed, lu) +} + +func (s *StateImpl) LastUsed() uint64 { + return atomic.LoadUint64(&s.lastUsed) +} diff --git a/worker/state_test.go b/worker/state_test.go new file mode 100755 index 00000000..c67182d6 --- /dev/null +++ b/worker/state_test.go @@ -0,0 +1,27 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} diff --git a/worker/sync_worker.go b/worker/sync_worker.go new file mode 100755 index 00000000..deea8cb1 --- /dev/null +++ b/worker/sync_worker.go @@ -0,0 +1,283 @@ +package worker + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/frame" + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/roadrunner/v2/payload" + "go.uber.org/multierr" +) + +// Allocator is responsible for worker allocation in the pool +type Allocator func() (SyncWorker, error) + +type SyncWorkerImpl struct { + process *Process + fPool sync.Pool + bPool sync.Pool +} + +// From creates SyncWorker from BaseProcess +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + fPool: sync.Pool{New: func() interface{} { + return frame.NewFrame() + }}, + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + } +} + +// Exec payload without TTL timeout. +func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("sync_worker_exec") + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + + if tw.process.State().Value() != StateReady { + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + } + + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if !errors.Is(errors.SoftJob, err) { + tw.process.State().Set(StateErrored) + tw.process.State().RegisterExec() + } + return nil, errors.E(op, err) + } + + // supervisor may set state of the worker during the work + // in this case we should not re-write the worker state + if tw.process.State().Value() != StateWorking { + tw.process.State().RegisterExec() + return rsp, nil + } + + tw.process.State().Set(StateReady) + tw.process.State().RegisterExec() + + return rsp, nil +} + +type wexec struct { + payload *payload.Payload + err error +} + +// ExecWithTTL executes payload without TTL timeout. +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("sync_worker_exec_worker_with_timeout") + c := make(chan wexec, 1) + + go func() { + if len(p.Body) == 0 && len(p.Context) == 0 { + c <- wexec{ + err: errors.E(op, errors.Str("payload can not be empty")), + } + return + } + + if tw.process.State().Value() != StateReady { + c <- wexec{ + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), + } + return + } + + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple + tw.process.State().Set(StateErrored) + tw.process.State().RegisterExec() + } + c <- wexec{ + err: errors.E(op, err), + } + return + } + + if tw.process.State().Value() != StateWorking { + tw.process.State().RegisterExec() + c <- wexec{ + payload: rsp, + err: nil, + } + return + } + + tw.process.State().Set(StateReady) + tw.process.State().RegisterExec() + + c <- wexec{ + payload: rsp, + err: nil, + } + }() + + select { + // exec TTL reached + case <-ctx.Done(): + err := multierr.Combine(tw.Kill()) + if err != nil { + // append timeout error + err = multierr.Append(err, errors.E(op, errors.ExecTTL)) + return nil, multierr.Append(err, ctx.Err()) + } + return nil, errors.E(op, errors.ExecTTL, ctx.Err()) + case res := <-c: + if res.err != nil { + return nil, res.err + } + return res.payload, nil + } +} + +func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("sync_worker_exec_payload") + + // get a frame + fr := tw.getFrame() + defer tw.putFrame(fr) + + // can be 0 here + fr.WriteVersion(fr.Header(), frame.VERSION_1) + + // obtain a buffer + buf := tw.get() + + buf.Write(p.Context) + buf.Write(p.Body) + + // Context offset + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) + + fr.WriteCRC(fr.Header()) + + // return buffer + tw.put(buf) + + err := tw.Relay().Send(fr) + if err != nil { + return nil, errors.E(op, errors.Network, err) + } + + frameR := tw.getFrame() + defer tw.putFrame(frameR) + + err = tw.process.Relay().Receive(frameR) + if err != nil { + return nil, errors.E(op, errors.Network, err) + } + if frameR == nil { + return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) + } + + if !frameR.VerifyCRC(frameR.Header()) { + return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) + } + + flags := frameR.ReadFlags() + + if flags&frame.ERROR != byte(0) { + return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) + } + + options := frameR.ReadOptions(frameR.Header()) + if len(options) != 1 { + return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) + } + + pld := &payload.Payload{ + Body: make([]byte, len(frameR.Payload()[options[0]:])), + Context: make([]byte, len(frameR.Payload()[:options[0]])), + } + + // by copying we free frame's payload slice + // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) + // https://blog.golang.org/slices-intro#TOC_6. + copy(pld.Body, frameR.Payload()[options[0]:]) + copy(pld.Context, frameR.Payload()[:options[0]]) + + return pld, nil +} + +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() +} + +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() +} + +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() +} + +func (tw *SyncWorkerImpl) State() State { + return tw.process.State() +} + +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() +} + +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() +} + +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() +} + +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() +} + +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() +} + +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) +} + +// Private + +func (tw *SyncWorkerImpl) get() *bytes.Buffer { + return tw.bPool.Get().(*bytes.Buffer) +} + +func (tw *SyncWorkerImpl) put(b *bytes.Buffer) { + b.Reset() + tw.bPool.Put(b) +} + +func (tw *SyncWorkerImpl) getFrame() *frame.Frame { + return tw.fPool.Get().(*frame.Frame) +} + +func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { + f.Reset() + tw.fPool.Put(f) +} diff --git a/worker/sync_worker_test.go b/worker/sync_worker_test.go new file mode 100755 index 00000000..41c0c92b --- /dev/null +++ b/worker/sync_worker_test.go @@ -0,0 +1,33 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/spiral/roadrunner/v2/payload" + "github.com/stretchr/testify/assert" +) + +func Test_NotStarted_String(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "inactive") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_NotStarted_Exec(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + + sw := From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "Process is not ready (inactive)") +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100755 index 00000000..38a1e9ac --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,220 @@ +package worker + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/internal" + "go.uber.org/multierr" +) + +type Options func(p *Process) + +// Process - supervised process with api over goridge.Relay. +type Process struct { + // created indicates at what time Process has been created. + created time.Time + + // updates parent supervisor or pool about Process events + events events.Handler + + // state holds information about current Process state, + // number of Process executions, buf status change time. + // publicly this object is receive-only and protected using Mutex + // and atomic counter. + state *StateImpl + + // underlying command with associated process, command must be + // provided to Process from outside in non-started form. CmdSource + // stdErr direction will be handled by Process to aggregate error message. + cmd *exec.Cmd + + // pid of the process, points to pid of underlying process and + // can be nil while process is not started. + pid int + + // communication bus with underlying process. + relay relay.Relay +} + +// InitBaseWorker creates new Process over given exec.cmd. +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + w := &Process{ + created: time.Now(), + events: events.NewEventsHandler(), + cmd: cmd, + state: NewWorkerState(StateInactive), + } + + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w + + // add options + for i := 0; i < len(options); i++ { + options[i](w) + } + + return w, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *Process) { + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// Pid returns worker pid. +func (w *Process) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. +func (w *Process) Created() time.Time { + return w.created +} + +// AddListener registers new worker event listener. +func (w *Process) addListener(listener events.Listener) { + w.events.AddListener(listener) +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) State() State { + return w.state +} + +// AttachRelay attaches relay to the worker +func (w *Process) AttachRelay(rl relay.Relay) { + w.relay = rl +} + +// Relay returns relay attached to the worker +func (w *Process) Relay() relay.Relay { + return w.relay +} + +// String returns Process description. fmt.Stringer interface +func (w *Process) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) + } + + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + st, + w.state.NumExecs(), + ) +} + +func (w *Process) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + w.pid = w.cmd.Process.Pid + return nil +} + +// Wait must be called once for each Process, call will be released once Process is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or Start the script. +func (w *Process) Wait() error { + const op = errors.Op("process_wait") + var err error + err = w.cmd.Wait() + + // If worker was destroyed, just exit + if w.State().Value() == StateDestroyed { + return nil + } + + // If state is different, and err is not nil, append it to the errors + if err != nil { + w.State().Set(StateErrored) + err = multierr.Combine(err, errors.E(op, err)) + } + + // closeRelay + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + err2 := w.closeRelay() + if err2 != nil { + w.State().Set(StateErrored) + return multierr.Append(err, errors.E(op, err2)) + } + + if w.cmd.ProcessState.Success() { + w.State().Set(StateStopped) + return nil + } + + return err +} + +func (w *Process) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } + } + return nil +} + +// Stop sends soft termination command to the Process and waits for process completion. +func (w *Process) Stop() error { + const op = errors.Op("process_stop") + w.state.Set(StateStopping) + err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) + if err != nil { + w.state.Set(StateKilling) + _ = w.cmd.Process.Signal(os.Kill) + return errors.E(op, errors.Network, err) + } + w.state.Set(StateStopped) + return nil +} + +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not wait for process completion! +func (w *Process) Kill() error { + if w.State().Value() == StateDestroyed { + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + return nil + } + + w.state.Set(StateKilling) + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + w.state.Set(StateStopped) + return nil +} + +// Worker stderr +func (w *Process) Write(p []byte) (n int, err error) { + w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + return len(p), nil +} diff --git a/worker/worker_test.go b/worker/worker_test.go new file mode 100755 index 00000000..805f66b5 --- /dev/null +++ b/worker/worker_test.go @@ -0,0 +1,19 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_OnStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + assert.Nil(t, cmd.Start()) + + w, err := InitBaseWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} |