summary refs log tree commit diff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r-- pkg/bst/bst.go 2
-rw-r--r-- pkg/events/jobs_events.go 8
-rwxr-xr-x pkg/pool/static_pool.go 56
-rwxr-xr-x pkg/pool/static_pool_test.go 22
-rwxr-xr-x pkg/pool/supervisor_pool.go 2
-rw-r--r-- pkg/pool/supervisor_test.go 8
-rw-r--r-- pkg/state/job/state.go 19
-rw-r--r-- pkg/state/process/state.go (renamed from pkg/process/state.go) 0
-rw-r--r-- pkg/worker_handler/errors.go 1
-rw-r--r-- pkg/worker_handler/errors_windows.go 1
-rw-r--r-- pkg/worker_watcher/container/channel/vec.go 19
11 files changed, 96 insertions, 42 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
index f8426b12..dab9346c 100644
--- a/pkg/bst/bst.go
+++ b/pkg/bst/bst.go
@@ -88,7 +88,7 @@ func (b *BST) Remove(uuid string, topic string) {
b.removeHelper(uuid, topic, nil)
}
-func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) {
curr := b
for curr != nil {
if topic < curr.topic { //nolint:gocritic
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
index 300f6748..f65ede67 100644
--- a/pkg/events/jobs_events.go
+++ b/pkg/events/jobs_events.go
@@ -6,7 +6,7 @@ import (
const (
// EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK = iota + 12000
+ EventPushOK J = iota + 12000
// EventPushError caused when job can not be registered.
EventPushError
@@ -58,6 +58,8 @@ func (ev J) String() string {
return "EventPipeError"
case EventDriverReady:
return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
}
return UnknownEventType
}
@@ -67,16 +69,12 @@ type JobEvent struct {
Event J
// String is job id.
ID string
-
// Pipeline name
Pipeline string
-
// Associated driver name (amqp, ephemeral, etc)
Driver string
-
// Error for the jobs/pipes errors
Error error
-
// event timings
Start time.Time
Elapsed time.Duration
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 051e7a8a..3eb0714f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them to complete the task).
+// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
}
@@ -250,34 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
switch {
case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
case errors.Is(errors.SoftJob, err):
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // TODO suspicious logic, redesign
- err = sp.ww.Allocate()
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
- }
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ // if the max jobs limit has been reached
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
w.State().Set(worker.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- } else {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Release(w)
+
+ return nil, err
}
- }
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(op, err, errS)
- }
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
- return nil, errors.E(op, err)
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
}
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b72b8c32..cb6578a8 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -227,9 +227,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
AddListeners(listener),
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
@@ -290,10 +290,12 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ // give the process time to become ready
+ time.Sleep(time.Second)
+
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
@@ -326,10 +328,12 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+
+ // give the process time to become ready
+ time.Sleep(time.Second)
assert.Len(t, p.Workers(), 0)
var lastPID string
@@ -366,10 +370,11 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
+
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
@@ -460,6 +465,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
for i := range p.Workers() {
p.Workers()[i].State().Set(worker.StateErrored)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index bdaeade1..e6b2bd7c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 0702a71f..d1b24574 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -39,6 +39,8 @@ func TestSupervisedPool_Exec(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
+
pidBefore := p.Workers()[0].Pid()
for i := 0; i < 100; i++ {
@@ -63,7 +65,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
ctx := context.Background()
p, err := Initialize(
ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") },
pipe.NewPipeFactory(),
cfgSupervised,
)
@@ -71,8 +73,10 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
+
for i := 0; i < 100; i++ {
- time.Sleep(time.Millisecond * 100)
+ time.Sleep(time.Millisecond * 500)
_, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
new file mode 100644
index 00000000..56050084
--- /dev/null
+++ b/pkg/state/job/state.go
@@ -0,0 +1,19 @@
+package job
+
+// State represents job's state
+type State struct {
+ // Pipeline name
+ Pipeline string
+ // Driver name
+ Driver string
+ // Queue name (tube for the beanstalk)
+ Queue string
+ // Active jobs which are consumed from the driver but not handled by the PHP worker yet
+ Active int64
+ // Delayed jobs
+ Delayed int64
+ // Reserved jobs which are in the driver but not consumed yet
+ Reserved int64
+ // Ready is the status flag: true - Ready, false - Paused
+ Ready bool
+}
diff --git a/pkg/process/state.go b/pkg/state/process/state.go
index bfc3a287..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/state/process/state.go
diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..c3352a52 100644
--- a/pkg/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
@@ -1,3 +1,4 @@
+//go:build !windows
// +build !windows
package handler
diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..3c6c2186 100644
--- a/pkg/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
@@ -1,3 +1,4 @@
+//go:build windows
// +build windows
package handler
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 51093978..7fb65a92 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but still be present in the channel
default:
+ // Stop Pop operations
v.Lock()
defer v.Unlock()
@@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) {
2. Violated Get <-> Release operation (how ??)
*/
for i := uint64(0); i < v.len; i++ {
+ /*
+ We need to drain the vector until we find a worker in one of the Invalid/Killing/Killed/etc. states.
+ */
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states
+ // skip good states, put worker back
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), channel behave
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
v.workers <- wrk
continue
+ /*
+ Bad states are here.
+ */
default:
// kill the current worker (just to be sure it's dead)
- _ = wrk.Kill()
- // replace with the new one
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
v.workers <- w
return
}
@@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
}
*/
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ if atomic.LoadUint64(&v.destroy) == 1 {
return nil, errors.E(errors.WatcherStopped)
}