Diffstat (limited to 'pool')
-rwxr-xr-x  pool/static_pool.go      | 56
-rwxr-xr-x  pool/static_pool_test.go |  3
-rwxr-xr-x  pool/supervisor_pool.go  | 43
-rw-r--r--  pool/supervisor_test.go  |  1
4 files changed, 28 insertions(+), 75 deletions(-)
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 27db830c..11112e72 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -99,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if the supervisor config is not nil, assume the pool should be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -195,11 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid())))
}
}
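Every hunk in static_pool.go applies the same rewrite: the hand-rolled `&events.RREvent{T: …, P: …, M: …}` literal becomes a single `events.NewEvent(…)` call. Reconstructed from the replaced literals, the constructor presumably looks like the sketch below; the real definition lives in the events package and may differ in detail.

```go
package events

// EventType enumerates the event kinds used in this diff (EventWorkerError,
// EventNoFreeWorkers, EventExecTTL, ...).
type EventType int

// RREvent is the payload previously assembled by hand at every call site.
type RREvent struct {
	T EventType // event type
	P string    // originating plugin, e.g. "pool" or "supervisor"
	M string    // human-readable message
}

// NewEvent is assumed from the before/after lines in this diff; the actual
// signature in the events package may differ.
func NewEvent(t EventType, plugin, message string) *RREvent {
	return &RREvent{T: t, P: plugin, M: message}
}
```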
@@ -221,11 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means we couldn't get a worker from the stack within the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(&events.RREvent{
- T: events.EventNoFreeWorkers,
- P: pluginName,
- M: fmt.Sprintf("error: %s", err),
- })
+ sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err)))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -245,20 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push an event if any stage hit a timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(&events.RREvent{
- T: events.EventExecTTL,
- P: pluginName,
- M: fmt.Sprintf("error: %s", err),
- })
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err)))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// if max jobs is exceeded
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -279,11 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of a network error we can't stop the worker; we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// kill the worker instead of sending a net packet to it
_ = w.Kill()
@@ -291,11 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerDestruct,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// stop the worker; it might be in a broken state (network)
errS := w.Stop()
if errS != nil {
@@ -319,11 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerConstruct,
- P: pluginName,
- M: fmt.Sprintf("pid: %d", sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid())))
return sw, nil
}
}
@@ -345,11 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
return nil, err
}
@@ -366,11 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
}
return r, err
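With the constructor, each call site in static_pool.go collapses to one line. A minimal sketch assuming the API exactly as the hunks above present it; `publishWorkerError` is a hypothetical helper for illustration, not code from the repo:

```go
// Hypothetical helper showing the one-liner that replaced the RREvent
// literals; eb is the shared bus obtained once via events.Bus().
func publishWorkerError(eb events.EventBus, pid int64, cause error) {
	eb.Send(events.NewEvent(events.EventWorkerError, "pool",
		fmt.Sprintf("error: %v, pid: %d", cause, pid)))
}
```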
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index abef3779..717d301e 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -170,6 +170,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -201,6 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
// Run pool events
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
require.NoError(t, err)
@@ -489,6 +491,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
require.NoError(t, err)
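All three test hunks make the same one-line fix: each test now drops its bus subscription on return, so subscriptions no longer leak between tests. The resulting pattern, sketched with the identifiers from the hunks above (the test body past the subscription is illustrative):

```go
import (
	"testing"

	"github.com/stretchr/testify/require"
	// events import path omitted; it matches the package under test
)

func Test_EventPattern(t *testing.T) {
	eb, id := events.Bus()
	defer eb.Unsubscribe(id) // the added line: release the subscription when the test ends

	ch := make(chan events.Event, 10)
	err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
	require.NoError(t, err)

	// ... exercise the pool until it runs out of free workers ...

	ev := <-ch // delivered via the events.NewEvent call in takeWorker
	_ = ev
}
```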
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index c1fb6eec..1a94f6a0 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -28,23 +28,20 @@ type Supervised interface {
}
type supervised struct {
- cfg *SupervisorConfig
- events events.EventBus
- eventsID string
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
+ cfg *SupervisorConfig
+ events events.EventBus
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
- eb, id := events.Bus()
+func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
sp := &supervised{
- cfg: cfg,
- events: eb,
- eventsID: id,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
+ cfg: cfg,
+ events: eb,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
}
return sp
@@ -155,11 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventTTL,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -179,11 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventMaxMemory,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -238,11 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventIdleTTL,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
}
}
}
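Net effect of this file: `supervised` no longer calls events.Bus() itself, so it has no `eventsID` of its own to clean up; the caller owns the bus and the subscription lifecycle. A sketch of the wiring implied by the first static_pool.go hunk (`eb` is presumably obtained earlier in Initialize via events.Bus()):

```go
// Inside Initialize (sketch): one bus instance is created by the caller and
// shared with the supervisor wrapper instead of the wrapper subscribing itself.
eb, _ := events.Bus()

if cfg.Supervisor != nil {
	sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
	// start watcher timer
	sp.Start()
	return sp, nil
}
```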
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 9c0bfdaa..eb3c37dd 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -332,6 +332,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
}
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
require.NoError(t, err)