Diffstat (limited to 'pkg')
-rwxr-xr-x  pkg/events/general.go                               |   2
-rw-r--r--  pkg/events/interface.go                             |   4
-rw-r--r--  pkg/events/jobs_events.go                           |  88
-rw-r--r--  pkg/events/pool_events.go                           |   2
-rw-r--r--  pkg/events/worker_events.go                         |   2
-rw-r--r--  pkg/pool/config.go                                  |   2
-rw-r--r--  pkg/pool/interface.go                               |  12
-rwxr-xr-x  pkg/pool/static_pool.go                             |  63
-rwxr-xr-x  pkg/pool/static_pool_test.go                        | 124
-rwxr-xr-x  pkg/pool/supervisor_pool.go                         |  14
-rw-r--r--  pkg/pool/supervisor_test.go                         |  37
-rw-r--r--  pkg/priority_queue/binary_heap.go                   | 125
-rw-r--r--  pkg/priority_queue/binary_heap_test.go              | 128
-rw-r--r--  pkg/priority_queue/interface.go                     |  31
-rw-r--r--  pkg/process/state.go                                |   8
-rw-r--r--  pkg/pubsub/interface.go                             |  54
-rw-r--r--  pkg/pubsub/psmessage.go                             |  15
-rw-r--r--  pkg/transport/interface.go                          |   2
-rwxr-xr-x  pkg/transport/pipe/pipe_factory.go                  |  77
-rw-r--r--  pkg/transport/pipe/pipe_factory_spawn_test.go       |  38
-rwxr-xr-x  pkg/transport/pipe/pipe_factory_test.go             |  39
-rwxr-xr-x  pkg/transport/socket/socket_factory.go              |  57
-rw-r--r--  pkg/transport/socket/socket_factory_spawn_test.go   |  32
-rwxr-xr-x  pkg/transport/socket/socket_factory_test.go         |  38
-rw-r--r--  pkg/worker/interface.go                             |   4
-rwxr-xr-x  pkg/worker/sync_worker.go                           |  59
-rwxr-xr-x  pkg/worker/sync_worker_test.go                      |   5
-rw-r--r--  pkg/worker_handler/request.go                       |   8
-rw-r--r--  pkg/worker_handler/response.go                      |   2
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go         |  99
-rw-r--r--  pkg/worker_watcher/container/interface.go           |  17
-rw-r--r--  pkg/worker_watcher/container/queue/queue.go         | 102
-rw-r--r--  pkg/worker_watcher/container/vec.go                 |  51
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go                |  78
34 files changed, 986 insertions, 433 deletions
diff --git a/pkg/events/general.go b/pkg/events/general.go
index a09a8759..5cf13e10 100755
--- a/pkg/events/general.go
+++ b/pkg/events/general.go
@@ -4,6 +4,8 @@ import (
"sync"
)
+const UnknownEventType string = "Unknown event type"
+
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
listeners []Listener
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
index ac6c15a4..7d57e4d0 100644
--- a/pkg/events/interface.go
+++ b/pkg/events/interface.go
@@ -2,7 +2,7 @@ package events
// Handler interface
type Handler interface {
- // Return number of active listeners
+ // NumListeners returns the number of active listeners
NumListeners() int
// AddListener adds lister to the publisher
AddListener(listener Listener)
@@ -10,5 +10,5 @@ type Handler interface {
Push(e interface{})
}
-// Event listener listens for the events produced by worker, worker pool or other service.
+// Listener is a type alias for an event listener which listens for events produced by a worker, worker pool or other service.
type Listener func(event interface{})
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
new file mode 100644
index 00000000..c0ee733a
--- /dev/null
+++ b/pkg/events/jobs_events.go
@@ -0,0 +1,88 @@
+package events
+
+import (
+ "time"
+)
+
+const (
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK = iota + 12000
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventInitialized when pipeline has been initialized, but not started
+ EventInitialized
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when the broker is ready to accept/serve tasks.
+ EventDriverReady
+)
+
+type J int64
+
+func (ev J) String() string {
+ switch ev {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventInitialized:
+ return "EventInitialized"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type JobEvent struct {
+ Event J
+ // String is job id.
+ ID string
+
+ // Pipeline name
+ Pipeline string
+
+ // Associated driver name (amqp, ephemeral, etc)
+ Driver string
+
+ // Error for the jobs/pipes errors
+ Error error
+
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
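A minimal listener sketch for the new job events, assuming the Handler implementation from pkg/events/general.go and a NewEventsHandler constructor (the constructor name and import path are assumptions, not shown in this diff):

package main

import (
	"log"

	"github.com/spiral/roadrunner/v2/pkg/events"
)

func main() {
	// NewEventsHandler is assumed to return the HandlerImpl broadcaster from general.go
	eh := events.NewEventsHandler()
	eh.AddListener(func(event interface{}) {
		if je, ok := event.(events.JobEvent); ok {
			log.Printf("%s: job %s, pipeline %s, driver %s, elapsed %v, err: %v",
				je.Event.String(), je.ID, je.Pipeline, je.Driver, je.Elapsed, je.Error)
		}
	})

	// a driver would publish events like this
	eh.Push(events.JobEvent{Event: events.EventJobOK, ID: "1", Pipeline: "test", Driver: "ephemeral"})
}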
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index e7b451e0..4d4cae5d 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -57,7 +57,7 @@ func (ev P) String() string {
case EventPoolRestart:
return "EventPoolRestart"
}
- return "Unknown event type"
+ return UnknownEventType
}
// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 11bd6ab7..39c38e57 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -20,7 +20,7 @@ func (ev W) String() string {
case EventWorkerStderr:
return "EventWorkerStderr"
}
- return "Unknown event type"
+ return UnknownEventType
}
// WorkerEvent wraps worker events.
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index bbf7653e..4049122c 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -13,7 +13,7 @@ type Pool interface {
GetConfig() interface{}
// Exec executes task with payload
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
@@ -25,7 +25,7 @@ type Pool interface {
Destroy(ctx context.Context)
// ExecWithContext executes task with context which is used with timeout
- execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+ execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
}
// Watcher is an interface for the Sync workers lifecycle
@@ -33,11 +33,11 @@ type Watcher interface {
// Watch used to add workers to the container
Watch(workers []worker.BaseProcess) error
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
+ // Take takes the first free worker
+ Take(ctx context.Context) (worker.BaseProcess, error)
- // Push enqueues worker back
- Push(w worker.BaseProcess)
+ // Release releases the worker putting it back to the queue
+ Release(w worker.BaseProcess)
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
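The Get/Push pair becomes Take/Release, making the check-out/check-in semantics of the worker container explicit. A rough sketch of the intended cycle (editorial, import paths assumed; the real pool stops or reallocates errored workers instead of simply returning, see static_pool.go below):

package example

import (
	"context"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// takeExecRelease checks a worker out of the Watcher, runs one payload on it
// and checks it back in. Error handling is reduced to the happy path.
func takeExecRelease(ctx context.Context, ww pool.Watcher, p *payload.Payload) (*payload.Payload, error) {
	w, err := ww.Take(ctx) // blocks until a worker is free or ctx expires
	if err != nil {
		return nil, err
	}

	rsp, err := w.(worker.SyncWorker).Exec(p)
	if err != nil {
		return nil, err // the real pool may stop or reallocate the worker here
	}

	ww.Release(w) // put the worker back into the container
	return rsp, nil
}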
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 5a6247b5..051e7a8a 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
type Options func(p *StaticPool)
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg Config
+ cfg *Config
// worker command creator
cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -135,16 +135,16 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
}
// Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxGetFree, op)
+ w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).Exec(p)
@@ -163,12 +163,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return rsp, nil
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
// Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
if sp.cfg.Debug {
return sp.execDebugWithTTL(ctx, p)
@@ -176,9 +176,9 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxAlloc, op)
+ w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
@@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
- sp.ww.Push(w)
+ sp.ww.Release(w)
return
}
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// Get function consumes context with timeout
- w, err := sp.ww.Get(ctxGetFree)
+ w, err := sp.ww.Take(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
@@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // TODO suspicious logic, redesign
err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
@@ -265,7 +266,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
} else {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
}
@@ -273,10 +274,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
if errS != nil {
- return payload.Payload{}, errors.E(op, err, errS)
+ return nil, errors.E(op, err, errS)
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
}
@@ -289,6 +290,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
+ // wrap sync worker
sw := worker.From(w)
sp.events.Push(events.PoolEvent{
@@ -300,26 +302,33 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
}
// execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
- // redirect call to the workers exec method (without ttl)
+ // redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
- if stopErr := sw.Stop(); stopErr != nil {
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = sw.Stop()
+ if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ return nil, errors.E(op, err)
}
- return r, err
+ return r, nil
}
// execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the worker with TTL
@@ -333,7 +342,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("allocate workers")
+ const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
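With Exec returning *payload.Payload, error paths now hand back a nil pointer instead of an empty struct (see the updated tests below asserting assert.Nil(t, res)). A caller-side sketch, import paths assumed:

package example

import (
	"log"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

func echo(p pool.Pool) {
	rsp, err := p.Exec(&payload.Payload{Body: []byte("hello")})
	if err != nil {
		// rsp is nil here now, so it must not be dereferenced
		log.Println(err)
		return
	}
	log.Println(string(rsp.Body))
}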
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..2ac2093d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
@@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -204,7 +202,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Consume pool events
+ // Run pool events
ev := make(chan struct{}, 1)
listener := func(event interface{}) {
if pe, ok := event.(events.PoolEvent); ok {
@@ -214,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg2 = Config{
+ var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -264,7 +262,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -283,7 +281,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -320,7 +318,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -360,7 +358,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -400,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -422,7 +420,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 100)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -452,7 +450,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p.Workers()[i].State().Set(worker.StateErrored)
}
- _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -476,7 +474,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -506,7 +504,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: false,
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.NotNil(t, p)
go func() {
- _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
}()
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -539,7 +536,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -556,7 +553,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -567,6 +564,24 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
assert.Nil(t, p)
}
+/* PTR:
+Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op
+
+VAL:
+Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op
+*/
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
@@ -579,23 +594,33 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.Fatal(err)
}
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(pld); err != nil {
b.Fail()
}
}
}
// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -604,12 +629,23 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
assert.NoError(b, err)
defer p.Destroy(ctx)
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(pld); err != nil {
b.Fail()
log.Println(err)
}
@@ -626,7 +662,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -639,7 +675,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 4b990dbe..bdaeade1 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
panic("used to satisfy pool interface")
}
-func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
return res, nil
@@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
- // just to double check
+ // just to double-check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 1cd301ba..0702a71f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -84,7 +84,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
assert.Error(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
+ assert.Empty(t, resp)
time.Sleep(time.Second * 1)
// should be new worker with new pid
@@ -125,7 +124,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
Supervisor: &SupervisorConfig{
WatchTick: 1 * time.Second,
@@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()
- resp, err = p.Exec(payload.Payload{
+ resp, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -177,7 +176,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
}
func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -225,7 +224,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
}
func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -267,7 +266,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -309,7 +308,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
}
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
new file mode 100644
index 00000000..fc043927
--- /dev/null
+++ b/pkg/priority_queue/binary_heap.go
@@ -0,0 +1,125 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type BinHeap struct {
+ items []Item
+ // find a way to use pointer to the raw data
+ len uint64
+ maxLen uint64
+ cond sync.Cond
+}
+
+func NewBinHeap(maxLen uint64) *BinHeap {
+ return &BinHeap{
+ items: make([]Item, 0, 1000),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (bh *BinHeap) fixUp() {
+ k := bh.len - 1
+ p := (k - 1) >> 1 // k-1 / 2
+
+ for k > 0 {
+ cur, par := (bh.items)[k], (bh.items)[p]
+
+ if cur.Priority() < par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) swap(i, j uint64) {
+ (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := (curr << 1) + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if (curr<<1)+2 <= end {
+ cTwoIdx = (curr << 1) + 2
+ }
+
+ idxToSwap := cOneIdx
+ if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
+ bh.swap(uint64(curr), uint64(idxToSwap))
+ curr = idxToSwap
+ cOneIdx = (curr << 1) + 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) Len() uint64 {
+ return atomic.LoadUint64(&bh.len)
+}
+
+func (bh *BinHeap) Insert(item Item) {
+ bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
+ bh.items = append(bh.items, item)
+
+ // add len to the slice
+ atomic.AddUint64(&bh.len, 1)
+
+ // fix binary heap up
+ bh.fixUp()
+ bh.cond.L.Unlock()
+
+ // signal the goroutine on wait
+ bh.cond.Signal()
+}
+
+func (bh *BinHeap) ExtractMin() Item {
+ bh.cond.L.Lock()
+
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
+ bh.cond.Wait()
+ }
+
+ bh.swap(0, bh.len-1)
+
+ item := (bh.items)[int(bh.len)-1]
+ bh.items = (bh).items[0 : int(bh.len)-1]
+ bh.fixDown(0, int(bh.len-2))
+
+ // reduce len
+ atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
+ return item
+}
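fixUp bubbles a newly inserted item toward the root and fixDown sinks the swapped-in root toward the leaves; both rely on the usual slice-backed heap index arithmetic, summarized in this illustrative helper (not part of the commit):

// heapLinks returns the indexes fixUp/fixDown navigate for a node at index k:
// parent is (k-1)/2, children are 2k+1 and 2k+2 (parent is only meaningful for k > 0).
func heapLinks(k int) (parent, left, right int) {
	return (k - 1) >> 1, (k << 1) + 1, (k << 1) + 2
}

ExtractMin blocks on the sync.Cond while the heap is empty; Insert signals a waiter after every insertion and, once len exceeds maxLen, keeps signalling until consumers drain the heap before appending the new item.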
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
new file mode 100644
index 00000000..fb5b83de
--- /dev/null
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -0,0 +1,128 @@
+package priorityqueue
+
+import (
+ "fmt"
+ "math/rand"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+type Test int
+
+func (t Test) Ack() error {
+ return nil
+}
+
+func (t Test) Nack() error {
+ return nil
+}
+
+func (t Test) Requeue(_ map[string][]string, _ int64) error {
+ return nil
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (t Test) ID() string {
+ return "none"
+}
+
+func (t Test) Priority() int64 {
+ return int64(t)
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap(12)
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]Item, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.ExtractMin()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
+func TestNewPriorityQueue(t *testing.T) {
+ insertsPerSec := uint64(0)
+ getPerSec := uint64(0)
+ stopCh := make(chan struct{}, 1)
+ pq := NewBinHeap(1000)
+
+ go func() {
+ tt3 := time.NewTicker(time.Millisecond * 10)
+ for {
+ select {
+ case <-tt3.C:
+ require.Less(t, pq.Len(), uint64(1002))
+ case <-stopCh:
+ return
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Second)
+
+ for {
+ select {
+ case <-tt.C:
+ fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
+ atomic.StoreUint64(&insertsPerSec, 0)
+ fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
+ atomic.StoreUint64(&getPerSec, 0)
+ case <-stopCh:
+ tt.Stop()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.ExtractMin()
+ atomic.AddUint64(&getPerSec, 1)
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.Insert(Test(rand.Int())) //nolint:gosec
+ atomic.AddUint64(&insertsPerSec, 1)
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
new file mode 100644
index 00000000..9efa4652
--- /dev/null
+++ b/pkg/priority_queue/interface.go
@@ -0,0 +1,31 @@
+package priorityqueue
+
+type Queue interface {
+ Insert(item Item)
+ ExtractMin() Item
+ Len() uint64
+}
+
+// Item represents binary heap item
+type Item interface {
+ // ID is a unique item identifier
+ ID() string
+
+ // Priority returns the Item's priority to sort
+ Priority() int64
+
+ // Body is the Item payload
+ Body() []byte
+
+ // Context is the Item meta information
+ Context() ([]byte, error)
+
+ // Ack - acknowledge the Item after processing
+ Ack() error
+
+ // Nack - discard the Item
+ Nack() error
+
+ // Requeue - put the message back to the queue with the optional delay
+ Requeue(headers map[string][]string, delay int64) error
+}
diff --git a/pkg/process/state.go b/pkg/process/state.go
index 652ec77c..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/process/state.go
@@ -32,20 +32,20 @@ type State struct {
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (State, error) {
+func WorkerProcessState(w worker.BaseProcess) (*State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return State{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
percent, err := p.CPUPercent()
if err != nil {
- return State{}, err
+ return nil, err
}
- return State{
+ return &State{
CPUPercent: percent,
Pid: int(w.Pid()),
Status: w.State().String(),
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
deleted file mode 100644
index 06252d70..00000000
--- a/pkg/pubsub/interface.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package pubsub
-
-/*
-This interface is in BETA. It might be changed.
-*/
-
-// PubSub interface designed to implement on any storage type to provide pub-sub abilities
-// Publisher used to receive messages from the PHP app via RPC
-// Subscriber should be implemented to subscribe to a topics and provide a connections list per topic
-// Reader return next message from the channel
-type PubSub interface {
- Publisher
- Subscriber
- Reader
-}
-
-type SubReader interface {
- Subscriber
- Reader
-}
-
-// Subscriber defines the ability to operate as message passing broker.
-// BETA interface
-type Subscriber interface {
- // Subscribe broker to one or multiple topics.
- Subscribe(connectionID string, topics ...string) error
-
- // Unsubscribe from one or multiply topics
- Unsubscribe(connectionID string, topics ...string) error
-
- // Connections returns all connections associated with the particular topic
- Connections(topic string, ret map[string]struct{})
-}
-
-// Publisher publish one or more messages
-// BETA interface
-type Publisher interface {
- // Publish one or multiple Channel.
- Publish(message *Message) error
-
- // PublishAsync publish message and return immediately
- // If error occurred it will be printed into the logger
- PublishAsync(message *Message)
-}
-
-// Reader interface should return next message
-type Reader interface {
- Next() (*Message, error)
-}
-
-// Constructor is a special pub-sub interface made to return a constructed PubSub type
-type Constructor interface {
- PSConstruct(key string) (PubSub, error)
-}
diff --git a/pkg/pubsub/psmessage.go b/pkg/pubsub/psmessage.go
deleted file mode 100644
index e33d9284..00000000
--- a/pkg/pubsub/psmessage.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package pubsub
-
-import json "github.com/json-iterator/go"
-
-// Message represents a single message with payload bound to a particular topic
-type Message struct {
- // Topic (channel in terms of redis)
- Topic string `json:"topic"`
- // Payload (on some decode stages might be represented as base64 string)
- Payload []byte `json:"payload"`
-}
-
-func (m *Message) MarshalBinary() (data []byte, err error) {
- return json.Marshal(m)
-}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
index 7e3e5350..1b072378 100644
--- a/pkg/transport/interface.go
+++ b/pkg/transport/interface.go
@@ -8,7 +8,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
-// Factory is responsible of wrapping given command into tasks WorkerProcess.
+// Factory is responsible for wrapping given command into tasks WorkerProcess.
type Factory interface {
// SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index 19f4f92d..9433a510 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -22,42 +22,54 @@ func NewPipeFactory() *Factory {
return &Factory{}
}
-type SpawnResult struct {
+type sr struct {
w *worker.Process
err error
}
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- c := make(chan SpawnResult)
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+ spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
in, err := cmd.StdoutPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
out, err := cmd.StdinPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
// Init new PIPE relay
@@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
// Start the worker
err = w.Start()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
- // errors bundle
pid, err := internal.FetchPID(relay)
- if pid != w.Pid() || err != nil {
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
w.Wait(),
)
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
- return
}
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
+ if pid != w.Pid() {
+ select {
+ case spCh <- sr{
+ w: nil,
+ err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
+ }
+ }
+ select {
+ case
// return worker
- c <- SpawnResult{
+ spCh <- sr{
w: w,
err: nil,
+ }:
+ // everything ok, set ready state
+ w.State().Set(worker.StateReady)
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
- case res := <-c:
+ case res := <-spCh:
if res.err != nil {
return nil, res.err
}
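Every send on the spawn-result channel is now wrapped in a select with a default branch: if the outer select has already returned on ctx.Done(), nothing will ever read spCh, so the goroutine cleans up (killing the worker where one exists) instead of blocking forever on the unbuffered channel. A minimal sketch of the pattern, with the private sr struct mirrored under an assumed name:

package example

import "github.com/spiral/roadrunner/v2/pkg/worker"

// spawnResult mirrors the private sr struct above (illustrative only).
type spawnResult struct {
	w   *worker.Process
	err error
}

// handOff tries to deliver the result without blocking. If the receiver has
// already given up (its context deadline fired), the default branch runs and
// the spawned process is killed so neither the goroutine nor the worker leaks.
func handOff(resCh chan spawnResult, w *worker.Process, err error) {
	select {
	case resCh <- spawnResult{w: w, err: err}:
		// the caller was still waiting and now owns the worker
	default:
		if w != nil {
			_ = w.Kill()
		}
	}
}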
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 51befb1e..f5e9669b 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
@@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 3ef65be8..d243a93f 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -102,6 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) {
func Test_Pipe_PipeError2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ // error cause
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -159,7 +160,7 @@ func Test_Pipe_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -184,11 +185,10 @@ func Test_Pipe_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
@@ -231,7 +231,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -255,7 +255,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -279,7 +279,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -305,7 +305,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -333,11 +333,10 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -379,7 +378,7 @@ func Test_Echo_Slow(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -409,10 +408,9 @@ func Test_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -441,10 +439,9 @@ func Test_Error(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -469,19 +466,19 @@ func Test_NumExecs(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 965a0f30..dc2b75cf 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -2,6 +2,7 @@ package socket
import (
"context"
+ "fmt"
"net"
"os/exec"
"sync"
@@ -29,8 +30,6 @@ type Factory struct {
// sockets which are waiting for process association
relays sync.Map
-
- ErrCh chan error
}
// NewSocketServer returns Factory attached to a given socket listener.
@@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
ls: ls,
tout: tout,
relays: sync.Map{},
- ErrCh: make(chan error, 10),
}
// Be careful
// https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
// https://github.com/golang/go/issues/5045
go func() {
- f.ErrCh <- f.listen()
+ err := f.listen()
+ // there is no logger here, use fmt
+ if err != nil {
+ fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
+ }
}()
return f
@@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
- err: err,
+ err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
err = w.Start()
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
rl, err := f.findRelayWithContext(ctxT, w)
@@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
w.Wait(),
)
- c <- socketSpawn{
+ select {
+ // try to write result
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ // if no receivers - return
+ default:
+ return
}
- return
}
w.AttachRelay(rl)
w.State().Set(worker.StateReady)
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: w,
err: nil,
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
@@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
+
+ // verify the PID and bundle any errors from the check, kill and wait
+ if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
+ }
+
w.State().Set(worker.StateReady)
return w, nil
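The select-with-default sends introduced above exist because the goroutine spawned inside SpawnWorkerWithTimeout can finish after the caller has already given up on the context; with nobody left to receive, a plain channel send would leak the goroutine. A minimal, self-contained sketch of that hand-off pattern (names and timings below are illustrative, not part of this diff):

package main

import (
	"context"
	"fmt"
	"time"
)

type spawnResult struct {
	val string
	err error
}

// spawn hands its result to the waiter only if the waiter is still listening.
func spawn(ctx context.Context) (string, error) {
	c := make(chan spawnResult) // unbuffered: a send succeeds only when a receiver is ready
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate slow worker start-up
		select {
		case c <- spawnResult{val: "worker-1"}:
			// delivered to the waiter
		default:
			// the waiter already returned (ctx expired); drop the result
			// (the real factory kills the just-spawned worker at this point)
		}
	}()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case r := <-c:
		return r.val, r.err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(spawn(ctx))            // prints the deadline error
	time.Sleep(100 * time.Millisecond) // the spawner goroutine hits its default branch and exits
}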
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index b875e2c8..905a3b6b 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -16,7 +16,7 @@ import (
)
func Test_Tcp_Start2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -45,7 +45,7 @@ func Test_Tcp_Start2(t *testing.T) {
}
func Test_Tcp_StartCloseFactory2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
} else {
t.Skip("socket is busy")
@@ -72,7 +72,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
}
func Test_Tcp_StartError2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -96,7 +96,7 @@ func Test_Tcp_StartError2(t *testing.T) {
}
func Test_Tcp_Failboot2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err3 := ls.Close()
@@ -128,7 +128,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
}
func Test_Tcp_Invalid2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -148,7 +148,7 @@ func Test_Tcp_Invalid2(t *testing.T) {
}
func Test_Tcp_Broken2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -194,16 +194,15 @@ func Test_Tcp_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
func Test_Tcp_Echo2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 34fe088b..f9bb2178 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) {
ctx := context.Background()
time.Sleep(time.Millisecond * 10) // to ensure free socket
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -50,7 +50,7 @@ func Test_Tcp_Start(t *testing.T) {
func Test_Tcp_StartCloseFactory(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
} else {
t.Skip("socket is busy")
@@ -79,7 +79,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
func Test_Tcp_StartError(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -106,7 +106,7 @@ func Test_Tcp_Failboot(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err3 := ls.Close()
@@ -140,7 +140,7 @@ func Test_Tcp_Failboot(t *testing.T) {
func Test_Tcp_Timeout(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -163,7 +163,7 @@ func Test_Tcp_Timeout(t *testing.T) {
func Test_Tcp_Invalid(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -185,7 +185,7 @@ func Test_Tcp_Invalid(t *testing.T) {
func Test_Tcp_Broken(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -242,7 +241,7 @@ func Test_Tcp_Broken(t *testing.T) {
func Test_Tcp_Echo(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
wg.Wait()
}
@@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -487,7 +485,7 @@ func Test_Unix_Echo(t *testing.T) {
func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if err == nil {
defer func() {
err = ls.Close()
@@ -520,7 +518,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if err == nil {
defer func() {
err = ls.Close()
@@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index d2cfe2cd..ed8704bb 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -68,7 +68,7 @@ type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there are no TIMEOUTS
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}
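Since Exec and ExecWithTTL now return a nil *payload.Payload on failure, callers must check the error before dereferencing the result; that is why the tests in this diff switch from asserting on res.Body/res.Context to asserting on res itself. A hypothetical caller for illustration (the echo helper is not part of this diff; import paths follow the module layout used elsewhere in it):

import (
	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// echo sends a body to a ready sync worker and returns the response body.
func echo(sw worker.SyncWorker, body []byte) ([]byte, error) {
	res, err := sw.Exec(&payload.Payload{Body: body})
	if err != nil {
		// res is nil here; touching res.Body would panic
		return nil, err
	}
	return res.Body, nil
}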
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 02f11d0b..74e29b71 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -23,7 +23,7 @@ type SyncWorkerImpl struct {
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) SyncWorker {
+func From(process *Process) *SyncWorkerImpl {
return &SyncWorkerImpl{
process: process,
fPool: sync.Pool{New: func() interface{} {
@@ -36,14 +36,14 @@ func From(process *Process) SyncWorker {
}
// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.process.State().Value() != StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
@@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
// supervisor may set state of the worker during the work
@@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
}
type wexec struct {
- payload payload.Payload
+ payload *payload.Payload
err error
}
// ExecWithTTL executes payload with a TTL timeout taken from the context.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Str("payload can not be empty")),
+ err: errors.E(op, errors.Str("payload can not be empty")),
}
return
}
if tw.process.State().Value() != StateReady {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
@@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
tw.process.State().RegisterExec()
}
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, err),
+ err: errors.E(op, err),
}
return
}
@@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
if err != nil {
// append timeout error
err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return payload.Payload{}, multierr.Append(err, ctx.Err())
+ return nil, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
- return payload.Payload{}, res.err
+ return nil, res.err
}
return res.payload, nil
}
}
-func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
// get a frame
@@ -162,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
defer tw.putFrame(fr)
// can be 0 here
- fr.WriteVersion(frame.VERSION_1)
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
// obtain a buffer
buf := tw.get()
@@ -171,18 +168,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
buf.Write(p.Body)
// Context offset
- fr.WriteOptions(uint32(len(p.Context)))
- fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
+ fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WritePayload(buf.Bytes())
- fr.WriteCRC()
+ fr.WriteCRC(fr.Header())
// return buffer
tw.put(buf)
err := tw.Relay().Send(fr)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
frameR := tw.getFrame()
@@ -190,34 +187,34 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err = tw.process.Relay().Receive(frameR)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received"))
+ return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
- if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ if !frameR.VerifyCRC(frameR.Header()) {
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&frame.ERROR != byte(0) {
- return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
- options := frameR.ReadOptions()
+ options := frameR.ReadOptions(frameR.Header())
if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pld := payload.Payload{
+ pld := &payload.Payload{
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}
// by copying we free frame's payload slice
- // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
// https://blog.golang.org/slices-intro#TOC_6.
copy(pld.Body, frameR.Payload()[options[0]:])
copy(pld.Context, frameR.Payload()[:options[0]])
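For orientation, the single option written and read above is the body offset: execPayload writes the context bytes first, then the body, and records len(p.Context) so the receiver can split the flat frame payload back apart. A small sketch of that split under those assumptions (plain slices, no goridge types):

// split separates a flat frame payload into context and body using the offset
// carried in options[0]; copying detaches the result from the pooled frame buffer.
func split(raw []byte, off uint32) (ctx, body []byte) {
	ctx = make([]byte, len(raw[:off]))
	body = make([]byte, len(raw[off:]))
	copy(ctx, raw[:off])
	copy(body, raw[off:])
	return ctx, body
}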
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index df556e93..64580f9f 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) {
sw := From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "Process is not ready (inactive)")
}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
index 44c466bb..3d60897b 100644
--- a/pkg/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
@@ -138,18 +138,18 @@ func (r *Request) Close(log logger.Logger) {
// Payload returns the RoadRunner payload marshaled from the PSR-7 request data; values are encoded as JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (payload.Payload, error) {
+func (r *Request) Payload() (*payload.Payload, error) {
const op = errors.Op("marshal_payload")
- p := payload.Payload{}
+ p := &payload.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
index cbf22794..d22f09d4 100644
--- a/pkg/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
@@ -22,7 +22,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p payload.Payload) (*Response, error) {
+func NewResponse(p *payload.Payload) (*Response, error) {
const op = errors.Op("http_response")
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..51093978
--- /dev/null
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -0,0 +1,99 @@
+package channel
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ sync.RWMutex
+ // destroy signal
+ destroy uint64
+ // channel with the workers
+ workers chan worker.BaseProcess
+
+ len uint64
+}
+
+func NewVector(len uint64) *Vec {
+ vec := &Vec{
+ destroy: 0,
+ len: len,
+ workers: make(chan worker.BaseProcess, len),
+ }
+
+ return vec
+}
+
+// Push is an O(1) operation.
+// In case of TTL and a full channel it is O(n) worst case, where n is the len of the channel.
+func (v *Vec) Push(w worker.BaseProcess) {
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // the default select branch is only possible when dealing with TTL,
+ // because in that case workers in the v.workers channel can be TTL-ed and killed
+ // while still being present in the channel
+ default:
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can end up in the default branch for the following reasons:
+ 1. TTL is set with no requests during the TTL
+ 2. Violated Get <-> Release operation (how ??)
+ */
+ for i := uint64(0); i < v.len; i++ {
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, while send and receive operations on the channel are concurrent, the channel behaves
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO (LIFO)
+ v.workers <- wrk
+ continue
+ default:
+ // kill the current worker (just to be sure it's dead)
+ _ = wrk.Kill()
+ // replace with the new one
+ v.workers <- w
+ return
+ }
+ }
+ }
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ /*
+ CompareAndSwapUint64 semantics (used below purely as an atomic destroy == 1 check):
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, errors.E(errors.WatcherStopped)
+ }
+
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
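A hypothetical usage sketch for the new channel-backed container (worker construction is elided; the takeWorker helper is illustrative, not part of this diff):

import (
	"context"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/worker"
	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)

// takeWorker waits up to one second for a free worker from the vector.
func takeWorker(vec *channel.Vec) (worker.BaseProcess, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// Pop blocks until a worker is pushed, the vector is destroyed
	// (errors.WatcherStopped) or the context expires (errors.NoFreeWorkers)
	return vec.Pop(ctx)
}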
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
deleted file mode 100644
index e10ecdae..00000000
--- a/pkg/worker_watcher/container/interface.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package container
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Vector interface represents vector container
-type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
- // Destroy used to stop releasing the workers
- Destroy()
-}
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..edf81d60
--- /dev/null
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -0,0 +1,102 @@
+package queue
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+const (
+ initialSize = 1
+ maxInitialSize = 8
+ maxInternalSliceSize = 10
+)
+
+type Node struct {
+ w []worker.BaseProcess
+ // LL
+ n *Node
+}
+
+type Queue struct {
+ mu sync.Mutex
+
+ head *Node
+ tail *Node
+
+ curr uint64
+ len uint64
+
+ sliceSize uint64
+}
+
+func NewQueue() *Queue {
+ q := &Queue{
+ mu: sync.Mutex{},
+ head: nil,
+ tail: nil,
+ curr: 0,
+ len: 0,
+ sliceSize: 0,
+ }
+
+ return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ h := newNode(initialSize)
+ q.head = h
+ q.tail = h
+ q.sliceSize = maxInitialSize
+ } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+ n := newNode(maxInternalSliceSize)
+ q.tail.n = n
+ q.tail = n
+ q.sliceSize = maxInternalSliceSize
+ }
+
+ q.tail.w = append(q.tail.w, w)
+
+ atomic.AddUint64(&q.len, 1)
+
+ q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ // release the lock before returning, otherwise the next Push/Pop would deadlock
+ q.mu.Unlock()
+ return nil, nil
+ }
+
+ w := q.head.w[q.curr]
+ q.head.w[q.curr] = nil
+ atomic.AddUint64(&q.len, ^uint64(0))
+ atomic.AddUint64(&q.curr, 1)
+
+ if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+ n := q.head.n
+ q.head.n = nil
+ q.head = n
+ q.curr = 0
+ }
+
+ q.mu.Unlock()
+
+ return w, nil
+}
+
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+ return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}
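Unlike the channel container, this queue's Pop does not block: it returns (nil, nil) when empty, so a caller would have to poll or otherwise wait for a Push. A hypothetical polling wrapper for illustration (not part of this diff; imports from context, time and the worker/queue packages are assumed):

// popWait retries Pop until a worker appears or the context expires.
func popWait(ctx context.Context, q *queue.Queue) (worker.BaseProcess, error) {
	for {
		w, err := q.Pop(ctx)
		if err != nil {
			return nil, err
		}
		if w != nil {
			return w, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Millisecond):
			// queue is empty, retry
		}
	}
}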
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
deleted file mode 100644
index 24b5fa6d..00000000
--- a/pkg/worker_watcher/container/vec.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package container
-
-import (
- "context"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
- destroy uint64
- workers chan worker.BaseProcess
-}
-
-func NewVector(initialNumOfWorkers uint64) *Vec {
- vec := &Vec{
- destroy: 0,
- workers: make(chan worker.BaseProcess, initialNumOfWorkers),
- }
-
- return vec
-}
-
-func (v *Vec) Enqueue(w worker.BaseProcess) {
- v.workers <- w
-}
-
-func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
- /*
- if *addr == old {
- *addr = new
- return true
- }
- */
-
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, errors.E(errors.WatcherStopped)
- }
-
- select {
- case w := <-v.workers:
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
- }
-}
-
-func (v *Vec) Destroy() {
- atomic.StoreUint64(&v.destroy, 1)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b2d61d48..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -8,45 +8,54 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)
// Vector interface represents vector container
type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Push used to put a worker into the vector
+ Push(worker.BaseProcess)
+ // Pop used to get a worker from the vector
+ Pop(ctx context.Context) (worker.BaseProcess, error)
+ // Remove worker with provided pid
+ Remove(pid int64)
// Destroy used to stop releasing the workers
Destroy()
+
+ // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
+}
+
+type workerWatcher struct {
+ sync.RWMutex
+ container Vector
+ // used to control the Destroy stage (ensures all workers are back in the container)
+ numWorkers uint64
+
+ workers []worker.BaseProcess
+
+ allocator worker.Allocator
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
- container: container.NewVector(numWorkers),
+ container: channel.NewVector(numWorkers),
numWorkers: numWorkers,
- workers: make([]worker.BaseProcess, 0, numWorkers),
- allocator: allocator,
- events: events,
+
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+
+ allocator: allocator,
+ events: events,
}
return ww
}
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control the Destroy stage (that all workers are in the container)
- numWorkers uint64
- workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
-}
-
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.container.Enqueue(workers[i])
+ ww.container.Push(workers[i])
// add worker to watch slice
ww.workers = append(ww.workers, workers[i])
@@ -57,12 +66,12 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+// Take is not a thread safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
- w, err := ww.container.Dequeue(ctx)
+ w, err := ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
}
@@ -78,11 +87,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
// =========================================================
// SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
+ _ = w.Kill()
+ // no free workers in the container or worker not in the ReadyState (TTL-ed)
// try to continuously get free one
for {
- w, err = ww.container.Dequeue(ctx)
+ w, err = ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
@@ -98,7 +107,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
case worker.StateReady:
return w, nil
case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
+ ww.container.Push(w) // put it back, let worker finish the work
continue
case
// all the possible wrong states
@@ -135,7 +144,7 @@ func (ww *workerWatcher) Allocate() error {
// unlock Allocate mutex
ww.Unlock()
// push the worker to the container
- ww.Push(sw)
+ ww.Release(sw)
return nil
}
@@ -158,11 +167,11 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
}
}
-// Push O(1) operation
-func (ww *workerWatcher) Push(w worker.BaseProcess) {
+// Release O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
- ww.container.Enqueue(w)
+ ww.container.Push(w)
default:
_ = w.Kill()
}
@@ -226,13 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
- ww.Remove(w)
+ // set state as stopped
+ w.State().Set(worker.StateStopped)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{