summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-02 19:30:00 +0300
committerGitHub <[email protected]>2021-02-02 19:30:00 +0300
commit20a1a5d2eb26090e0eef0e6772330ee2a52526fa (patch)
treec282e18c20029f60a798576cb4fe47d2762ffba0
parent36f01dc035f42115fcfd3b77dc5df3098382cd9f (diff)
parent2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd (diff)
Merge pull request #522 from spiral/fix/named_loggerv2.0.0-beta.22
bug(logger): Incorrect parsing of nested log levels
-rwxr-xr-x.gitignore11
-rw-r--r--go.mod2
-rw-r--r--go.sum5
-rwxr-xr-xinternal/state.go46
-rwxr-xr-xinternal/state_test.go21
-rw-r--r--pkg/events/pool_events.go5
-rwxr-xr-xpkg/pool/static_pool.go8
-rwxr-xr-xpkg/pool/static_pool_test.go6
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rw-r--r--pkg/states/worker_states.go31
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go5
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go10
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go10
-rwxr-xr-xpkg/transport/socket/socket_factory.go5
-rwxr-xr-xpkg/worker/sync_worker.go17
-rwxr-xr-xpkg/worker/worker.go23
-rw-r--r--pkg/worker_watcher/stack.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
-rw-r--r--plugins/logger/config.go12
-rw-r--r--plugins/logger/plugin.go2
-rw-r--r--plugins/rpc/plugin.go31
-rw-r--r--plugins/server/plugin.go2
-rw-r--r--tests/worker.php34
23 files changed, 176 insertions, 128 deletions
diff --git a/.gitignore b/.gitignore
index 431af2e0..2dd4c6ea 100755
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,10 @@
# Test binary, built with `go test -c`
*.test
+unit_tests
+unit_tests_copied
+dir1
+coverage
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
@@ -18,12 +22,7 @@
.idea
composer.lock
vendor
-vendor_php
builds/
tests/vendor/
.rr-sample.yaml
-unit_tests
-unit_tests_copied
-dir1
-coverage
-rr \ No newline at end of file
+cmd \ No newline at end of file
diff --git a/go.mod b/go.mod
index c0e3264b..f8d01f8b 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,7 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/shirou/gopsutil v3.21.1+incompatible
github.com/spf13/viper v1.7.1
- github.com/spiral/endure v1.0.0-beta21
+ github.com/spiral/endure v1.0.0-beta.22
github.com/spiral/errors v1.0.9
github.com/spiral/goridge/v3 v3.0.0
github.com/stretchr/testify v1.7.0
diff --git a/go.sum b/go.sum
index 96cf4a16..0a591572 100644
--- a/go.sum
+++ b/go.sum
@@ -406,11 +406,10 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
-github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k=
-github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw=
+github.com/spiral/endure v1.0.0-beta.22 h1:zOhrQ49DeYfr1rOHfUy573pjWpyxkG30vVz4o0ejvaQ=
+github.com/spiral/endure v1.0.0-beta.22/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE=
github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM=
github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8=
diff --git a/internal/state.go b/internal/state.go
index a14a6937..d208aeed 100755
--- a/internal/state.go
+++ b/internal/state.go
@@ -3,6 +3,8 @@ package internal
import (
"fmt"
"sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/states"
)
// State represents WorkerProcess status and updated time.
@@ -24,36 +26,6 @@ type State interface {
LastUsed() uint64
}
-const (
- // StateInactive - no associated process
- StateInactive int64 = iota
-
- // StateReady - ready for job.
- StateReady
-
- // StateWorking - working on given payload.
- StateWorking
-
- // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
- StateInvalid
-
- // StateStopping - process is being softly stopped.
- StateStopping
-
- StateKilling
-
- // State of worker, when no need to allocate new one
- StateDestroyed
-
- // StateStopped - process has been terminated.
- StateStopped
-
- // StateErrored - error WorkerState (can't be used).
- StateErrored
-
- StateRemove
-)
-
type WorkerState struct {
value int64
numExecs uint64
@@ -69,17 +41,17 @@ func NewWorkerState(value int64) *WorkerState {
// String returns current WorkerState as string.
func (s *WorkerState) String() string {
switch s.Value() {
- case StateInactive:
+ case states.StateInactive:
return "inactive"
- case StateReady:
+ case states.StateReady:
return "ready"
- case StateWorking:
+ case states.StateWorking:
return "working"
- case StateInvalid:
+ case states.StateInvalid:
return "invalid"
- case StateStopped:
+ case states.StateStopped:
return "stopped"
- case StateErrored:
+ case states.StateErrored:
return "errored"
}
@@ -99,7 +71,7 @@ func (s *WorkerState) Value() int64 {
// IsActive returns true if WorkerProcess not Inactive or Stopped
func (s *WorkerState) IsActive() bool {
val := s.Value()
- return val == StateWorking || val == StateReady
+ return val == states.StateWorking || val == states.StateReady
}
// change WorkerState value (status)
diff --git a/internal/state_test.go b/internal/state_test.go
index bdb05825..a0581d57 100755
--- a/internal/state_test.go
+++ b/internal/state_test.go
@@ -3,25 +3,26 @@ package internal
import (
"testing"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/stretchr/testify/assert"
)
func Test_NewState(t *testing.T) {
- st := NewWorkerState(StateErrored)
+ st := NewWorkerState(states.StateErrored)
assert.Equal(t, "errored", st.String())
- assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
- assert.Equal(t, "ready", NewWorkerState(StateReady).String())
- assert.Equal(t, "working", NewWorkerState(StateWorking).String())
- assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "inactive", NewWorkerState(states.StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(states.StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(states.StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(states.StateStopped).String())
assert.Equal(t, "undefined", NewWorkerState(1000).String())
}
func Test_IsActive(t *testing.T) {
- assert.False(t, NewWorkerState(StateInactive).IsActive())
- assert.True(t, NewWorkerState(StateReady).IsActive())
- assert.True(t, NewWorkerState(StateWorking).IsActive())
- assert.False(t, NewWorkerState(StateStopped).IsActive())
- assert.False(t, NewWorkerState(StateErrored).IsActive())
+ assert.False(t, NewWorkerState(states.StateInactive).IsActive())
+ assert.True(t, NewWorkerState(states.StateReady).IsActive())
+ assert.True(t, NewWorkerState(states.StateWorking).IsActive())
+ assert.False(t, NewWorkerState(states.StateStopped).IsActive())
+ assert.False(t, NewWorkerState(states.StateErrored).IsActive())
}
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index 3925df56..c569a5a8 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -28,6 +28,9 @@ const (
// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
)
type P int64
@@ -52,6 +55,8 @@ func (ev P) String() string {
return "EventIdleTTL"
case EventExecTTL:
return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
}
return "Unknown event type"
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 44adf9c0..01b0574d 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -251,7 +251,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -263,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index a32790e0..2d2b2b7d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,9 +12,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/stretchr/testify/assert"
)
@@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
}
}
@@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for i := range p.Workers() {
- p.Workers()[i].State().Set(internal.StateErrored)
+ p.Workers()[i].State().Set(states.StateErrored)
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 2597b352..3347ecd4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -144,7 +144,7 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == internal.StateInvalid {
+ if workers[i].State().Value() == states.StateInvalid {
continue
}
@@ -177,7 +177,7 @@ func (sp *supervised) control() {
// firs we check maxWorker idle
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != internal.StateReady {
+ if workers[i].State().Value() != states.StateReady {
continue
}
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
new file mode 100644
index 00000000..22fdfe8a
--- /dev/null
+++ b/pkg/states/worker_states.go
@@ -0,0 +1,31 @@
+package states
+
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ StateKilling
+
+ // State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error WorkerState (can't be used).
+ StateErrored
+
+ StateRemove
+)
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index dd7c5841..929043a7 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -92,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
// everything ok, set ready state
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
// return worker
c <- SpawnResult{
@@ -152,7 +153,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// everything ok, set ready state
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
return w, nil
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index d4949c82..e247324c 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -8,9 +8,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -21,13 +21,13 @@ func Test_GetState2(t *testing.T) {
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, internal.StateStopped, w.State().Value())
+ assert.Equal(t, states.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
assert.NoError(t, w.Stop())
}
@@ -40,13 +40,13 @@ func Test_Kill2(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, internal.StateErrored, w.State().Value())
+ assert.Equal(t, states.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 38166b85..b23af19f 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -9,9 +9,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -23,13 +23,13 @@ func Test_GetState(t *testing.T) {
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, internal.StateStopped, w.State().Value())
+ assert.Equal(t, states.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -46,13 +46,13 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, internal.StateErrored, w.State().Value())
+ assert.Equal(t, states.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index ccd2b0bf..98bd2389 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
@@ -124,7 +125,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
w.AttachRelay(rl)
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
c <- socketSpawn{
w: w,
@@ -167,7 +168,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
return w, nil
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1a0393fb..696fbdb7 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
c <- wexec{
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 8fd71cca..2f1f399d 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -15,6 +15,7 @@ import (
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(internal.StateInactive),
+ state: internal.NewWorkerState(states.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
// sync pool for STDERR
@@ -190,7 +191,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
err := multierr.Combine(w.cmd.Wait())
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
return errors.E(op, err)
}
@@ -199,7 +200,7 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
@@ -215,12 +216,12 @@ func (w *Process) Wait() error {
err = multierr.Append(err, w.closeRelay())
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
return err
}
if w.endState.Success() {
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
}
w.stderr.Reset()
@@ -241,20 +242,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(internal.StateStopping)
+ w.state.Set(states.StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
return multierr.Append(err, w.cmd.Process.Kill())
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -262,12 +263,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index d76f4d8f..55034e41 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,7 +5,7 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -129,7 +129,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
stack.mutex.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(internal.StateDestroyed)
+ stack.workers[i].State().Set(states.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 753b61ee..cf2e1eb7 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -5,8 +5,8 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -49,7 +49,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == internal.StateRemove {
+ if w != nil && w.State().Value() == states.StateRemove {
err := ww.RemoveWorker(w)
if err != nil {
return nil, err
@@ -102,7 +102,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(internal.StateRemove)
+ wb.State().Set(states.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -142,7 +142,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
diff --git a/plugins/logger/config.go b/plugins/logger/config.go
index 8cc88d02..bf7343c7 100644
--- a/plugins/logger/config.go
+++ b/plugins/logger/config.go
@@ -10,26 +10,26 @@ import (
// ChannelConfig configures loggers per channel.
type ChannelConfig struct {
// Dedicated channels per logger. By default logger allocated via named logger.
- Channels map[string]Config `json:"channels" mapstructure:"channels"`
+ Channels map[string]Config `mapstructure:"channels"`
}
type Config struct {
// Mode configures logger based on some default template (development, production, off).
- Mode string `json:"mode" mapstructure:"mode"`
+ Mode string `mapstructure:"mode"`
// Level is the minimum enabled logging level. Note that this is a dynamic
// level, so calling ChannelConfig.Level.SetLevel will atomically change the log
// level of all loggers descended from this config.
- Level string `json:"level" mapstructure:"level"`
+ Level string `mapstructure:"level"`
// Encoding sets the logger's encoding. Valid values are "json" and
// "console", as well as any third-party encodings registered via
// RegisterEncoder.
- Encoding string `json:"encoding" mapstructure:"encoding"`
+ Encoding string `mapstructure:"encoding"`
// Output is a list of URLs or file paths to write logging output to.
// See Open for details.
- Output []string `json:"output" mapstructure:"output"`
+ Output []string `mapstructure:"output"`
// ErrorOutput is a list of URLs to write internal logger errors to.
// The default is standard error.
@@ -37,7 +37,7 @@ type Config struct {
// Note that this setting only affects internal errors; for sample code that
// sends error-level logs to a different location from info- and debug-level
// logs, see the package-level AdvancedConfiguration example.
- ErrorOutput []string `json:"errorOutput" mapstructure:"errorOutput"`
+ ErrorOutput []string `mapstructure:"errorOutput"`
}
// ZapConfig converts config into Zap configuration.
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go
index 141ede64..7fc464b6 100644
--- a/plugins/logger/plugin.go
+++ b/plugins/logger/plugin.go
@@ -53,7 +53,7 @@ func (z *ZapLogger) NamedLogger(name string) (Logger, error) {
if err != nil {
return nil, err
}
- return NewZapAdapter(l), nil
+ return NewZapAdapter(l.Named(name)), nil
}
return NewZapAdapter(z.base.Named(name)), nil
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index e13768f0..94fec0b6 100644
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -15,18 +15,13 @@ import (
// PluginName contains default plugin name.
const PluginName = "RPC"
-type pluggable struct {
- service RPCer
- name string
-}
-
// Plugin is RPC service.
type Plugin struct {
cfg Config
log logger.Logger
rpc *rpc.Server
// set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
- plugins []pluggable
+ plugins map[string]RPCer
listener net.Listener
closed *uint32
}
@@ -42,14 +37,23 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, errors.Disabled, err)
}
+ // Init defaults
s.cfg.InitDefaults()
-
+ // Init pluggable plugins map
+ s.plugins = make(map[string]RPCer)
+ // init logs
s.log = log
+ // set up state
state := uint32(0)
s.closed = &state
atomic.StoreUint32(s.closed, 0)
- return s.cfg.Valid()
+ // validate config
+ err = s.cfg.Valid()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
// Serve serves the service.
@@ -62,14 +66,14 @@ func (s *Plugin) Serve() chan error {
services := make([]string, 0, len(s.plugins))
// Attach all services
- for i := 0; i < len(s.plugins); i++ {
- err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC())
+ for name := range s.plugins {
+ err := s.Register(name, s.plugins[name].RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
}
- services = append(services, s.plugins[i].name)
+ services = append(services, name)
}
var err error
@@ -128,10 +132,7 @@ func (s *Plugin) Collects() []interface{} {
// RegisterPlugin registers RPC service plugin.
func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
- s.plugins = append(s.plugins, pluggable{
- service: p,
- name: name.Name(),
- })
+ s.plugins[name.Name()] = p
}
// Register publishes in the server the set of methods of the
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 4fad82d1..73ce71f7 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -226,6 +226,8 @@ func (server *Plugin) collectPoolLogs(event interface{}) {
server.log.Warn("worker exec timeout reached", "error", we.Payload.(error).Error())
case events.EventIdleTTL:
server.log.Warn("worker idle timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid())
+ case events.EventPoolRestart:
+ server.log.Warn("requested pool restart")
}
}
diff --git a/tests/worker.php b/tests/worker.php
new file mode 100644
index 00000000..5c9c80e6
--- /dev/null
+++ b/tests/worker.php
@@ -0,0 +1,34 @@
+<?php
+
+declare(strict_types=1);
+
+require __DIR__ . '/vendor/autoload.php';
+
+/**
+ * @param string $dir
+ * @return array<string>
+ */
+$getClasses = static function (string $dir): iterable {
+ $files = glob($dir . '/*.php');
+
+ foreach ($files as $file) {
+ yield substr(basename($file), 0, -4);
+ }
+};
+
+$factory = \Temporal\WorkerFactory::create();
+
+$worker = $factory->newWorker('default');
+
+// register all workflows
+foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) {
+ $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
+}
+
+// register all activity
+foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
+ $class = 'Temporal\\Tests\\Activity\\' . $name;
+ $worker->registerActivityImplementations(new $class);
+}
+
+$factory->run();