Diffstat (limited to 'pkg')
 -rwxr-xr-x  pkg/payload/payload.go                                |   9
 -rw-r--r--  pkg/pool/interface.go                                 |   4
 -rwxr-xr-x  pkg/pool/static_pool.go                               |  91
 -rwxr-xr-x  pkg/pool/static_pool_test.go                          |  44
 -rwxr-xr-x  pkg/pool/supervisor_pool.go                           |   9
 -rw-r--r--  pkg/states/worker_states.go                           |  33
 -rwxr-xr-x  pkg/transport/pipe/pipe_factory.go                    |   5
 -rw-r--r--  pkg/transport/pipe/pipe_factory_spawn_test.go         |   9
 -rwxr-xr-x  pkg/transport/pipe/pipe_factory_test.go               |   9
 -rwxr-xr-x  pkg/transport/socket/socket_factory.go                |   7
 -rwxr-xr-x  pkg/transport/socket/socket_factory_test.go           |   2
 -rw-r--r--  pkg/worker/interface.go                               |  24
 -rwxr-xr-x  pkg/worker/state.go                                   | 108
 -rwxr-xr-x  pkg/worker/state_test.go                              |  27
 -rwxr-xr-x  pkg/worker/sync_worker.go                             |  39
 -rwxr-xr-x  pkg/worker/worker.go                                  |  43
 -rw-r--r--  pkg/worker_watcher/container/interface.go             |  13
 -rw-r--r--  pkg/worker_watcher/container/stack.go (renamed from pkg/worker_watcher/stack.go)           |  79
 -rw-r--r--  pkg/worker_watcher/container/stack_test.go (renamed from pkg/worker_watcher/stack_test.go) |   5
 -rw-r--r--  pkg/worker_watcher/container/vec.go                   |  45
 -rw-r--r--  pkg/worker_watcher/interface.go                       |  27
 -rwxr-xr-x  pkg/worker_watcher/worker_watcher.go                  | 228
 22 files changed, 575 insertions(+), 285 deletions(-)
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
index fac36852..bf3972aa 100755
--- a/pkg/payload/payload.go
+++ b/pkg/payload/payload.go
@@ -1,5 +1,7 @@
package payload
+import "unsafe"
+
// Payload carries binary header and body to stack and
// back to the server.
type Payload struct {
@@ -12,5 +14,10 @@ type Payload struct {
// String returns payload body as string
func (p *Payload) String() string {
- return string(p.Body)
+ return toString(p.Body)
+}
+
+// unsafe, but lightning fast []byte to string conversion
+func toString(data []byte) string {
+ return *(*string)(unsafe.Pointer(&data))
}
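
Side note on the conversion above: it is a zero-copy reinterpretation of the slice header as a string header, so the resulting string aliases the payload's backing array and must be treated as read-only. A minimal standalone sketch (not part of this patch) illustrating both the speed-up and the aliasing caveat:

// Minimal sketch: zero-copy []byte to string conversion and its caveat.
package main

import (
    "fmt"
    "unsafe"
)

// toString reinterprets the slice header as a string header without copying.
func toString(data []byte) string {
    return *(*string)(unsafe.Pointer(&data))
}

func main() {
    b := []byte("hello")
    s := toString(b)
    fmt.Println(s) // hello

    // Mutating the original slice is visible through the string,
    // which is why this conversion is only safe for read-only payloads.
    b[0] = 'H'
    fmt.Println(s) // Hello
}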
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4f7ae595..bfc56c3f 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -19,10 +19,10 @@ type Pool interface {
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []worker.SyncWorker)
+ Workers() (workers []worker.BaseProcess)
// Remove worker from the pool.
- RemoveWorker(worker worker.SyncWorker) error
+ RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 01b0574d..f1b20bb9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -4,11 +4,11 @@ import (
"context"
"os/exec"
"time"
+ "unsafe"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -46,8 +46,8 @@ type StaticPool struct {
// allocate new worker
allocator worker.Allocator
- // errEncoder is the default Exec error encoder
- errEncoder ErrorEncoder
+ // err_encoder is the default Exec error encoder
+ err_encoder ErrorEncoder //nolint
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -75,21 +75,24 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ // set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ // allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
- // put stack in the pool
- err = p.ww.AddToWatch(workers)
+ // add workers to the watcher
+ err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
}
- p.errEncoder = defaultErrEncoder(p)
+ p.err_encoder = defaultErrEncoder(p)
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
@@ -122,12 +125,13 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
- return sp.ww.WorkersList()
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+ return sp.ww.List()
}
-func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
- return sp.ww.RemoveWorker(wb)
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+ sp.ww.Remove(wb)
+ return nil
}
// Be careful, sync Exec with ExecWithContext
@@ -143,16 +147,15 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.Exec(p)
+ rsp, err := w.(worker.SyncWorker).Exec(p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker wants to be terminated
- // TODO careful with string(rsp.Context)
- if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ // TODO careful with toString(rsp.Context)
+ if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
sp.stopWorker(w)
-
return sp.Exec(p)
}
@@ -174,13 +177,13 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, p)
+ rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker wants to be terminated
- if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
sp.stopWorker(w)
return sp.ExecWithContext(ctx, p)
}
@@ -193,9 +196,9 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -203,22 +206,24 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew()
+ w.State().Set(worker.StateDestroyed)
+ sp.ww.Remove(w)
+ err := sp.ww.Allocate()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
- // GetFreeWorker function consumes context with timeout
- w, err := sp.ww.GetFreeWorker(ctxGetFree)
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+ // Get function consumes context with timeout
+ w, err := sp.ww.Get(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -237,33 +242,32 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.SyncWorker) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
- if errors.Is(errors.ExecTTL, err) {
+
+ switch {
+ case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- }
- // soft job errors are allowed
- if errors.Is(errors.SoftJob, err) {
+
+ case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
-
- return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
@@ -276,7 +280,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (*worker.SyncWorkerImpl, error) {
+ return func() (worker.SyncWorker, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -310,9 +314,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.SyncWorker
+ var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
@@ -325,3 +329,8 @@ func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, e
}
return workers, nil
}
+
+// unsafe, but lightning fast []byte to string conversion
+func toString(data []byte) string {
+ return *(*string)(unsafe.Pointer(&data))
+}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 4cfd5ec6..44f5936c 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) {
cfg,
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
res, err := p.Exec(payload.Payload{Body: []byte("hello")})
@@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) {
}
assert.Contains(t, err.Error(), "hello")
+ p.Destroy(ctx)
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
@@ -255,7 +254,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
}
}
@@ -454,7 +453,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
pipe.NewPipeFactory(),
Config{
NumWorkers: 5,
- AllocateTimeout: time.Second,
+ AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
@@ -462,11 +461,11 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for i := range p.Workers() {
- p.Workers()[i].State().Set(states.StateErrored)
+ p.Workers()[i].State().Set(worker.StateErrored)
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
+ assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -645,3 +644,34 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
}
}
}
+
+// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
+// inline BenchmarkToStringUnsafe-12 1000000000 0.295 ns/op 0 B/op 0 allocs/op
+func BenchmarkToStringUnsafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := toString(testPayload)
+ _ = res
+ }
+}
+
+// BenchmarkToStringSafe-12 28584489 39.1 ns/op 112 B/op 1 allocs/op
+// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
+func BenchmarkToStringSafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := toStringNotFun(testPayload)
+ _ = res
+ }
+}
+
+func toStringNotFun(data []byte) string {
+ return string(data)
+}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 3347ecd4..3618786d 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -100,13 +99,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.SyncWorker) {
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
return sp.pool.RemoveWorker(worker)
}
@@ -144,7 +143,7 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == states.StateInvalid {
+ if workers[i].State().Value() == worker.StateInvalid {
continue
}
@@ -177,7 +176,7 @@ func (sp *supervised) control() {
// first we check maxWorker idle
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != states.StateReady {
+ if workers[i].State().Value() != worker.StateReady {
continue
}
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
deleted file mode 100644
index fe653cb4..00000000
--- a/pkg/states/worker_states.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package states
-
-const (
- // StateInactive - no associated process
- StateInactive int64 = iota
-
- // StateReady - ready for job.
- StateReady
-
- // StateWorking - working on given payload.
- StateWorking
-
- // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
- StateInvalid
-
- // StateStopping - process is being softly stopped.
- StateStopping
-
- // StateKilling - process is being forcibly stopped
- StateKilling
-
- // State of worker, when no need to allocate new one
- StateDestroyed
-
- // StateStopped - process has been terminated.
- StateStopped
-
- // StateErrored - error WorkerState (can't be used).
- StateErrored
-
- // StateRemove - worker is killed and removed from the stack
- StateRemove
-)
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index 929043a7..b12ff36a 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -93,7 +92,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
// everything ok, set ready state
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
// return worker
c <- SpawnResult{
@@ -153,7 +152,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// everything ok, set ready state
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
return w, nil
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 73008471..a00b2117 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -21,13 +20,13 @@ func Test_GetState2(t *testing.T) {
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, states.StateStopped, w.State().Value())
+ assert.Equal(t, worker.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
assert.NoError(t, w.Stop())
}
@@ -40,13 +39,13 @@ func Test_Kill2(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, states.StateErrored, w.State().Value())
+ assert.Equal(t, worker.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 3efeb59c..fb40ecb0 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -11,7 +11,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -23,13 +22,13 @@ func Test_GetState(t *testing.T) {
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, states.StateStopped, w.State().Value())
+ assert.Equal(t, worker.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -46,13 +45,13 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, states.StateErrored, w.State().Value())
+ assert.Equal(t, worker.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 98bd2389..f58f9561 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -9,11 +9,10 @@ import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
@@ -125,7 +124,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
w.AttachRelay(rl)
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
c <- socketSpawn{
w: w,
@@ -168,7 +167,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
return w, nil
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index c13a897b..a8dd0fe0 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -401,7 +401,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- block := make(chan struct{})
+ block := make(chan struct{}, 10)
listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
if wev.Event == events.EventWorkerStderr {
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 9d74ae10..96eb25bc 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -5,11 +5,29 @@ import (
"fmt"
"time"
- "github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/pkg/payload"
)
+// State represents WorkerProcess status and updated time.
+type State interface {
+ fmt.Stringer
+ // Value returns StateImpl value
+ Value() int64
+ // Set sets the StateImpl
+ Set(value int64)
+	// NumExecs shows how many times WorkerProcess was invoked
+ NumExecs() uint64
+ // IsActive returns true if WorkerProcess not Inactive or Stopped
+ IsActive() bool
+	// RegisterExec is used to register executions
+ RegisterExec()
+ // SetLastUsed sets worker last used time
+ SetLastUsed(lu uint64)
+ // LastUsed return worker last used time
+ LastUsed() uint64
+}
+
type BaseProcess interface {
fmt.Stringer
@@ -21,7 +39,7 @@ type BaseProcess interface {
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() internal.State
+ State() State
// Start used to run Cmd and immediately return
Start() error
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
new file mode 100755
index 00000000..176e151b
--- /dev/null
+++ b/pkg/worker/state.go
@@ -0,0 +1,108 @@
+package worker
+
+import (
+ "sync/atomic"
+)
+
+// SYNC WITH worker_watcher.GET
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateKilling - process is being forcibly stopped
+ StateKilling
+
+	// StateDestroyed - worker is destroyed; no need to allocate a new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error StateImpl (can't be used).
+ StateErrored
+)
+
+type StateImpl struct {
+ value int64
+ numExecs uint64
+ // to be lightweight, use UnixNano
+ lastUsed uint64
+}
+
+// Thread safe
+func NewWorkerState(value int64) *StateImpl {
+ return &StateImpl{value: value}
+}
+
+// String returns current StateImpl as string.
+func (s *StateImpl) String() string {
+ switch s.Value() {
+ case StateInactive:
+ return "inactive"
+ case StateReady:
+ return "ready"
+ case StateWorking:
+ return "working"
+ case StateInvalid:
+ return "invalid"
+ case StateStopping:
+ return "stopping"
+ case StateStopped:
+ return "stopped"
+ case StateKilling:
+ return "killing"
+ case StateErrored:
+ return "errored"
+ case StateDestroyed:
+ return "destroyed"
+ }
+
+ return "undefined"
+}
+
+// NumExecs returns number of registered WorkerProcess execs.
+func (s *StateImpl) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
+}
+
+// Value StateImpl returns StateImpl value
+func (s *StateImpl) Value() int64 {
+ return atomic.LoadInt64(&s.value)
+}
+
+// IsActive returns true if WorkerProcess not Inactive or Stopped
+func (s *StateImpl) IsActive() bool {
+ val := s.Value()
+ return val == StateWorking || val == StateReady
+}
+
+// change StateImpl value (status)
+func (s *StateImpl) Set(value int64) {
+ atomic.StoreInt64(&s.value, value)
+}
+
+// register new execution atomically
+func (s *StateImpl) RegisterExec() {
+ atomic.AddUint64(&s.numExecs, 1)
+}
+
+// Update last used time
+func (s *StateImpl) SetLastUsed(lu uint64) {
+ atomic.StoreUint64(&s.lastUsed, lu)
+}
+
+func (s *StateImpl) LastUsed() uint64 {
+ return atomic.LoadUint64(&s.lastUsed)
+}
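
The new state.go keeps the worker state as a plain int64/uint64 triple guarded only by sync/atomic, so readers never block writers. A tiny usage sketch (simplified types, not this package's API) showing why atomic Load/Store is enough for concurrent access:

// Minimal sketch: lock-free state updates with sync/atomic.
package main

import (
    "fmt"
    "sync/atomic"
)

const (
    StateReady int64 = iota
    StateWorking
)

type state struct{ value int64 }

func (s *state) Set(v int64)  { atomic.StoreInt64(&s.value, v) }
func (s *state) Value() int64 { return atomic.LoadInt64(&s.value) }

func main() {
    s := &state{value: StateReady}
    done := make(chan struct{})

    go func() {
        s.Set(StateWorking) // worker picked up a job
        close(done)
    }()

    <-done
    fmt.Println(s.Value() == StateWorking) // true
}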
diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go
new file mode 100755
index 00000000..c67182d6
--- /dev/null
+++ b/pkg/worker/state_test.go
@@ -0,0 +1,27 @@
+package worker
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 010af076..82a5462a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,41 +6,26 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
// Allocator is responsible for worker allocation in the pool
-type Allocator func() (*SyncWorkerImpl, error)
+type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
+func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
}
}
-// FromSync creates BaseProcess from SyncWorkerImpl
-func FromSync(w *SyncWorkerImpl) BaseProcess {
- return &Process{
- created: w.process.created,
- events: w.process.events,
- state: w.process.state,
- cmd: w.process.cmd,
- pid: w.process.pid,
- endState: w.process.endState,
- relay: w.process.relay,
- }
-}
-
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
@@ -48,25 +33,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -91,7 +76,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -101,13 +86,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -117,7 +102,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
c <- wexec{
@@ -214,7 +199,7 @@ func (tw *SyncWorkerImpl) Created() time.Time {
return tw.process.Created()
}
-func (tw *SyncWorkerImpl) State() internal.State {
+func (tw *SyncWorkerImpl) State() State {
return tw.process.State()
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b726c6f1..0f7ab755 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -9,22 +9,12 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 25 * time.Millisecond
-
- // ReadBufSize used to make a slice with specified length to read from stderr
- ReadBufSize = 10240 // Kb
-)
-
type Options func(p *Process)
// Process - supervised process with api over goridge.Relay.
@@ -39,7 +29,7 @@ type Process struct {
// number of Process executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
- state *internal.WorkerState
+ state *StateImpl
// underlying command with associated process, command must be
// provided to Process from outside in non-started form. CmdSource
@@ -50,9 +40,6 @@ type Process struct {
// can be nil while process is not started.
pid int
- // contains information about resulted process state.
- endState *os.ProcessState
-
// communication bus with underlying process.
relay relay.Relay
}
@@ -67,7 +54,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(states.StateInactive),
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -106,7 +93,7 @@ func (w *Process) addListener(listener events.Listener) {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) State() internal.State {
+func (w *Process) State() State {
return w.state
}
@@ -157,13 +144,13 @@ func (w *Process) Wait() error {
err = w.cmd.Wait()
// If worker was destroyed, just exit
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
return nil
}
// If state is different, and err is not nil, append it to the errors
if err != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
err = multierr.Combine(err, errors.E(op, err))
}
@@ -173,12 +160,12 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
err2 := w.closeRelay()
if err2 != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
return multierr.Append(err, errors.E(op, err2))
}
if w.cmd.ProcessState.Success() {
- w.State().Set(states.StateStopped)
+ w.State().Set(StateStopped)
return nil
}
@@ -198,20 +185,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(states.StateStopping)
+ w.state.Set(StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(states.StateKilling)
- return multierr.Append(err, w.cmd.Process.Kill())
+ w.state.Set(StateKilling)
+ return multierr.Append(err, w.cmd.Process.Signal(os.Kill))
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -219,12 +206,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(states.StateKilling)
+ w.state.Set(StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}
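
Stop above now follows a soft-stop-then-kill sequence: mark the worker as stopping, send the stop control frame over the relay, and only signal os.Kill when that fails. A simplified sketch of the same escalation pattern (softStop here is a hypothetical stand-in for sending the {"stop":true} control command):

// Minimal sketch: graceful stop with escalation to SIGKILL.
package main

import (
    "fmt"
    "os"
    "os/exec"
)

// softStop stands in for asking the process to exit on its own.
func softStop(cmd *exec.Cmd) error {
    return cmd.Process.Signal(os.Interrupt)
}

func stop(cmd *exec.Cmd) error {
    if err := softStop(cmd); err != nil {
        // graceful stop failed: escalate, mirroring the StateKilling branch above
        return cmd.Process.Signal(os.Kill)
    }
    return nil
}

func main() {
    cmd := exec.Command("sleep", "10")
    if err := cmd.Start(); err != nil {
        fmt.Println("start:", err)
        return
    }

    if err := stop(cmd); err != nil {
        fmt.Println("stop:", err)
    }

    // Wait reaps the process; the worker does the same in Wait().
    _ = cmd.Wait()
    fmt.Println("process stopped")
}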
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
new file mode 100644
index 00000000..532bace9
--- /dev/null
+++ b/pkg/worker_watcher/container/interface.go
@@ -0,0 +1,13 @@
+package container
+
+import "github.com/spiral/roadrunner/v2/pkg/worker"
+
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue() (worker.BaseProcess, bool)
+	// Destroy stops the vector from releasing workers
+ Destroy()
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go
index 55034e41..fb8ecd3b 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/container/stack.go
@@ -1,17 +1,17 @@
-package worker_watcher //nolint:golint,stylecheck
+package container
+
import (
"context"
"runtime"
"sync"
"time"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []*worker.SyncWorkerImpl
- mutex sync.RWMutex
+ sync.RWMutex
+ workers []worker.BaseProcess
destroy bool
actualNumOfWorkers uint64
initialNumOfWorkers uint64
@@ -20,39 +20,39 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]*worker.SyncWorkerImpl, 0, w),
+ workers: make([]worker.BaseProcess, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
}
func (stack *Stack) Reset() {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers = 0
stack.workers = nil
}
-// Push worker back to the stack
-// If stack in destroy state, Push will provide 100ms window to unlock the mutex
+// Push worker back to the vec
+// If vec in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
+ stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+ stack.Lock()
+ defer stack.Unlock()
- // do not release new stack
+	// do not release new workers
if stack.destroy {
return nil, true
}
@@ -69,10 +69,10 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
}
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
for i := 0; i < len(stack.workers); i++ {
- // worker in the stack, reallocating
+ // worker in the vec, reallocating
if stack.workers[i].Pid() == pid {
stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
stack.actualNumOfWorkers--
@@ -84,12 +84,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
return false
}
-// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.SyncWorker {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
- workersCopy := make([]worker.SyncWorker, 0, 1)
+// Workers return copy of the workers in the vec
+func (stack *Stack) Workers() []worker.BaseProcess {
+ stack.Lock()
+ defer stack.Unlock()
+ workersCopy := make([]worker.BaseProcess, 0, 1)
// copy
+ // TODO pointers, copy have no sense
for _, v := range stack.workers {
if v != nil {
workersCopy = append(workersCopy, v)
@@ -100,40 +101,40 @@ func (stack *Stack) Workers() []worker.SyncWorker {
}
func (stack *Stack) isDestroying() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return stack.destroy
}
// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(ctx context.Context) {
- stack.mutex.Lock()
+func (stack *Stack) Destroy(_ context.Context) {
+ stack.Lock()
stack.destroy = true
- stack.mutex.Unlock()
+ stack.Unlock()
tt := time.NewTicker(time.Millisecond * 500)
defer tt.Stop()
for {
select {
case <-tt.C:
- stack.mutex.Lock()
+ stack.Lock()
// that might be one of the workers is working
if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.mutex.Unlock()
+ stack.Unlock()
continue
}
- stack.mutex.Unlock()
+ stack.Unlock()
// unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
+	// just to make sure. All workers at this moment are in the vec
// Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.mutex.Lock()
+ stack.Lock()
for i := 0; i < len(stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(states.StateDestroyed)
+	// set state for the workers in the vec (unused at the moment)
+ stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
- stack.mutex.Unlock()
+ stack.Unlock()
// clear
stack.Reset()
return
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go
index 5287a6dc..d699664c 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/container/stack_test.go
@@ -1,4 +1,5 @@
-package worker_watcher //nolint:golint,stylecheck
+package container
+
import (
"context"
"os/exec"
@@ -12,7 +13,7 @@ import (
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
+ assert.Equal(t, []worker.BaseProcess{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
new file mode 100644
index 00000000..239b01c7
--- /dev/null
+++ b/pkg/worker_watcher/container/vec.go
@@ -0,0 +1,45 @@
+package container
+
+import (
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) Vector {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Enqueue(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, true
+ }
+
+ w := <-v.workers
+
+ return w, false
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
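
Vec replaces the mutex-guarded stack with a buffered channel plus an atomic destroy flag; CompareAndSwapUint64(&v.destroy, 1, 1) in Dequeue is effectively an atomic load of that flag. A self-contained sketch of the same pattern (int stands in for worker.BaseProcess, and a plain atomic load is used for readability):

// Minimal sketch: channel-backed worker container with a destroy flag.
package main

import (
    "fmt"
    "sync/atomic"
)

type vec struct {
    destroy uint64
    items   chan int // stands in for worker.BaseProcess
}

func newVec(capacity int) *vec {
    return &vec{items: make(chan int, capacity)}
}

func (v *vec) Enqueue(i int) { v.items <- i }

func (v *vec) Dequeue() (int, bool) {
    // once the destroy flag is set, refuse to release items
    if atomic.LoadUint64(&v.destroy) == 1 {
        return 0, true
    }
    return <-v.items, false
}

func (v *vec) Destroy() { atomic.StoreUint64(&v.destroy, 1) }

func main() {
    v := newVec(2)
    v.Enqueue(1)

    if item, stopped := v.Dequeue(); !stopped {
        fmt.Println("got item", item)
    }

    v.Destroy()
    if _, stopped := v.Dequeue(); stopped {
        fmt.Println("container destroyed, nothing released")
    }
}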
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 927aa270..4625b7a7 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -6,25 +6,26 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
+// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
- // AddToWatch used to add stack to wait its state
- AddToWatch(workers []worker.SyncWorker) error
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
- // GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (worker.SyncWorker, error)
+	// Get provides the first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
- // PutWorker enqueues worker back
- PushWorker(w worker.SyncWorker)
+ // Push enqueues worker back
+ Push(w worker.BaseProcess)
- // AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew() error
+	// Allocate - allocates a new worker and puts it into the WorkerWatcher
+ Allocate() error
- // Destroy destroys the underlying stack
+ // Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all stack w/o removing it from internal storage
- WorkersList() []worker.SyncWorker
+	// List returns all workers w/o removing them from internal storage
+ List() []worker.BaseProcess
- // RemoveWorker remove worker from the stack
- RemoveWorker(wb worker.SyncWorker) error
+	// Remove removes worker from the container
+ Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cf2e1eb7..804e4658 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,132 +3,228 @@ package worker_watcher //nolint:golint,stylecheck
import (
"context"
"sync"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead workers will not be replaced
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(numWorkers),
- allocator: allocator,
- events: events,
+ container: container.NewVector(numWorkers),
+ numWorkers: numWorkers,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+ allocator: allocator,
+ events: events,
}
return ww
}
type workerWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator worker.Allocator
- events events.Handler
+ sync.RWMutex
+ container container.Vector
+ // used to control the Destroy stage (that all workers are in the container)
+ numWorkers uint64
+ workers []worker.BaseProcess
+ allocator worker.Allocator
+ events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.stack.Push(workers[i])
+ ww.container.Enqueue(workers[i])
+ // add worker to watch slice
+ ww.workers = append(ww.workers, workers[i])
- go func(swc worker.SyncWorker) {
+ go func(swc worker.BaseProcess) {
ww.wait(swc)
}(workers[i])
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+// return value from Get
+type get struct {
+ w worker.BaseProcess
+ err error
+}
+
+// Get is not a thread safe operation
+func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+ c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- // thread safe operation
- w, stop := ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
+ go func() {
+ // FAST PATH
+ // thread safe operation
+ w, stop := ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
+ return
+ }
- // handle worker remove state
- // in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == states.StateRemove {
- err := ww.RemoveWorker(w)
- if err != nil {
- return nil, err
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ c <- get{
+ w,
+ nil,
+ }
+ return
}
- // try to get next
- return ww.GetFreeWorker(ctx)
- }
- // no free stack
- if w == nil {
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill()
+ // no free workers in the container
+ // try to continuously get free one
for {
select {
default:
- w, stop = ww.stack.Pop()
+ w, stop = ww.container.Dequeue()
if stop {
- return nil, errors.E(op, errors.WatcherStopped)
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
}
- if w == nil {
+
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
+ }
+ return
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+				// worker is doing no work because it is in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
continue
}
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
- }
+ }()
- return w, nil
+ select {
+ case r := <-c:
+ if r.err != nil {
+ return nil, r.err
+ }
+ return r.w, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
+ }
}
-func (ww *workerWatcher) AllocateNew() error {
- ww.stack.mutex.Lock()
+func (ww *workerWatcher) Allocate() error {
+ ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
}
+ // add worker to Wait
ww.addToWatch(sw)
- ww.stack.mutex.Unlock()
- ww.PushWorker(sw)
+ // add new worker to the workers slice (to get information about workers in parallel)
+ ww.workers = append(ww.workers, sw)
+
+ // unlock Allocate mutex
+ ww.Unlock()
+ // push the worker to the container
+ ww.Push(sw)
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
+// Remove
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+ ww.Lock()
+ defer ww.Unlock()
- const op = errors.Op("worker_watcher_remove_worker")
+ // set remove state
pid := wb.Pid()
- if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(states.StateRemove)
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
+ // worker will be removed on the Get operation
+ for i := 0; i < len(ww.workers); i++ {
+ if ww.workers[i].Pid() == pid {
+ ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+ // kill worker
+ _ = wb.Kill()
+ return
}
- return nil
}
-
- return nil
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
- ww.stack.Push(w)
+func (ww *workerWatcher) Push(w worker.BaseProcess) {
+ ww.container.Enqueue(w)
}
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy all underlying container (but let them to complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- // destroy stack, we don't use ww mutex here, since we should be able to push worker
- ww.stack.Destroy(ctx)
+ // destroy container, we don't use ww mutex here, since we should be able to push worker
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.Lock()
+ // that might be one of the workers is working
+ if ww.numWorkers != uint64(len(ww.workers)) {
+ ww.Unlock()
+ continue
+ }
+ ww.Unlock()
+ // unnecessary mutex, but
+			// just to make sure. All workers at this moment are in the container
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+ return
+ }
+ }
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
- return ww.stack.Workers()
+func (ww *workerWatcher) List() []worker.BaseProcess {
+ ww.Lock()
+ defer ww.Unlock()
+
+ base := make([]worker.BaseProcess, 0, len(ww.workers))
+ for i := 0; i < len(ww.workers); i++ {
+ base = append(base, ww.workers[i])
+ }
+
+ return base
}
func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -142,14 +238,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
- _ = ww.stack.FindAndRemoveByPid(w.Pid())
- err = ww.AllocateNew()
+ ww.Remove(w)
+ err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
Event: events.EventPoolError,
@@ -158,7 +254,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
go func() {
ww.wait(wb)
}()
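
Get above wraps the blocking Dequeue in a goroutine and races its result against ctx.Done(), which is how the allocate timeout bounds the wait for a free worker. A minimal sketch of that select pattern with hypothetical names (a channel of ints stands in for the container):

// Minimal sketch: bounding a blocking dequeue with a context deadline.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type result struct {
    w   int // stands in for worker.BaseProcess
    err error
}

func get(ctx context.Context, workers chan int) (int, error) {
    c := make(chan result, 1)

    go func() {
        // blocks until a worker is pushed back; as in the patch,
        // this goroutine may outlive the call on timeout
        w := <-workers
        c <- result{w: w}
    }()

    select {
    case r := <-c:
        return r.w, r.err
    case <-ctx.Done():
        return 0, errors.New("no free workers, timeout exceeded")
    }
}

func main() {
    workers := make(chan int, 1)

    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    // no worker is ever pushed, so the call times out
    _, err := get(ctx, workers)
    fmt.Println(err)
}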