summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
committerValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
commitad39466afb39fac49977cfb97f20f682f54bf35e (patch)
tree59b630ad0ef9b52eb13e0df5c8e41d92ce277950 /pkg/pool
parentee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (diff)
Move roadrunner payload out of internal to pkgv2.0.0-alpha25
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/static_pool.go35
-rwxr-xr-xpkg/pool/static_pool_test.go39
-rwxr-xr-xpkg/pool/supervisor_pool.go15
-rw-r--r--pkg/pool/supervisor_test.go8
4 files changed, 50 insertions, 47 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 6cc42143..9cf79fd4 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -9,7 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
// Before is set of functions that executes BEFORE Exec
-type Before func(req internal.Payload) internal.Payload
+type Before func(req payload.Payload) payload.Payload
// After is set of functions that executes AFTER Exec
-type After func(req internal.Payload, resp internal.Payload) internal.Payload
+type After func(req payload.Payload, resp payload.Payload) payload.Payload
type Options func(p *StaticPool)
@@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
cfg: cfg,
cmd: cmd,
factory: factory,
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
after: make([]After, 0, 0),
before: make([]Before, 0, 0),
}
@@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
-func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec with context")
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (internal.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.ErrSoftJob, err) {
@@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.ww.PushWorker(w)
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
w.State().Set(internal.StateInvalid)
@@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
errS := w.Stop(bCtx)
if errS != nil {
- return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
}
@@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc
}
}
-func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
r, err := sw.(worker.SyncWorker).Exec(p)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index dd33a1a6..b96e9214 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,6 +14,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -80,7 +81,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -104,7 +105,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -128,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -189,7 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
- res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
@@ -212,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -287,11 +288,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -323,14 +324,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,14 +364,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -400,7 +401,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -422,7 +423,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(internal.Payload{Body: []byte("100")})
+ _, err := p.Exec(payload.Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -430,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -456,7 +457,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(internal.StateErrored)
}
- _, err = p.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -494,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -520,7 +521,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -549,7 +550,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 6d1f0c58..6faa609c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
const MB = 1024 * 1024
@@ -42,10 +43,10 @@ func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig
type ttlExec struct {
err error
- p internal.Payload
+ p payload.Payload
}
-func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec_supervised")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -59,7 +60,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
- p: internal.Payload{},
+ p: payload.Payload{},
}
}
@@ -72,10 +73,10 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
for {
select {
case <-ctx.Done():
- return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.p, nil
@@ -83,11 +84,11 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
}
}
-func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("supervised exec")
rsp, err := sp.pool.Exec(p)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 2e3e7fd2..7dd423b8 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -61,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 50)
- _, err = p.Exec(internal.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -98,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), internal.Payload{
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -139,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(internal.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})