summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 03:16:55 +0300
committerValery Piashchynski <[email protected]>2020-12-17 03:16:55 +0300
commit40cfd9f6b44dfe987bfbf010bf67b32abdc64208 (patch)
tree10e3c3cd0805619ac30533078eb7d2585877a1b3 /pkg
parent9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (diff)
Now better
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pipe/pipe_factory_test.go4
-rwxr-xr-xpkg/pool/static_pool.go19
-rwxr-xr-xpkg/pool/static_pool_test.go11
-rwxr-xr-xpkg/pool/supervisor_pool.go19
-rwxr-xr-xpkg/worker/sync_worker.go3
-rwxr-xr-xpkg/worker/worker.go9
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go18
7 files changed, 43 insertions, 40 deletions
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 8250d226..99212ff8 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -413,7 +413,7 @@ func Test_Broken(t *testing.T) {
data := ""
mu := &sync.Mutex{}
w.AddListener(func(event interface{}) {
- if wev, ok := event.(worker.Event); ok {
+ if wev, ok := event.(events.WorkerEvent); ok {
mu.Lock()
data = string(wev.Payload.([]byte))
mu.Unlock()
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 220ea8e9..691290b2 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -5,6 +5,7 @@ import (
"os/exec"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -40,7 +41,7 @@ type StaticPool struct {
factory worker.Factory
// distributes the events
- events worker.EventsHandler
+ events events.Handler
// manages worker states and TTLs
ww worker.Watcher
@@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options {
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) AddListener(listener worker.EventListener) {
+func (sp *StaticPool) AddListener(listener events.EventListener) {
sp.events.AddListener(listener)
}
@@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(p)
@@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
w.State().Set(internal.StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
@@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
if stopErr := sw.Stop(context.Background()); stopErr != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
return r, err
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 8b13c7c9..0794b8e6 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,8 +12,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
@@ -179,8 +178,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
block := make(chan struct{})
p.AddListener(func(event interface{}) {
- if wev, ok := event.(worker.Event); ok {
- if wev.Event == worker.EventWorkerLog {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerLog {
e := string(wev.Payload.([]byte))
if strings.ContainsAny(e, "undefined_function()") {
block <- struct{}{}
@@ -227,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
p.AddListener(func(event interface{}) {
- if pe, ok := event.(pool.Event); ok {
- if pe.Event == pool.EventWorkerConstruct {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
wg.Done()
}
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 0a2d16f7..6d1f0c58 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -22,13 +23,13 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
- events worker.EventsHandler
+ events events.Handler
pool pool.Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised {
+func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
events: events,
@@ -91,7 +92,7 @@ func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *supervised) AddListener(listener worker.EventListener) {
+func (sp *supervised) AddListener(listener events.EventListener) {
sp.pool.AddListener(listener)
}
@@ -156,20 +157,20 @@ func (sp *supervised) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
}
@@ -197,10 +198,10 @@ func (sp *supervised) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 0fcde2c3..1eb1396e 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"go.uber.org/multierr"
@@ -193,7 +194,7 @@ func (tw *syncWorker) Created() time.Time {
return tw.w.Created()
}
-func (tw *syncWorker) AddListener(listener worker.EventListener) {
+func (tw *syncWorker) AddListener(listener events.EventListener) {
tw.w.AddListener(listener)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index d2b4374b..998ed592 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,6 +13,7 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/util"
@@ -43,7 +44,7 @@ type Process struct {
created time.Time
// updates parent supervisor or pool about Process events
- events worker.EventsHandler
+ events events.Handler
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -119,7 +120,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) AddListener(listener worker.EventListener) {
+func (w *Process) AddListener(listener events.EventListener) {
w.events.AddListener(listener)
}
@@ -279,7 +280,7 @@ func (w *Process) watch() {
buf := w.get()
// read the last data
n, _ := w.rd.Read(*buf)
- w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
w.stderr.Write((*buf)[:n])
@@ -290,7 +291,7 @@ func (w *Process) watch() {
// read the max 10kb of stderr per one read
buf := w.get()
n, _ := w.rd.Read(*buf)
- w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
w.stderr.Write((*buf)[:n])
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 530ce5d6..8a71ff8a 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
@@ -140,7 +140,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events worker.EventsHandler) worker.Watcher {
+func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -158,7 +158,7 @@ type workerWatcher struct {
allocator worker.Allocator
initialNumWorkers int64
actualNumWorkers int64
- events worker.EventsHandler
+ events events.Handler
}
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
@@ -229,8 +229,8 @@ func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
- ww.events.Push(pool.Event{
- Event: pool.EventWorkerConstruct,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
Payload: sw,
})
@@ -279,8 +279,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("process wait")
err := w.Wait()
if err != nil {
- ww.events.Push(worker.Event{
- Event: worker.EventWorkerError,
+ ww.events.Push(events.WorkerEvent{
+ Event: events.EventWorkerError,
Worker: w,
Payload: errors.E(op, err),
})
@@ -294,8 +294,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
_ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
- ww.events.Push(pool.Event{
- Event: pool.EventPoolError,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventPoolError,
Payload: errors.E(op, err),
})
}