1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
package roadrunner
import (
"bytes"
"encoding/json"
"fmt"
"github.com/spiral/goridge"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// Worker - supervised process with api over goridge.Relay.
type Worker struct {
// State current worker state.
State State
// Last time worker State has changed
Last time.Time
// NumExecutions how many times worker have been invoked.
NumExecutions uint64
// Pid contains process ID and empty until worker is started.
Pid *int
cmd *exec.Cmd // underlying command process
err *bytes.Buffer // aggregates stderr
rl goridge.Relay // communication bus with underlying process
mu sync.RWMutex // ensures than only one execution can be run at once
}
// NewWorker creates new worker
func NewWorker(cmd *exec.Cmd) (*Worker, error) {
w := &Worker{
cmd: cmd,
err: bytes.NewBuffer(nil),
State: StateInactive,
}
if w.cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
return w, nil
}
// String returns worker description.
func (w *Worker) String() string {
state := w.State.String()
if w.Pid != nil {
state = state + ", pid:" + strconv.Itoa(*w.Pid)
}
return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
}
// Start underlying process or return error
func (w *Worker) Start() error {
stderr, err := w.cmd.StderrPipe()
if err != nil {
w.setState(StateError)
return err
}
// copying all process errors into buffer space
go io.Copy(w.err, stderr)
if err := w.cmd.Start(); err != nil {
w.setState(StateError)
return w.mockError(err)
}
w.setState(StateReady)
return nil
}
// Execute command and return result and result context.
func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.State != StateReady {
return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
}
w.setState(StateReady)
atomic.AddUint64(&w.NumExecutions, 1)
if ctx != nil {
if data, err := json.Marshal(ctx); err == nil {
w.rl.Send(data, goridge.PayloadControl)
} else {
return nil, nil, fmt.Errorf("invalid context: %s", err)
}
} else {
w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
}
w.rl.Send(body, 0)
rCtx, p, err := w.rl.Receive()
if !p.HasFlag(goridge.PayloadControl) {
return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
}
if p.HasFlag(goridge.PayloadError) {
w.setState(StateReady)
return nil, nil, JobError(rCtx)
}
if resp, p, err = w.rl.Receive(); err != nil {
w.setState(StateError)
return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
}
w.setState(StateReady)
return resp, rCtx, nil
}
// Stop underlying process or return error.
func (w *Worker) Stop() {
w.mu.Lock()
defer w.mu.Unlock()
w.setState(StateInactive)
go func() {
sendCommand(w.rl, &TerminateCommand{Terminate: true})
}()
w.cmd.Wait()
w.rl.Close()
w.setState(StateStopped)
}
// attach payload/control relay to the worker.
func (w *Worker) attach(rl goridge.Relay) {
w.mu.Lock()
defer w.mu.Unlock()
w.rl = rl
w.setState(StateBooting)
}
// sets worker State and it's context (non blocking!).
func (w *Worker) setState(state State) {
// safer?
w.State = state
w.Last = time.Now()
}
// mockError attaches worker specific error (from stderr) to parent error
func (w *Worker) mockError(err error) WorkerError {
if w.err.Len() != 0 {
return WorkerError(w.err.String())
}
return WorkerError(err.Error())
}
|