summaryrefslogtreecommitdiff
path: root/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker/worker.go')
-rwxr-xr-xworker/worker.go51
1 files changed, 26 insertions, 25 deletions
diff --git a/worker/worker.go b/worker/worker.go
index b2689c59..8ca55a3b 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -10,26 +10,18 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
+ "go.uber.org/zap"
)
type Options func(p *Process)
-const (
- workerEventsName string = "worker"
-)
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
-
- // updates parent supervisor or pool about Process events
- events events.EventBus
- eventsID string
+ log *zap.Logger
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -57,27 +49,39 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return nil, fmt.Errorf("can't attach to running process")
}
- eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: eb,
- eventsID: id,
- cmd: cmd,
- state: NewWorkerState(StateInactive),
- doneCh: make(chan struct{}, 1),
+ created: time.Now(),
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
+ doneCh: make(chan struct{}, 1),
}
- // set self as stderr implementation (Writer interface)
- w.cmd.Stderr = w
-
// add options
for i := 0; i < len(options); i++ {
options[i](w)
}
+ if w.log == nil {
+ z, err := zap.NewDevelopment()
+ if err != nil {
+ return nil, err
+ }
+
+ w.log = z
+ }
+
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
+
return w, nil
}
+func WithLog(z *zap.Logger) Options {
+ return func(p *Process) {
+ p.log = z
+ }
+}
+
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -137,7 +141,6 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
var err error
err = w.cmd.Wait()
- defer w.events.Unsubscribe(w.eventsID)
w.doneCh <- struct{}{}
// If worker was destroyed, just exit
@@ -182,7 +185,6 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
const op = errors.Op("process_stop")
- defer w.events.Unsubscribe(w.eventsID)
select {
// finished
@@ -213,7 +215,6 @@ func (w *Process) Kill() error {
return err
}
- w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -223,12 +224,12 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
- w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
+ // unsafe to use utils.AsString
+ w.log.Info(string(p))
return len(p), nil
}