diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/bst/bst.go | 2 | ||||
-rw-r--r-- | pkg/events/jobs_events.go | 8 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 56 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 22 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 2 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 8 | ||||
-rw-r--r-- | pkg/state/job/state.go | 19 | ||||
-rw-r--r-- | pkg/state/process/state.go (renamed from pkg/process/state.go) | 0 | ||||
-rw-r--r-- | pkg/worker_handler/errors.go | 1 | ||||
-rw-r--r-- | pkg/worker_handler/errors_windows.go | 1 | ||||
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 19 |
11 files changed, 96 insertions, 42 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go index f8426b12..dab9346c 100644 --- a/pkg/bst/bst.go +++ b/pkg/bst/bst.go @@ -88,7 +88,7 @@ func (b *BST) Remove(uuid string, topic string) { b.removeHelper(uuid, topic, nil) } -func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { curr := b for curr != nil { if topic < curr.topic { //nolint:gocritic diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go index 300f6748..f65ede67 100644 --- a/pkg/events/jobs_events.go +++ b/pkg/events/jobs_events.go @@ -6,7 +6,7 @@ import ( const ( // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK = iota + 12000 + EventPushOK J = iota + 12000 // EventPushError caused when job can not be registered. EventPushError @@ -58,6 +58,8 @@ func (ev J) String() string { return "EventPipeError" case EventDriverReady: return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" } return UnknownEventType } @@ -67,16 +69,12 @@ type JobEvent struct { Event J // String is job id. ID string - // Pipeline name Pipeline string - // Associated driver name (amqp, ephemeral, etc) Driver string - // Error for the jobs/pipes errors Error error - // event timings Start time.Time Elapsed time.Duration diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 051e7a8a..3eb0714f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -// Destroy all underlying stack (but let them to complete the task). +// Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { sp.ww.Destroy(ctx) } @@ -250,34 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { switch { case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + w.State().Set(worker.StateInvalid) + return nil, err case errors.Is(errors.SoftJob, err): - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // TODO suspicious logic, redesign - err = sp.ww.Allocate() - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) - } + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop w.State().Set(worker.StateInvalid) - err = w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) } - } else { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Release(w) + + return nil, err } - } - w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - errS := w.Stop() - if errS != nil { - return nil, errors.E(op, err, errS) - } + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) - return nil, errors.E(op, err) + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, errors.E(op, err) + } } } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index b72b8c32..cb6578a8 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -227,9 +227,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { AddListeners(listener), ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) + defer p.Destroy(ctx) + time.Sleep(time.Second) res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) @@ -290,10 +290,12 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { }, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) + defer p.Destroy(ctx) + // prevent process is not ready + time.Sleep(time.Second) + var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) @@ -326,10 +328,12 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { }, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) + defer p.Destroy(ctx) + + // prevent process is not ready + time.Sleep(time.Second) assert.Len(t, p.Workers(), 0) var lastPID string @@ -366,10 +370,11 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { }, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) + defer p.Destroy(ctx) + time.Sleep(time.Second) + var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) @@ -460,6 +465,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) + time.Sleep(time.Second) for i := range p.Workers() { p.Workers()[i].State().Set(worker.StateErrored) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index bdaeade1..e6b2bd7c 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -8,7 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/pkg/worker" ) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 0702a71f..d1b24574 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -39,6 +39,8 @@ func TestSupervisedPool_Exec(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) + time.Sleep(time.Second) + pidBefore := p.Workers()[0].Pid() for i := 0; i < 100; i++ { @@ -63,7 +65,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { ctx := context.Background() p, err := Initialize( ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") }, pipe.NewPipeFactory(), cfgSupervised, ) @@ -71,8 +73,10 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) + time.Sleep(time.Second) + for i := 0; i < 100; i++ { - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 500) _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go new file mode 100644 index 00000000..56050084 --- /dev/null +++ b/pkg/state/job/state.go @@ -0,0 +1,19 @@ +package job + +// State represents job's state +type State struct { + // Pipeline name + Pipeline string + // Driver name + Driver string + // Queue name (tube for the beanstalk) + Queue string + // Active jobs which are consumed from the driver but not handled by the PHP worker yet + Active int64 + // Delayed jobs + Delayed int64 + // Reserved jobs which are in the driver but not consumed yet + Reserved int64 + // Status - 1 Ready, 0 - Paused + Ready bool +} diff --git a/pkg/process/state.go b/pkg/state/process/state.go index bfc3a287..bfc3a287 100644 --- a/pkg/process/state.go +++ b/pkg/state/process/state.go diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go index 5fa8e64e..c3352a52 100644 --- a/pkg/worker_handler/errors.go +++ b/pkg/worker_handler/errors.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package handler diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go index 390cc7d1..3c6c2186 100644 --- a/pkg/worker_handler/errors_windows.go +++ b/pkg/worker_handler/errors_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package handler diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 51093978..7fb65a92 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) { // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: + // Stop Pop operations v.Lock() defer v.Unlock() @@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) { 2. Violated Get <-> Release operation (how ??) */ for i := uint64(0); i < v.len; i++ { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. + */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states + // skip good states, put worker back case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO v.workers <- wrk continue + /* + Bad states are here. + */ default: // kill the current worker (just to be sure it's dead) - _ = wrk.Kill() - // replace with the new one + if wrk != nil { + _ = wrk.Kill() + } + // replace with the new one and return from the loop + // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker + // But this case will be handled in the worker_watcher::Get v.workers <- w return } @@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { } */ - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + if atomic.LoadUint64(&v.destroy) == 1 { return nil, errors.E(errors.WatcherStopped) } |