summaryrefslogtreecommitdiff
path: root/pkg/worker/sync_worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker/sync_worker.go
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
Diffstat (limited to 'pkg/worker/sync_worker.go')
-rwxr-xr-xpkg/worker/sync_worker.go111
1 files changed, 64 insertions, 47 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 8314c039..1a0393fb 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -8,50 +8,67 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
)
-type syncWorker struct {
- w worker.BaseProcess
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (*SyncWorkerImpl, error)
+
+type SyncWorkerImpl struct {
+ process *Process
}
// From creates SyncWorker from BaseProcess
-func From(w worker.BaseProcess) (worker.SyncWorker, error) {
- return &syncWorker{
- w: w,
- }, nil
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ }
+}
+
+// FromSync creates BaseProcess from SyncWorkerImpl
+func FromSync(w *SyncWorkerImpl) BaseProcess {
+ return &Process{
+ created: w.process.created,
+ events: w.process.events,
+ state: w.process.state,
+ cmd: w.process.cmd,
+ pid: w.process.pid,
+ stderr: w.process.stderr,
+ endState: w.process.endState,
+ relay: w.process.relay,
+ rd: w.process.rd,
+ }
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.w.State().Value() != internal.StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ if tw.process.State().Value() != internal.StateReady {
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
return rsp, nil
}
@@ -62,7 +79,7 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- if tw.w.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != internal.StateReady {
c <- wexec{
payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
c <- wexec{
payload: payload.Payload{},
@@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
c <- wexec{
payload: rsp,
@@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
}
-func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
@@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
frameR := frame.NewFrame()
- err = tw.w.Relay().Receive(frameR)
+ err = tw.process.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return pl, nil
}
-func (tw *syncWorker) String() string {
- return tw.w.String()
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
}
-func (tw *syncWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
}
-func (tw *syncWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
}
-func (tw *syncWorker) State() internal.State {
- return tw.w.State()
+func (tw *SyncWorkerImpl) State() internal.State {
+ return tw.process.State()
}
-func (tw *syncWorker) Start() error {
- return tw.w.Start()
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
}
-func (tw *syncWorker) Wait() error {
- return tw.w.Wait()
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
}
-func (tw *syncWorker) Stop() error {
- return tw.w.Stop()
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
}
-func (tw *syncWorker) Kill() error {
- return tw.w.Kill()
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
}
-func (tw *syncWorker) Relay() relay.Relay {
- return tw.w.Relay()
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
}
-func (tw *syncWorker) AttachRelay(rl relay.Relay) {
- tw.w.AttachRelay(rl)
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
}