Diffstat (limited to 'worker/sync_worker.go')
-rwxr-xr-x  worker/sync_worker.go  283
1 file changed, 283 insertions(+), 0 deletions(-)
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
new file mode 100755
index 00000000..deea8cb1
--- /dev/null
+++ b/worker/sync_worker.go
@@ -0,0 +1,283 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/pkg/frame"
+ "github.com/spiral/goridge/v3/pkg/relay"
+ "github.com/spiral/roadrunner/v2/payload"
+ "go.uber.org/multierr"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (SyncWorker, error)
+
+type SyncWorkerImpl struct {
+ process *Process
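+ // fPool and bPool recycle frames and byte buffers across Exec calls,
+ // cutting per-request allocations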
+ fPool sync.Pool
+ bPool sync.Pool
+}
+
+// From creates a SyncWorkerImpl wrapping the given Process
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ fPool: sync.Pool{New: func() interface{} {
+ return frame.NewFrame()
+ }},
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ }
+}
+
+// Exec executes the payload without a TTL timeout.
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec")
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload cannot be empty"))
+ }
+
+ if tw.process.State().Value() != StateReady {
+ return nil, errors.E(op, errors.Errorf("process is not ready (%s)", tw.process.State().String()))
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // SoftJob errors are returned to the caller as-is; any other error
+ // marks the worker as errored
+ if !errors.Is(errors.SoftJob, err) {
+ tw.process.State().Set(StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ return nil, errors.E(op, err)
+ }
+
+ // supervisor may set state of the worker during the work
+ // in this case we should not re-write the worker state
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ return rsp, nil
+ }
+
+ tw.process.State().Set(StateReady)
+ tw.process.State().RegisterExec()
+
+ return rsp, nil
+}
+
+type wexec struct {
+ payload *payload.Payload
+ err error
+}
+
+// ExecWithTTL executes the payload, bounded by the deadline carried in ctx (the exec TTL).
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_with_ttl")
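+ // buffered with capacity 1 so the goroutine below can always deliver
+ // its result and exit, even if the caller has already returned
+ // because ctx expired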
+ c := make(chan wexec, 1)
+
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ err: errors.E(op, errors.Str("payload cannot be empty")),
+ }
+ return
+ }
+
+ if tw.process.State().Value() != StateReady {
+ c <- wexec{
+ err: errors.E(op, errors.Errorf("process is not ready (%s)", tw.process.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // SoftJob errors are returned to the caller as-is; any other error
+ // marks the worker as errored
+ if !errors.Is(errors.SoftJob, err) {
+ tw.process.State().Set(StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ c <- wexec{
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ return
+ }
+
+ tw.process.State().Set(StateReady)
+ tw.process.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
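+ // block until either the context deadline passes (exec TTL exceeded)
+ // or the goroutine above delivers a result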
+ select {
+ // exec TTL reached
+ case <-ctx.Done():
+ err := tw.Kill()
+ if err != nil {
+ // append timeout error
+ err = multierr.Append(err, errors.E(op, errors.ExecTTL))
+ return nil, multierr.Append(err, ctx.Err())
+ }
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return nil, res.err
+ }
+ return res.payload, nil
+ }
+}
+
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_payload")
+
+ // get a frame
+ fr := tw.getFrame()
+ defer tw.putFrame(fr)
+
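+ // wire format assembled below: the header carries the frame version,
+ // options[0] holds the context length (the body offset inside the
+ // payload), the payload itself is context followed by body, and a CRC
+ // protects the header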
+ // the frame comes from the pool after Reset, so the version byte must
+ // be (re)written for every request
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
+
+ // obtain a buffer
+ buf := tw.get()
+
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // options[0]: context length, i.e. where the body starts in the payload
+ fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
+ fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
+
+ fr.WriteCRC(fr.Header())
+
+ // return buffer
+ tw.put(buf)
+
+ err := tw.Relay().Send(fr)
+ if err != nil {
+ return nil, errors.E(op, errors.Network, err)
+ }
+
+ frameR := tw.getFrame()
+ defer tw.putFrame(frameR)
+
+ err = tw.process.Relay().Receive(frameR)
+ if err != nil {
+ return nil, errors.E(op, errors.Network, err)
+ }
+ if frameR == nil {
+ return nil, errors.E(op, errors.Network, errors.Str("nil frame received"))
+ }
+
+ if !frameR.VerifyCRC(frameR.Header()) {
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&frame.ERROR != 0 {
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ }
+
+ options := frameR.ReadOptions(frameR.Header())
+ if len(options) != 1 {
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be exactly 1 (the body offset)"))
+ }
+
+ pld := &payload.Payload{
+ Body: make([]byte, len(frameR.Payload()[options[0]:])),
+ Context: make([]byte, len(frameR.Payload()[:options[0]])),
+ }
+
+ // by copying we free frame's payload slice
+ // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
+ // https://blog.golang.org/slices-intro#TOC_6.
+ copy(pld.Body, frameR.Payload()[options[0]:])
+ copy(pld.Context, frameR.Payload()[:options[0]])
+
+ return pld, nil
+}
+
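+ // The methods below delegate to the wrapped process, exposing its
+ // lifecycle and relay API on the sync worker.
+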
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
+}
+
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
+}
+
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
+}
+
+func (tw *SyncWorkerImpl) State() State {
+ return tw.process.State()
+}
+
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
+}
+
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
+}
+
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
+}
+
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
+}
+
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
+}
+
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
+}
+
+// Private
+
+func (tw *SyncWorkerImpl) get() *bytes.Buffer {
+ return tw.bPool.Get().(*bytes.Buffer)
+}
+
+func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
+ b.Reset()
+ tw.bPool.Put(b)
+}
+
+func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
+ return tw.fPool.Get().(*frame.Frame)
+}
+
+func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
+ f.Reset()
+ tw.fPool.Put(f)
+}
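
For reference, a minimal usage sketch for the API added above. This is an illustration only: the alloc variable, the payload contents, and the import paths for the worker and payload packages are assumptions based on the repository layout, not something this diff defines (the SyncWorker interface returned by Allocator is declared elsewhere in the worker package).

	package main

	import (
		"context"
		"log"
		"time"

		"github.com/spiral/roadrunner/v2/payload"
		"github.com/spiral/roadrunner/v2/worker"
	)

	// run is a sketch; alloc is assumed to be a worker.Allocator obtained
	// from pool configuration elsewhere.
	func run(alloc worker.Allocator) {
		w, err := alloc()
		if err != nil {
			log.Fatal(err)
		}

		p := &payload.Payload{
			Context: []byte(`{"example":"context"}`), // hypothetical context
			Body:    []byte("request body"),
		}

		// plain execution: no deadline
		rsp, err := w.Exec(p)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("body: %s", rsp.Body)

		// execution bounded by a 5s TTL; on timeout the worker is killed
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if rsp, err = w.ExecWithTTL(ctx, p); err == nil {
			log.Printf("body: %s", rsp.Body)
		}
	}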