summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go244
1 files changed, 0 insertions, 244 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
deleted file mode 100755
index 1a0393fb..00000000
--- a/pkg/worker/sync_worker.go
+++ /dev/null
@@ -1,244 +0,0 @@
-package worker
-
-import (
- "bytes"
- "context"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "go.uber.org/multierr"
-)
-
-// Allocator is responsible for worker allocation in the pool
-type Allocator func() (*SyncWorkerImpl, error)
-
-type SyncWorkerImpl struct {
- process *Process
-}
-
-// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
- return &SyncWorkerImpl{
- process: process,
- }
-}
-
-// FromSync creates BaseProcess from SyncWorkerImpl
-func FromSync(w *SyncWorkerImpl) BaseProcess {
- return &Process{
- created: w.process.created,
- events: w.process.events,
- state: w.process.state,
- cmd: w.process.cmd,
- pid: w.process.pid,
- stderr: w.process.stderr,
- endState: w.process.endState,
- relay: w.process.relay,
- rd: w.process.rd,
- }
-}
-
-// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("sync_worker_exec")
- if len(p.Body) == 0 && len(p.Context) == 0 {
- return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
- }
-
- if tw.process.State().Value() != internal.StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
- tw.process.State().RegisterExec()
- }
- return payload.Payload{}, err
- }
-
- tw.process.State().Set(internal.StateReady)
- tw.process.State().RegisterExec()
-
- return rsp, nil
-}
-
-type wexec struct {
- payload payload.Payload
- err error
-}
-
-// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_worker_with_timeout")
- c := make(chan wexec, 1)
-
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != internal.StateReady {
- c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
- tw.process.State().RegisterExec()
- }
- c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, err),
- }
- return
- }
-
- tw.process.State().Set(internal.StateReady)
- tw.process.State().RegisterExec()
-
- c <- wexec{
- payload: rsp,
- err: nil,
- }
- }()
-
- select {
- // exec TTL reached
- case <-ctx.Done():
- err := multierr.Combine(tw.Kill())
- if err != nil {
- // append timeout error
- err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return payload.Payload{}, multierr.Append(err, ctx.Err())
- }
- return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
- return res.payload, nil
- }
-}
-
-func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_payload")
-
- fr := frame.NewFrame()
- fr.WriteVersion(frame.VERSION_1)
- // can be 0 here
-
- buf := new(bytes.Buffer)
- buf.Write(p.Context)
- buf.Write(p.Body)
-
- // Context offset
- fr.WriteOptions(uint32(len(p.Context)))
- fr.WritePayloadLen(uint32(buf.Len()))
- fr.WritePayload(buf.Bytes())
-
- fr.WriteCRC()
-
- // empty and free the buffer
- buf.Truncate(0)
-
- err := tw.Relay().Send(fr)
- if err != nil {
- return payload.Payload{}, err
- }
-
- frameR := frame.NewFrame()
-
- err = tw.process.Relay().Receive(frameR)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
- }
- if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
- }
-
- if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
- }
-
- flags := frameR.ReadFlags()
-
- if flags&byte(frame.ERROR) != byte(0) {
- return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
- }
-
- options := frameR.ReadOptions()
- if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
- }
-
- pl := payload.Payload{}
- pl.Context = frameR.Payload()[:options[0]]
- pl.Body = frameR.Payload()[options[0]:]
-
- return pl, nil
-}
-
-func (tw *SyncWorkerImpl) String() string {
- return tw.process.String()
-}
-
-func (tw *SyncWorkerImpl) Pid() int64 {
- return tw.process.Pid()
-}
-
-func (tw *SyncWorkerImpl) Created() time.Time {
- return tw.process.Created()
-}
-
-func (tw *SyncWorkerImpl) State() internal.State {
- return tw.process.State()
-}
-
-func (tw *SyncWorkerImpl) Start() error {
- return tw.process.Start()
-}
-
-func (tw *SyncWorkerImpl) Wait() error {
- return tw.process.Wait()
-}
-
-func (tw *SyncWorkerImpl) Stop() error {
- return tw.process.Stop()
-}
-
-func (tw *SyncWorkerImpl) Kill() error {
- return tw.process.Kill()
-}
-
-func (tw *SyncWorkerImpl) Relay() relay.Relay {
- return tw.process.Relay()
-}
-
-func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
- tw.process.AttachRelay(rl)
-}