diff options
author | Valery Piashchynski <[email protected]> | 2021-02-02 19:30:00 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-02-02 19:30:00 +0300 |
commit | 20a1a5d2eb26090e0eef0e6772330ee2a52526fa (patch) | |
tree | c282e18c20029f60a798576cb4fe47d2762ffba0 | |
parent | 36f01dc035f42115fcfd3b77dc5df3098382cd9f (diff) | |
parent | 2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd (diff) |
Merge pull request #522 from spiral/fix/named_loggerv2.0.0-beta.22
bug(logger): Incorrect parsing of nested log levels
-rwxr-xr-x | .gitignore | 11 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 5 | ||||
-rwxr-xr-x | internal/state.go | 46 | ||||
-rwxr-xr-x | internal/state_test.go | 21 | ||||
-rw-r--r-- | pkg/events/pool_events.go | 5 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 8 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 6 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 6 | ||||
-rw-r--r-- | pkg/states/worker_states.go | 31 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 5 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 10 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 10 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 5 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 17 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 23 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 4 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 | ||||
-rw-r--r-- | plugins/logger/config.go | 12 | ||||
-rw-r--r-- | plugins/logger/plugin.go | 2 | ||||
-rw-r--r-- | plugins/rpc/plugin.go | 31 | ||||
-rw-r--r-- | plugins/server/plugin.go | 2 | ||||
-rw-r--r-- | tests/worker.php | 34 |
23 files changed, 176 insertions, 128 deletions
@@ -9,6 +9,10 @@ # Test binary, built with `go test -c` *.test +unit_tests +unit_tests_copied +dir1 +coverage # Output of the go coverage tool, specifically when used with LiteIDE *.out @@ -18,12 +22,7 @@ .idea composer.lock vendor -vendor_php builds/ tests/vendor/ .rr-sample.yaml -unit_tests -unit_tests_copied -dir1 -coverage -rr
\ No newline at end of file +cmd
\ No newline at end of file @@ -19,7 +19,7 @@ require ( github.com/prometheus/client_golang v1.9.0 github.com/shirou/gopsutil v3.21.1+incompatible github.com/spf13/viper v1.7.1 - github.com/spiral/endure v1.0.0-beta21 + github.com/spiral/endure v1.0.0-beta.22 github.com/spiral/errors v1.0.9 github.com/spiral/goridge/v3 v3.0.0 github.com/stretchr/testify v1.7.0 @@ -406,11 +406,10 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k= -github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw= +github.com/spiral/endure v1.0.0-beta.22 h1:zOhrQ49DeYfr1rOHfUy573pjWpyxkG30vVz4o0ejvaQ= +github.com/spiral/endure v1.0.0-beta.22/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM= github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8= diff --git a/internal/state.go b/internal/state.go index a14a6937..d208aeed 100755 --- a/internal/state.go +++ b/internal/state.go @@ -3,6 +3,8 @@ package internal import ( "fmt" "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/states" ) // State represents WorkerProcess status and updated time. @@ -24,36 +26,6 @@ type State interface { LastUsed() uint64 } -const ( - // StateInactive - no associated process - StateInactive int64 = iota - - // StateReady - ready for job. - StateReady - - // StateWorking - working on given payload. - StateWorking - - // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. - StateInvalid - - // StateStopping - process is being softly stopped. - StateStopping - - StateKilling - - // State of worker, when no need to allocate new one - StateDestroyed - - // StateStopped - process has been terminated. - StateStopped - - // StateErrored - error WorkerState (can't be used). - StateErrored - - StateRemove -) - type WorkerState struct { value int64 numExecs uint64 @@ -69,17 +41,17 @@ func NewWorkerState(value int64) *WorkerState { // String returns current WorkerState as string. func (s *WorkerState) String() string { switch s.Value() { - case StateInactive: + case states.StateInactive: return "inactive" - case StateReady: + case states.StateReady: return "ready" - case StateWorking: + case states.StateWorking: return "working" - case StateInvalid: + case states.StateInvalid: return "invalid" - case StateStopped: + case states.StateStopped: return "stopped" - case StateErrored: + case states.StateErrored: return "errored" } @@ -99,7 +71,7 @@ func (s *WorkerState) Value() int64 { // IsActive returns true if WorkerProcess not Inactive or Stopped func (s *WorkerState) IsActive() bool { val := s.Value() - return val == StateWorking || val == StateReady + return val == states.StateWorking || val == states.StateReady } // change WorkerState value (status) diff --git a/internal/state_test.go b/internal/state_test.go index bdb05825..a0581d57 100755 --- a/internal/state_test.go +++ b/internal/state_test.go @@ -3,25 +3,26 @@ package internal import ( "testing" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/stretchr/testify/assert" ) func Test_NewState(t *testing.T) { - st := NewWorkerState(StateErrored) + st := NewWorkerState(states.StateErrored) assert.Equal(t, "errored", st.String()) - assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) - assert.Equal(t, "ready", NewWorkerState(StateReady).String()) - assert.Equal(t, "working", NewWorkerState(StateWorking).String()) - assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "inactive", NewWorkerState(states.StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(states.StateReady).String()) + assert.Equal(t, "working", NewWorkerState(states.StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(states.StateStopped).String()) assert.Equal(t, "undefined", NewWorkerState(1000).String()) } func Test_IsActive(t *testing.T) { - assert.False(t, NewWorkerState(StateInactive).IsActive()) - assert.True(t, NewWorkerState(StateReady).IsActive()) - assert.True(t, NewWorkerState(StateWorking).IsActive()) - assert.False(t, NewWorkerState(StateStopped).IsActive()) - assert.False(t, NewWorkerState(StateErrored).IsActive()) + assert.False(t, NewWorkerState(states.StateInactive).IsActive()) + assert.True(t, NewWorkerState(states.StateReady).IsActive()) + assert.True(t, NewWorkerState(states.StateWorking).IsActive()) + assert.False(t, NewWorkerState(states.StateStopped).IsActive()) + assert.False(t, NewWorkerState(states.StateErrored).IsActive()) } diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index 3925df56..c569a5a8 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -28,6 +28,9 @@ const ( // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart ) type P int64 @@ -52,6 +55,8 @@ func (ev P) String() string { return "EventIdleTTL" case EventExecTTL: return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" } return "Unknown event type" } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 44adf9c0..01b0574d 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" @@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p func (sp *StaticPool) stopWorker(w worker.SyncWorker) { const op = errors.Op("static_pool_stop_worker") - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) err := w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -251,7 +251,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -263,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return payload.Payload{}, errors.E(op, err) } - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index a32790e0..2d2b2b7d 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,9 +12,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/stretchr/testify/assert" ) @@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { list := p.Workers() for _, w := range list { - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) } } @@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) for i := range p.Workers() { - p.Workers()[i].State().Set(internal.StateErrored) + p.Workers()[i].State().Set(states.StateErrored) } _, err = p.Exec(payload.Payload{Body: []byte("hello")}) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 2597b352..3347ecd4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -144,7 +144,7 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == internal.StateInvalid { + if workers[i].State().Value() == states.StateInvalid { continue } @@ -177,7 +177,7 @@ func (sp *supervised) control() { // firs we check maxWorker idle if sp.cfg.IdleTTL != 0 { // then check for the worker state - if workers[i].State().Value() != internal.StateReady { + if workers[i].State().Value() != states.StateReady { continue } diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go new file mode 100644 index 00000000..22fdfe8a --- /dev/null +++ b/pkg/states/worker_states.go @@ -0,0 +1,31 @@ +package states + +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error WorkerState (can't be used). + StateErrored + + StateRemove +) diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index dd7c5841..929043a7 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -92,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } // everything ok, set ready state - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) // return worker c <- SpawnResult{ @@ -152,7 +153,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // everything ok, set ready state - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) return w, nil } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index d4949c82..e247324c 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -8,9 +8,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -21,13 +21,13 @@ func Test_GetState2(t *testing.T) { w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) + assert.Equal(t, states.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) assert.NoError(t, w.Stop()) } @@ -40,13 +40,13 @@ func Test_Kill2(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) + assert.Equal(t, states.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 38166b85..b23af19f 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -9,9 +9,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -23,13 +23,13 @@ func Test_GetState(t *testing.T) { w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) + assert.Equal(t, states.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -46,13 +46,13 @@ func Test_Kill(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) + assert.Equal(t, states.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index ccd2b0bf..98bd2389 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" @@ -124,7 +125,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } w.AttachRelay(rl) - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) c <- socketSpawn{ w: w, @@ -167,7 +168,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) return w, nil } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1a0393fb..696fbdb7 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -10,6 +10,7 @@ import ( "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() c <- wexec{ diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8fd71cca..2f1f399d 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -15,6 +15,7 @@ import ( "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(internal.StateInactive), + state: internal.NewWorkerState(states.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), // sync pool for STDERR @@ -190,7 +191,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") err := multierr.Combine(w.cmd.Wait()) - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { return errors.E(op, err) } @@ -199,7 +200,7 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) @@ -215,12 +216,12 @@ func (w *Process) Wait() error { err = multierr.Append(err, w.closeRelay()) if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) return err } if w.endState.Success() { - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) } w.stderr.Reset() @@ -241,20 +242,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(internal.StateStopping) + w.state.Set(states.StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) return multierr.Append(err, w.cmd.Process.Kill()) } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -262,12 +263,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index d76f4d8f..55034e41 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -129,7 +129,7 @@ func (stack *Stack) Destroy(ctx context.Context) { stack.mutex.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(internal.StateDestroyed) + stack.workers[i].State().Set(states.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 753b61ee..cf2e1eb7 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,8 +5,8 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -49,7 +49,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == internal.StateRemove { + if w != nil && w.State().Value() == states.StateRemove { err := ww.RemoveWorker(w) if err != nil { return nil, err @@ -102,7 +102,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(internal.StateRemove) + wb.State().Set(states.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -142,7 +142,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return diff --git a/plugins/logger/config.go b/plugins/logger/config.go index 8cc88d02..bf7343c7 100644 --- a/plugins/logger/config.go +++ b/plugins/logger/config.go @@ -10,26 +10,26 @@ import ( // ChannelConfig configures loggers per channel. type ChannelConfig struct { // Dedicated channels per logger. By default logger allocated via named logger. - Channels map[string]Config `json:"channels" mapstructure:"channels"` + Channels map[string]Config `mapstructure:"channels"` } type Config struct { // Mode configures logger based on some default template (development, production, off). - Mode string `json:"mode" mapstructure:"mode"` + Mode string `mapstructure:"mode"` // Level is the minimum enabled logging level. Note that this is a dynamic // level, so calling ChannelConfig.Level.SetLevel will atomically change the log // level of all loggers descended from this config. - Level string `json:"level" mapstructure:"level"` + Level string `mapstructure:"level"` // Encoding sets the logger's encoding. Valid values are "json" and // "console", as well as any third-party encodings registered via // RegisterEncoder. - Encoding string `json:"encoding" mapstructure:"encoding"` + Encoding string `mapstructure:"encoding"` // Output is a list of URLs or file paths to write logging output to. // See Open for details. - Output []string `json:"output" mapstructure:"output"` + Output []string `mapstructure:"output"` // ErrorOutput is a list of URLs to write internal logger errors to. // The default is standard error. @@ -37,7 +37,7 @@ type Config struct { // Note that this setting only affects internal errors; for sample code that // sends error-level logs to a different location from info- and debug-level // logs, see the package-level AdvancedConfiguration example. - ErrorOutput []string `json:"errorOutput" mapstructure:"errorOutput"` + ErrorOutput []string `mapstructure:"errorOutput"` } // ZapConfig converts config into Zap configuration. diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go index 141ede64..7fc464b6 100644 --- a/plugins/logger/plugin.go +++ b/plugins/logger/plugin.go @@ -53,7 +53,7 @@ func (z *ZapLogger) NamedLogger(name string) (Logger, error) { if err != nil { return nil, err } - return NewZapAdapter(l), nil + return NewZapAdapter(l.Named(name)), nil } return NewZapAdapter(z.base.Named(name)), nil diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index e13768f0..94fec0b6 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -15,18 +15,13 @@ import ( // PluginName contains default plugin name. const PluginName = "RPC" -type pluggable struct { - service RPCer - name string -} - // Plugin is RPC service. type Plugin struct { cfg Config log logger.Logger rpc *rpc.Server // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC - plugins []pluggable + plugins map[string]RPCer listener net.Listener closed *uint32 } @@ -42,14 +37,23 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, errors.Disabled, err) } + // Init defaults s.cfg.InitDefaults() - + // Init pluggable plugins map + s.plugins = make(map[string]RPCer) + // init logs s.log = log + // set up state state := uint32(0) s.closed = &state atomic.StoreUint32(s.closed, 0) - return s.cfg.Valid() + // validate config + err = s.cfg.Valid() + if err != nil { + return errors.E(op, err) + } + return nil } // Serve serves the service. @@ -62,14 +66,14 @@ func (s *Plugin) Serve() chan error { services := make([]string, 0, len(s.plugins)) // Attach all services - for i := 0; i < len(s.plugins); i++ { - err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC()) + for name := range s.plugins { + err := s.Register(name, s.plugins[name].RPC()) if err != nil { errCh <- errors.E(op, err) return errCh } - services = append(services, s.plugins[i].name) + services = append(services, name) } var err error @@ -128,10 +132,7 @@ func (s *Plugin) Collects() []interface{} { // RegisterPlugin registers RPC service plugin. func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) { - s.plugins = append(s.plugins, pluggable{ - service: p, - name: name.Name(), - }) + s.plugins[name.Name()] = p } // Register publishes in the server the set of methods of the diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 4fad82d1..73ce71f7 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -226,6 +226,8 @@ func (server *Plugin) collectPoolLogs(event interface{}) { server.log.Warn("worker exec timeout reached", "error", we.Payload.(error).Error()) case events.EventIdleTTL: server.log.Warn("worker idle timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid()) + case events.EventPoolRestart: + server.log.Warn("requested pool restart") } } diff --git a/tests/worker.php b/tests/worker.php new file mode 100644 index 00000000..5c9c80e6 --- /dev/null +++ b/tests/worker.php @@ -0,0 +1,34 @@ +<?php + +declare(strict_types=1); + +require __DIR__ . '/vendor/autoload.php'; + +/** + * @param string $dir + * @return array<string> + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { + $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { + $class = 'Temporal\\Tests\\Activity\\' . $name; + $worker->registerActivityImplementations(new $class); +} + +$factory->run(); |