Diffstat (limited to 'pkg')
22 files changed, 575 insertions, 285 deletions
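The headline change in this commit is the interface split: Pool, Watcher, and the supervisor now pass worker.BaseProcess around, the worker state constants move from pkg/states into pkg/worker, and only the call sites that actually run payloads narrow the worker to worker.SyncWorker. A minimal compile-oriented sketch of that pattern, using only types named in the diff (the comma-ok assertion and the error message are illustrative; StaticPool.Exec asserts directly):

package example

import (
	"errors"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// execOn mirrors StaticPool.Exec after this change: the watcher hands out a
// worker.BaseProcess, and execution goes through a type assertion to
// worker.SyncWorker, the interface that adds Exec/ExecWithTimeout on top of
// BaseProcess.
func execOn(w worker.BaseProcess, p payload.Payload) (payload.Payload, error) {
	sw, ok := w.(worker.SyncWorker)
	if !ok {
		// the pool itself uses a direct assertion; comma-ok keeps the sketch panic-free
		return payload.Payload{}, errors.New("worker does not implement SyncWorker")
	}
	return sw.Exec(p)
}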
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index fac36852..bf3972aa 100755 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -1,5 +1,7 @@ package payload +import "unsafe" + // Payload carries binary header and body to stack and // back to the server. type Payload struct { @@ -12,5 +14,10 @@ type Payload struct { // String returns payload body as string func (p *Payload) String() string { - return string(p.Body) + return toString(p.Body) +} + +// unsafe, but lightning fast []byte to string conversion +func toString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) } diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4f7ae595..bfc56c3f 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -19,10 +19,10 @@ type Pool interface { ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []worker.SyncWorker) + Workers() (workers []worker.BaseProcess) // Remove worker from the pool. - RemoveWorker(worker worker.SyncWorker) error + RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 01b0574d..f1b20bb9 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -4,11 +4,11 @@ import ( "context" "os/exec" "time" + "unsafe" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) type Options func(p *StaticPool) @@ -46,8 +46,8 @@ type StaticPool struct { // allocate new worker allocator worker.Allocator - // errEncoder is the default Exec error encoder - errEncoder ErrorEncoder + // err_encoder is the default Exec error encoder + err_encoder ErrorEncoder //nolint } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -75,21 +75,24 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg options[i](p) } + // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + // set up workers watcher p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } - // put stack in the pool - err = p.ww.AddToWatch(workers) + // add workers to the watcher + err = p.ww.Watch(workers) if err != nil { return nil, errors.E(op, err) } - p.errEncoder = defaultErrEncoder(p) + p.err_encoder = defaultErrEncoder(p) // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { @@ -122,12 +125,13 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. 
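The toString helper added to payload.go above (and mirrored at the bottom of static_pool.go) is a zero-copy []byte-to-string conversion: the slice header is reinterpreted as a string header through unsafe.Pointer, so nothing is allocated or copied. The cost is that the string aliases the slice's backing array and is only safe while those bytes are never written again. A standalone demonstration (not library code):

package main

import (
	"fmt"
	"unsafe"
)

// toString reinterprets the slice header as a string header: no allocation, no copy.
func toString(data []byte) string {
	return *(*string)(unsafe.Pointer(&data))
}

func main() {
	raw := []byte(`{"stop":true}`)
	s := toString(raw)
	fmt.Println(s == `{"stop":true}`) // true, without copying the payload

	// Mutating the slice afterwards also mutates the "immutable" string,
	// which is why callers must treat the converted bytes as frozen.
	raw[2] = 'S'
	fmt.Println(s) // {"Stop":true}
}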
-func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { - return sp.ww.WorkersList() +func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { + return sp.ww.List() } -func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { - return sp.ww.RemoveWorker(wb) +func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { + sp.ww.Remove(wb) + return nil } // Be careful, sync Exec with ExecWithContext @@ -143,16 +147,15 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, err) } - rsp, err := w.Exec(p) + rsp, err := w.(worker.SyncWorker).Exec(p) if err != nil { - return sp.errEncoder(err, w) + return sp.err_encoder(err, w) } // worker want's to be terminated - // TODO careful with string(rsp.Context) - if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + // TODO careful with toString(rsp.Context) + if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest { sp.stopWorker(w) - return sp.Exec(p) } @@ -174,13 +177,13 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return payload.Payload{}, errors.E(op, err) } - rsp, err := w.ExecWithTimeout(ctx, p) + rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p) if err != nil { - return sp.errEncoder(err, w) + return sp.err_encoder(err, w) } // worker want's to be terminated - if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest { sp.stopWorker(w) return sp.ExecWithContext(ctx, p) } @@ -193,9 +196,9 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return rsp, nil } -func (sp *StaticPool) stopWorker(w worker.SyncWorker) { +func (sp *StaticPool) stopWorker(w worker.BaseProcess) { const op = errors.Op("static_pool_stop_worker") - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -203,22 +206,24 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) { } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { +func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error { const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew() + w.State().Set(worker.StateDestroyed) + sp.ww.Remove(w) + err := sp.ww.Allocate() if err != nil { return errors.E(op, err) } } else { - sp.ww.PushWorker(w) + sp.ww.Push(w) } return nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { - // GetFreeWorker function consumes context with timeout - w, err := sp.ww.GetFreeWorker(ctxGetFree) +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { + // Get function consumes context with timeout + w, err := sp.ww.Get(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { @@ -237,33 +242,32 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.SyncWorker) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (payload.Payload, error) 
{ const op = errors.Op("error encoder") // just push event if on any stage was timeout error - if errors.Is(errors.ExecTTL, err) { + + switch { + case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) - } - // soft job errors are allowed - if errors.Is(errors.SoftJob, err) { + + case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew() + err = sp.ww.Allocate() if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { - sp.ww.PushWorker(w) + sp.ww.Push(w) } - - return payload.Payload{}, errors.E(op, err) } - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() @@ -276,7 +280,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (*worker.SyncWorkerImpl, error) { + return func() (worker.SyncWorker, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) @@ -310,9 +314,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") - var workers []worker.SyncWorker + var workers []worker.BaseProcess // constant number of stack simplify logic for i := uint64(0); i < numWorkers; i++ { @@ -325,3 +329,8 @@ func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, e } return workers, nil } + +// unsafe, but lightning fast []byte to string conversion +func toString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) +} diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 4cfd5ec6..44f5936c 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) { cfg, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) res, err := p.Exec(payload.Payload{Body: []byte("hello")}) @@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) { } assert.Contains(t, err.Error(), "hello") + p.Destroy(ctx) } func Test_StaticPool_Broken_Replace(t *testing.T) { @@ -255,7 +254,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { list := p.Workers() for _, w := range list { - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) } } @@ -454,7 +453,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { 
pipe.NewPipeFactory(), Config{ NumWorkers: 5, - AllocateTimeout: time.Second, + AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, ) @@ -462,11 +461,11 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) for i := range p.Workers() { - p.Workers()[i].State().Set(states.StateErrored) + p.Workers()[i].State().Set(worker.StateErrored) } _, err = p.Exec(payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) + assert.NoError(t, err) p.Destroy(ctx) } @@ -645,3 +644,34 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { } } } + +// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op +// inline BenchmarkToStringUnsafe-12 1000000000 0.295 ns/op 0 B/op 0 allocs/op +func BenchmarkToStringUnsafe(b *testing.B) { + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res := toString(testPayload) + _ = res + } +} + +// BenchmarkToStringSafe-12 28584489 39.1 ns/op 112 B/op 1 allocs/op +// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op +func BenchmarkToStringSafe(b *testing.B) { + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res := toStringNotFun(testPayload) + _ = res + } +} + +func toStringNotFun(data []byte) string { + return string(data) +} diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 3347ecd4..3618786d 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -100,13 +99,13 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []worker.SyncWorker) { +func (sp *supervised) Workers() 
(workers []worker.BaseProcess) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { return sp.pool.RemoveWorker(worker) } @@ -144,7 +143,7 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == states.StateInvalid { + if workers[i].State().Value() == worker.StateInvalid { continue } @@ -177,7 +176,7 @@ func (sp *supervised) control() { // firs we check maxWorker idle if sp.cfg.IdleTTL != 0 { // then check for the worker state - if workers[i].State().Value() != states.StateReady { + if workers[i].State().Value() != worker.StateReady { continue } diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go deleted file mode 100644 index fe653cb4..00000000 --- a/pkg/states/worker_states.go +++ /dev/null @@ -1,33 +0,0 @@ -package states - -const ( - // StateInactive - no associated process - StateInactive int64 = iota - - // StateReady - ready for job. - StateReady - - // StateWorking - working on given payload. - StateWorking - - // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. - StateInvalid - - // StateStopping - process is being softly stopped. - StateStopping - - // StateKilling - process is being forcibly stopped - StateKilling - - // State of worker, when no need to allocate new one - StateDestroyed - - // StateStopped - process has been terminated. - StateStopped - - // StateErrored - error WorkerState (can't be used). - StateErrored - - // StateRemove - worker is killed and removed from the stack - StateRemove -) diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 929043a7..b12ff36a 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -93,7 +92,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } // everything ok, set ready state - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) // return worker c <- SpawnResult{ @@ -153,7 +152,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // everything ok, set ready state - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) return w, nil } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 73008471..a00b2117 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -21,13 +20,13 @@ func Test_GetState2(t *testing.T) { w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, states.StateStopped, w.State().Value()) + assert.Equal(t, worker.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, 
worker.StateReady, w.State().Value()) assert.NoError(t, w.Stop()) } @@ -40,13 +39,13 @@ func Test_Kill2(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, states.StateErrored, w.State().Value()) + assert.Equal(t, worker.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 3efeb59c..fb40ecb0 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -11,7 +11,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -23,13 +22,13 @@ func Test_GetState(t *testing.T) { w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, states.StateStopped, w.State().Value()) + assert.Equal(t, worker.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -46,13 +45,13 @@ func Test_Kill(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, states.StateErrored, w.State().Value()) + assert.Equal(t, worker.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 98bd2389..f58f9561 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -9,11 +9,10 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" - "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" @@ -125,7 +124,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } w.AttachRelay(rl) - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) c <- socketSpawn{ w: w, @@ -168,7 +167,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) return w, nil } diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index c13a897b..a8dd0fe0 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -401,7 +401,7 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - block := make(chan struct{}) + block := make(chan struct{}, 10) listener := func(event interface{}) { if wev, ok 
:= event.(events.WorkerEvent); ok { if wev.Event == events.EventWorkerStderr { diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 9d74ae10..96eb25bc 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -5,11 +5,29 @@ import ( "fmt" "time" - "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/pkg/payload" ) +// State represents WorkerProcess status and updated time. +type State interface { + fmt.Stringer + // Value returns StateImpl value + Value() int64 + // Set sets the StateImpl + Set(value int64) + // NumJobs shows how many times WorkerProcess was invoked + NumExecs() uint64 + // IsActive returns true if WorkerProcess not Inactive or Stopped + IsActive() bool + // RegisterExec using to registering php executions + RegisterExec() + // SetLastUsed sets worker last used time + SetLastUsed(lu uint64) + // LastUsed return worker last used time + LastUsed() uint64 +} + type BaseProcess interface { fmt.Stringer @@ -21,7 +39,7 @@ type BaseProcess interface { // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() internal.State + State() State // Start used to run Cmd and immediately return Start() error diff --git a/pkg/worker/state.go b/pkg/worker/state.go new file mode 100755 index 00000000..176e151b --- /dev/null +++ b/pkg/worker/state.go @@ -0,0 +1,108 @@ +package worker + +import ( + "sync/atomic" +) + +// SYNC WITH worker_watcher.GET +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + // StateKilling - process is being forcibly stopped + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error StateImpl (can't be used). + StateErrored +) + +type StateImpl struct { + value int64 + numExecs uint64 + // to be lightweight, use UnixNano + lastUsed uint64 +} + +// Thread safe +func NewWorkerState(value int64) *StateImpl { + return &StateImpl{value: value} +} + +// String returns current StateImpl as string. +func (s *StateImpl) String() string { + switch s.Value() { + case StateInactive: + return "inactive" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateInvalid: + return "invalid" + case StateStopping: + return "stopping" + case StateStopped: + return "stopped" + case StateKilling: + return "killing" + case StateErrored: + return "errored" + case StateDestroyed: + return "destroyed" + } + + return "undefined" +} + +// NumExecs returns number of registered WorkerProcess execs. 
+func (s *StateImpl) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) +} + +// Value StateImpl returns StateImpl value +func (s *StateImpl) Value() int64 { + return atomic.LoadInt64(&s.value) +} + +// IsActive returns true if WorkerProcess not Inactive or Stopped +func (s *StateImpl) IsActive() bool { + val := s.Value() + return val == StateWorking || val == StateReady +} + +// change StateImpl value (status) +func (s *StateImpl) Set(value int64) { + atomic.StoreInt64(&s.value, value) +} + +// register new execution atomically +func (s *StateImpl) RegisterExec() { + atomic.AddUint64(&s.numExecs, 1) +} + +// Update last used time +func (s *StateImpl) SetLastUsed(lu uint64) { + atomic.StoreUint64(&s.lastUsed, lu) +} + +func (s *StateImpl) LastUsed() uint64 { + return atomic.LoadUint64(&s.lastUsed) +} diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go new file mode 100755 index 00000000..c67182d6 --- /dev/null +++ b/pkg/worker/state_test.go @@ -0,0 +1,27 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 010af076..82a5462a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,41 +6,26 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) // Allocator is responsible for worker allocation in the pool -type Allocator func() (*SyncWorkerImpl, error) +type Allocator func() (SyncWorker, error) type SyncWorkerImpl struct { process *Process } // From creates SyncWorker from BaseProcess -func From(process *Process) *SyncWorkerImpl { +func From(process *Process) SyncWorker { return &SyncWorkerImpl{ process: process, } } -// FromSync creates BaseProcess from SyncWorkerImpl -func FromSync(w *SyncWorkerImpl) BaseProcess { - return &Process{ - created: w.process.created, - events: w.process.events, - state: w.process.state, - cmd: w.process.cmd, - pid: w.process.pid, - endState: w.process.endState, - relay: w.process.relay, - } -} - // Exec payload without TTL timeout. 
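The new pkg/worker/state.go above replaces the deleted pkg/states package with an atomics-only implementation, so status flips and execution counting need no mutex. A small sketch of concurrent use, assuming the package is importable as shown in the diff and using only the methods it defines (NewWorkerState, Set, RegisterExec, NumExecs, IsActive, String):

package main

import (
	"fmt"
	"sync"

	"github.com/spiral/roadrunner/v2/pkg/worker"
)

func main() {
	st := worker.NewWorkerState(worker.StateReady)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			st.Set(worker.StateWorking)
			st.RegisterExec() // atomic counter, safe to call from many goroutines
			st.Set(worker.StateReady)
		}()
	}
	wg.Wait()

	fmt.Println(st.String(), st.NumExecs(), st.IsActive()) // ready 100 true
}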
func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") @@ -48,25 +33,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -91,7 +76,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -101,13 +86,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -117,7 +102,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() c <- wexec{ @@ -214,7 +199,7 @@ func (tw *SyncWorkerImpl) Created() time.Time { return tw.process.Created() } -func (tw *SyncWorkerImpl) State() internal.State { +func (tw *SyncWorkerImpl) State() State { return tw.process.State() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index b726c6f1..0f7ab755 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -9,22 +9,12 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 25 * time.Millisecond - - // ReadBufSize used to make a slice with specified length to read from stderr - ReadBufSize = 10240 // Kb -) - type Options func(p *Process) // Process - supervised process with api over goridge.Relay. @@ -39,7 +29,7 @@ type Process struct { // number of Process executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. 
- state *internal.WorkerState + state *StateImpl // underlying command with associated process, command must be // provided to Process from outside in non-started form. CmdSource @@ -50,9 +40,6 @@ type Process struct { // can be nil while process is not started. pid int - // contains information about resulted process state. - endState *os.ProcessState - // communication bus with underlying process. relay relay.Relay } @@ -67,7 +54,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(states.StateInactive), + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -106,7 +93,7 @@ func (w *Process) addListener(listener events.Listener) { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) State() internal.State { +func (w *Process) State() State { return w.state } @@ -157,13 +144,13 @@ func (w *Process) Wait() error { err = w.cmd.Wait() // If worker was destroyed, just exit - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { return nil } // If state is different, and err is not nil, append it to the errors if err != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) err = multierr.Combine(err, errors.E(op, err)) } @@ -173,12 +160,12 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error err2 := w.closeRelay() if err2 != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) return multierr.Append(err, errors.E(op, err2)) } if w.cmd.ProcessState.Success() { - w.State().Set(states.StateStopped) + w.State().Set(StateStopped) return nil } @@ -198,20 +185,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(states.StateStopping) + w.state.Set(StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(states.StateKilling) - return multierr.Append(err, w.cmd.Process.Kill()) + w.state.Set(StateKilling) + return multierr.Append(err, w.cmd.Process.Signal(os.Kill)) } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! 
func (w *Process) Kill() error { - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -219,12 +206,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(states.StateKilling) + w.state.Set(StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go new file mode 100644 index 00000000..532bace9 --- /dev/null +++ b/pkg/worker_watcher/container/interface.go @@ -0,0 +1,13 @@ +package container + +import "github.com/spiral/roadrunner/v2/pkg/worker" + +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue() (worker.BaseProcess, bool) + // Destroy used to stop releasing the workers + Destroy() +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go index 55034e41..fb8ecd3b 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/container/stack.go @@ -1,17 +1,17 @@ -package worker_watcher //nolint:golint,stylecheck +package container + import ( "context" "runtime" "sync" "time" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []*worker.SyncWorkerImpl - mutex sync.RWMutex + sync.RWMutex + workers []worker.BaseProcess destroy bool actualNumOfWorkers uint64 initialNumOfWorkers uint64 @@ -20,39 +20,39 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]*worker.SyncWorkerImpl, 0, w), + workers: make([]worker.BaseProcess, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } } func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers = 0 stack.workers = nil } -// Push worker back to the stack -// If stack in destroy state, Push will provide 100ms window to unlock the mutex +// Push worker back to the vec +// If vec in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) + stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return len(stack.workers) == 0 } -func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.Lock() + defer stack.Unlock() - // do not release new stack + // do not release new vec if stack.destroy { return nil, true } @@ -69,10 +69,10 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { } func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { - // worker in the stack, reallocating + // worker in the vec, reallocating if stack.workers[i].Pid() == pid { stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) 
stack.actualNumOfWorkers-- @@ -84,12 +84,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { return false } -// Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.SyncWorker { - stack.mutex.Lock() - defer stack.mutex.Unlock() - workersCopy := make([]worker.SyncWorker, 0, 1) +// Workers return copy of the workers in the vec +func (stack *Stack) Workers() []worker.BaseProcess { + stack.Lock() + defer stack.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) // copy + // TODO pointers, copy have no sense for _, v := range stack.workers { if v != nil { workersCopy = append(workersCopy, v) @@ -100,40 +101,40 @@ func (stack *Stack) Workers() []worker.SyncWorker { } func (stack *Stack) isDestroying() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return stack.destroy } // we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(ctx context.Context) { - stack.mutex.Lock() +func (stack *Stack) Destroy(_ context.Context) { + stack.Lock() stack.destroy = true - stack.mutex.Unlock() + stack.Unlock() tt := time.NewTicker(time.Millisecond * 500) defer tt.Stop() for { select { case <-tt.C: - stack.mutex.Lock() + stack.Lock() // that might be one of the workers is working if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.mutex.Unlock() + stack.Unlock() continue } - stack.mutex.Unlock() + stack.Unlock() // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack + // just to make sure. All vec at this moment are in the vec // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.mutex.Lock() + stack.Lock() for i := 0; i < len(stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(states.StateDestroyed) + // set state for the vec in the vec (unused at the moment) + stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } - stack.mutex.Unlock() + stack.Unlock() // clear stack.Reset() return diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go index 5287a6dc..d699664c 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/container/stack_test.go @@ -1,4 +1,5 @@ -package worker_watcher //nolint:golint,stylecheck +package container + import ( "context" "os/exec" @@ -12,7 +13,7 @@ import ( func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) + assert.Equal(t, []worker.BaseProcess{}, stack.workers) } func TestStack_Push(t *testing.T) { diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go new file mode 100644 index 00000000..239b01c7 --- /dev/null +++ b/pkg/worker_watcher/container/vec.go @@ -0,0 +1,45 @@ +package container + +import ( + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + destroy uint64 + workers chan worker.BaseProcess +} + +func NewVector(initialNumOfWorkers uint64) Vector { + vec := &Vec{ + destroy: 0, + workers: make(chan worker.BaseProcess, initialNumOfWorkers), + } + + return vec +} + +func (v *Vec) Enqueue(w worker.BaseProcess) { + v.workers <- w +} + +func (v *Vec) Dequeue() (worker.BaseProcess, bool) { + /* + if *addr == old { + *addr = new + return true + } + */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + 
return nil, true + } + + w := <-v.workers + + return w, false +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 927aa270..4625b7a7 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -6,25 +6,26 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker" ) +// Watcher is an interface for the Sync workers lifecycle type Watcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(workers []worker.SyncWorker) error + // Watch used to add workers to the container + Watch(workers []worker.BaseProcess) error - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) + // Get provide first free worker + Get(ctx context.Context) (worker.BaseProcess, error) - // PutWorker enqueues worker back - PushWorker(w worker.SyncWorker) + // Push enqueues worker back + Push(w worker.BaseProcess) - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew() error + // Allocate - allocates new worker and put it into the WorkerWatcher + Allocate() error - // Destroy destroys the underlying stack + // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all stack w/o removing it from internal storage - WorkersList() []worker.SyncWorker + // WorkersList return all container w/o removing it from internal storage + List() []worker.BaseProcess - // RemoveWorker remove worker from the stack - RemoveWorker(wb worker.SyncWorker) error + // RemoveWorker remove worker from the container + Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cf2e1eb7..804e4658 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,132 +3,228 @@ package worker_watcher //nolint:golint,stylecheck import ( "context" "sync" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +// workerCreateFunc can be nil, but in that case, dead container will not be replaced func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(numWorkers), - allocator: allocator, - events: events, + container: container.NewVector(numWorkers), + numWorkers: numWorkers, + workers: make([]worker.BaseProcess, 0, numWorkers), + allocator: allocator, + events: events, } return ww } type workerWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator worker.Allocator - events events.Handler + sync.RWMutex + container container.Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + workers []worker.BaseProcess + allocator worker.Allocator + events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.stack.Push(workers[i]) + ww.container.Enqueue(workers[i]) + // add worker to watch slice + ww.workers = append(ww.workers, workers[i]) - go func(swc worker.SyncWorker) { + go func(swc worker.BaseProcess) { ww.wait(swc) 
}(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { +// return value from Get +type get struct { + w worker.BaseProcess + err error +} + +// Get is not a thread safe operation +func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { + c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - // thread safe operation - w, stop := ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } + go func() { + // FAST PATH + // thread safe operation + w, stop := ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } + return + } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == states.StateRemove { - err := ww.RemoveWorker(w) - if err != nil { - return nil, err + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + c <- get{ + w, + nil, + } + return } - // try to get next - return ww.GetFreeWorker(ctx) - } - // no free stack - if w == nil { + // ========================================================= + // SLOW PATH + _ = w.Kill() + // no free workers in the container + // try to continuously get free one for { select { default: - w, stop = ww.stack.Pop() + w, stop = ww.container.Dequeue() if stop { - return nil, errors.E(op, errors.WatcherStopped) + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } } - if w == nil { + + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, + } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker continue } - return w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } - } + }() - return w, nil + select { + case r := <-c: + if r.err != nil { + return nil, r.err + } + return r.w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) + } } -func (ww *workerWatcher) AllocateNew() error { - ww.stack.mutex.Lock() +func (ww *workerWatcher) Allocate() error { + ww.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { return errors.E(op, errors.WorkerAllocate, err) } + // add worker to Wait ww.addToWatch(sw) - ww.stack.mutex.Unlock() - ww.PushWorker(sw) + // add new worker to the workers slice (to get information about workers in parallel) + ww.workers = append(ww.workers, sw) + + // unlock Allocate mutex + ww.Unlock() + // push the worker to the container + ww.Push(sw) return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { - ww.mutex.Lock() - defer ww.mutex.Unlock() +// Remove +func (ww *workerWatcher) Remove(wb worker.BaseProcess) { + ww.Lock() + defer ww.Unlock() - const op = errors.Op("worker_watcher_remove_worker") + // set remove state pid := wb.Pid() - if ww.stack.FindAndRemoveByPid(pid) { - 
wb.State().Set(states.StateRemove) - err := wb.Kill() - if err != nil { - return errors.E(op, err) + // worker will be removed on the Get operation + for i := 0; i < len(ww.workers); i++ { + if ww.workers[i].Pid() == pid { + ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) + // kill worker + _ = wb.Kill() + return } - return nil } - - return nil } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { - ww.mutex.Lock() - defer ww.mutex.Unlock() - ww.stack.Push(w) +func (ww *workerWatcher) Push(w worker.BaseProcess) { + ww.container.Enqueue(w) } -// Destroy all underlying stack (but let them to complete the task) +// Destroy all underlying container (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - // destroy stack, we don't use ww mutex here, since we should be able to push worker - ww.stack.Destroy(ctx) + // destroy container, we don't use ww mutex here, since we should be able to push worker + ww.Lock() + // do not release new workers + ww.container.Destroy() + ww.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + defer tt.Stop() + for { + select { + case <-tt.C: + ww.Lock() + // that might be one of the workers is working + if ww.numWorkers != uint64(len(ww.workers)) { + ww.Unlock() + continue + } + ww.Unlock() + // unnecessary mutex, but + // just to make sure. All container at this moment are in the container + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + return + } + } } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.SyncWorker { - return ww.stack.Workers() +func (ww *workerWatcher) List() []worker.BaseProcess { + ww.Lock() + defer ww.Unlock() + + base := make([]worker.BaseProcess, 0, len(ww.workers)) + for i := 0; i < len(ww.workers); i++ { + base = append(base, ww.workers[i]) + } + + return base } func (ww *workerWatcher) wait(w worker.BaseProcess) { @@ -142,14 +238,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } - _ = ww.stack.FindAndRemoveByPid(w.Pid()) - err = ww.AllocateNew() + ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ Event: events.EventPoolError, @@ -158,7 +254,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { go func() { ww.wait(wb) }() |
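The watcher's free list is now the channel-backed container.Vec shown above: Enqueue and Dequeue are what Push and the Get fast path use, Dequeue blocks until a worker is returned, and Destroy flips an atomic flag so later Dequeue calls report stop=true (surfaced by Get as errors.WatcherStopped) instead of blocking forever. A minimal sketch of those semantics, assuming the container package from this diff; a nil worker.BaseProcess stands in for a real worker:

package main

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/pkg/worker"
	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)

func main() {
	vec := container.NewVector(4) // buffered channel sized for the worker count

	var w worker.BaseProcess // placeholder; the pool enqueues real workers
	vec.Enqueue(w)

	got, stop := vec.Dequeue()
	fmt.Println(got, stop) // <nil> false: a queued worker was handed out

	vec.Destroy()
	_, stop = vec.Dequeue()
	fmt.Println(stop) // true: the watcher maps this to WatcherStopped
}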