Diffstat (limited to 'pkg/pool')
-rw-r--r--  pkg/pool/interface.go        |  4
-rwxr-xr-x  pkg/pool/static_pool.go      | 91
-rwxr-xr-x  pkg/pool/static_pool_test.go | 44
-rwxr-xr-x  pkg/pool/supervisor_pool.go  |  9
4 files changed, 93 insertions(+), 55 deletions(-)
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4f7ae595..bfc56c3f 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -19,10 +19,10 @@ type Pool interface {
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []worker.SyncWorker)
+ Workers() (workers []worker.BaseProcess)
// Remove worker from the pool.
- RemoveWorker(worker worker.SyncWorker) error
+ RemoveWorker(worker worker.BaseProcess) error
// Destroy the underlying stack of workers (but let them complete their tasks).
Destroy(ctx context.Context)
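With this change the Pool interface hands out workers as the narrower worker.BaseProcess; anything that needs synchronous execution has to type-assert back to worker.SyncWorker, exactly as StaticPool.Exec does further down in this diff. A minimal caller-side sketch (the helper name and surrounding package are hypothetical; the signatures are the ones shown above):

```go
package example

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// execOnFirstWorker demonstrates the assertion now required by callers.
func execOnFirstWorker(p pool.Pool, pld payload.Payload) (payload.Payload, error) {
	workers := p.Workers() // []worker.BaseProcess after this commit
	if len(workers) == 0 {
		return payload.Payload{}, fmt.Errorf("pool has no workers")
	}
	// BaseProcess knows nothing about Exec; narrow it back down.
	sw, ok := workers[0].(worker.SyncWorker)
	if !ok {
		return payload.Payload{}, fmt.Errorf("worker does not implement SyncWorker")
	}
	return sw.Exec(pld)
}
```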
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 01b0574d..f1b20bb9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -4,11 +4,11 @@ import (
"context"
"os/exec"
"time"
+ "unsafe"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encodes an error or makes a decision based on the error type
-type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -46,8 +46,8 @@ type StaticPool struct {
// allocate new worker
allocator worker.Allocator
- // errEncoder is the default Exec error encoder
- errEncoder ErrorEncoder
+ // err_encoder is the default Exec error encoder
+ err_encoder ErrorEncoder //nolint
}
// Initialize creates a new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -75,21 +75,24 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ // set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ // allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
- // put stack in the pool
- err = p.ww.AddToWatch(workers)
+ // add workers to the watcher
+ err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
}
- p.errEncoder = defaultErrEncoder(p)
+ p.err_encoder = defaultErrEncoder(p)
// if the supervisor config is not nil, assume the pool should be supervised
if cfg.Supervisor != nil {
@@ -122,12 +125,13 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
- return sp.ww.WorkersList()
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+ return sp.ww.List()
}
-func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
- return sp.ww.RemoveWorker(wb)
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+ sp.ww.Remove(wb)
+ return nil
}
// Be careful, sync Exec with ExecWithContext
@@ -143,16 +147,15 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.Exec(p)
+ rsp, err := w.(worker.SyncWorker).Exec(p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker wants to be terminated
- // TODO careful with string(rsp.Context)
- if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ // TODO careful with toString(rsp.Context)
+ if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
sp.stopWorker(w)
-
return sp.Exec(p)
}
@@ -174,13 +177,13 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, p)
+ rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker wants to be terminated
- if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
sp.stopWorker(w)
return sp.ExecWithContext(ctx, p)
}
@@ -193,9 +196,9 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -203,22 +206,24 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
}
// checkMaxJobs checks the worker's number of executions and stops the worker if that number exceeds sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew()
+ w.State().Set(worker.StateDestroyed)
+ sp.ww.Remove(w)
+ err := sp.ww.Allocate()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
- // GetFreeWorker function consumes context with timeout
- w, err := sp.ww.GetFreeWorker(ctxGetFree)
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+ // Get function consumes context with timeout
+ w, err := sp.ww.Get(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means that we can't get a worker from the stack within the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -237,33 +242,32 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.SyncWorker) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push an event if a timeout error occurred at any stage
- if errors.Is(errors.ExecTTL, err) {
+
+ switch {
+ case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- }
- // soft job errors are allowed
- if errors.Is(errors.SoftJob, err) {
+
+ case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
-
- return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
@@ -276,7 +280,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (*worker.SyncWorkerImpl, error) {
+ return func() (worker.SyncWorker, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -310,9 +314,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate the required number of workers
-func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.SyncWorker
+ var workers []worker.BaseProcess
// a constant number of workers simplifies the logic
for i := uint64(0); i < numWorkers; i++ {
@@ -325,3 +329,8 @@ func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, e
}
return workers, nil
}
+
+// unsafe, but lightning fast []byte to string conversion
+func toString(data []byte) string {
+ return *(*string)(unsafe.Pointer(&data))
+}
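The toString helper reinterprets the slice header as a string header: zero copies and zero allocations, but the resulting string aliases the slice's backing array, so it is only safe while the bytes are never mutated and the string does not outlive the slice. In the pool it is used for a single equality check against StopRequest and then discarded, which is why the pattern holds here. A standalone sketch of the hazard the TODO comments warn about:

```go
package main

import (
	"fmt"
	"unsafe"
)

// toString is the same one-liner added in this commit.
func toString(data []byte) string {
	return *(*string)(unsafe.Pointer(&data))
}

func main() {
	buf := []byte(`{"stop":true}`)
	s := toString(buf) // no copy: s shares buf's backing array

	fmt.Println(s == `{"stop":true}`) // true

	// mutating the slice silently mutates the "immutable" string
	buf[2] = 'S'
	fmt.Println(s == `{"stop":true}`) // false: s is now {"Stop":true}
}
```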
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 4cfd5ec6..44f5936c 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) {
cfg,
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
res, err := p.Exec(payload.Payload{Body: []byte("hello")})
@@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) {
}
assert.Contains(t, err.Error(), "hello")
+ p.Destroy(ctx)
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
@@ -255,7 +254,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
}
}
@@ -454,7 +453,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
pipe.NewPipeFactory(),
Config{
NumWorkers: 5,
- AllocateTimeout: time.Second,
+ AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
@@ -462,11 +461,11 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for i := range p.Workers() {
- p.Workers()[i].State().Set(states.StateErrored)
+ p.Workers()[i].State().Set(worker.StateErrored)
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
+ assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -645,3 +644,34 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
}
}
}
+
+// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
+// inline BenchmarkToStringUnsafe-12 1000000000 0.295 ns/op 0 B/op 0 allocs/op
+func BenchmarkToStringUnsafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := toString(testPayload)
+ _ = res
+ }
+}
+
+// BenchmarkToStringSafe-12 28584489 39.1 ns/op 112 B/op 1 allocs/op
+// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
+func BenchmarkToStringSafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := toStringNotFun(testPayload)
+ _ = res
+ }
+}
+
+func toStringNotFun(data []byte) string {
+ return string(data)
+}
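The benchmark comments above record roughly a 20x difference (1.91 ns vs 39.1 ns per op) and zero allocations for the unsafe variant; they can be reproduced with `go test -bench=ToString -benchmem ./pkg/pool`. On Go versions newer than this commit (1.20+), the same zero-copy conversion can be spelled with the dedicated unsafe helpers. A sketch, not part of this change:

```go
package main

import "unsafe"

// toString120 is a hypothetical Go 1.20+ equivalent of toString: still
// zero-copy, with the same aliasing caveats, but without the raw
// header reinterpretation.
func toString120(data []byte) string {
	return unsafe.String(unsafe.SliceData(data), len(data))
}
```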
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 3347ecd4..3618786d 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -100,13 +99,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.SyncWorker) {
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
return sp.pool.RemoveWorker(worker)
}
@@ -144,7 +143,7 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == states.StateInvalid {
+ if workers[i].State().Value() == worker.StateInvalid {
continue
}
@@ -177,7 +176,7 @@ func (sp *supervised) control() {
// first we check max worker idle (IdleTTL)
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != states.StateReady {
+ if workers[i].State().Value() != worker.StateReady {
continue
}