summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
committerValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
commit9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (patch)
treee49c46b03d8facc73e96f1b6247d83367cc65398 /plugins
parent1033c25b6bfc752d6059e446510f651e22cbf49b (diff)
huge refactor
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/config.go6
-rw-r--r--plugins/http/handler.go18
-rw-r--r--plugins/http/plugin.go24
-rw-r--r--plugins/http/request.go10
-rw-r--r--plugins/http/response.go4
-rw-r--r--plugins/http/tests/handler_test.go165
-rw-r--r--plugins/http/tests/http_test.go3
-rw-r--r--plugins/http/tests/response_test.go16
-rw-r--r--plugins/http/tests/uploads_test.go27
-rw-r--r--plugins/informer/plugin.go6
-rw-r--r--plugins/informer/tests/test_plugin.go9
-rw-r--r--plugins/resetter/tests/test_plugin.go6
-rw-r--r--plugins/server/plugin.go28
-rw-r--r--plugins/server/tests/plugin_pipes.go15
-rw-r--r--plugins/server/tests/plugin_sockets.go10
-rw-r--r--plugins/server/tests/plugin_tcp.go10
16 files changed, 188 insertions, 169 deletions
diff --git a/plugins/http/config.go b/plugins/http/config.go
index d6efe310..00d2940b 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)
type Cidrs []*net.IPNet
@@ -56,7 +56,7 @@ type Config struct {
Uploads *UploadsConfig
// Pool configures worker pool.
- Pool *roadrunner.PoolConfig
+ Pool *poolImpl.Config
// Env is environment variables passed to the http pool
Env map[string]string
@@ -141,7 +141,7 @@ func (c *Config) EnableFCGI() bool {
func (c *Config) InitDefaults() error {
if c.Pool == nil {
// default pool
- c.Pool = &roadrunner.PoolConfig{
+ c.Pool = &poolImpl.Config{
Debug: false,
NumWorkers: int64(runtime.NumCPU()),
MaxJobs: 1000,
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index 74b038ff..4cc08c41 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -10,9 +10,9 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
const (
@@ -23,8 +23,10 @@ const (
EventError
)
+const MB = 1024 * 1024
+
type Handle interface {
- AddListener(l util.EventListener)
+ AddListener(l worker.EventListener)
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
@@ -71,17 +73,17 @@ type handler struct {
uploads UploadsConfig
trusted Cidrs
log log.Logger
- pool roadrunner.Pool
+ pool pool.Pool
mul sync.Mutex
- lsn util.EventListener
+ lsn worker.EventListener
}
-func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handle, error) {
+func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
if pool == nil {
return nil, errors.E(errors.Str("pool should be initialized"))
}
return &handler{
- maxRequestSize: maxReqSize * roadrunner.MB,
+ maxRequestSize: maxReqSize * MB,
uploads: uploads,
pool: pool,
trusted: trusted,
@@ -89,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool ro
}
// Listen attaches handler event controller.
-func (h *handler) AddListener(l util.EventListener) {
+func (h *handler) AddListener(l worker.EventListener) {
h.mul.Lock()
defer h.mul.Unlock()
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 13299da1..9cb01d4b 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -15,10 +15,12 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
- factory "github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/interfaces/status"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/util"
@@ -47,17 +49,17 @@ type Plugin struct {
sync.Mutex
configurer config.Configurer
- server factory.Server
+ server server.Server
log log.Logger
cfg *Config
// middlewares to chain
mdwr middleware
// Event listener to stdout
- listener util.EventListener
+ listener worker.EventListener
// Pool which attached to all servers
- pool roadrunner.Pool
+ pool pool.Pool
// servers RR handler
handler Handle
@@ -69,7 +71,7 @@ type Plugin struct {
}
// AddListener attaches server event controller.
-func (s *Plugin) AddListener(listener util.EventListener) {
+func (s *Plugin) AddListener(listener worker.EventListener) {
// save listeners for Reset
s.listener = listener
s.pool.AddListener(listener)
@@ -77,7 +79,7 @@ func (s *Plugin) AddListener(listener util.EventListener) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
@@ -97,7 +99,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Serv
return errors.E(op, errors.Disabled)
}
- s.pool, err = server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
@@ -122,7 +124,7 @@ func (s *Plugin) logCallback(event interface{}) {
s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
case ErrorEvent:
s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error)
- case roadrunner.WorkerEvent:
+ case worker.Event:
s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State())
default:
fmt.Println(event)
@@ -284,7 +286,7 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Server returns associated pool workers
-func (s *Plugin) Workers() []roadrunner.WorkerBase {
+func (s *Plugin) Workers() []worker.BaseProcess {
return s.pool.Workers()
}
@@ -305,7 +307,7 @@ func (s *Plugin) Reset() error {
return errors.E(op, err)
}
- s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
diff --git a/plugins/http/request.go b/plugins/http/request.go
index 640bdec2..5df79b7d 100644
--- a/plugins/http/request.go
+++ b/plugins/http/request.go
@@ -9,8 +9,8 @@ import (
"strings"
j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
)
@@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (roadrunner.Payload, error) {
- p := roadrunner.Payload{}
+func (r *Request) Payload() (internal.Payload, error) {
+ p := internal.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return roadrunner.EmptyPayload, err
+ return internal.Payload{}, err
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return roadrunner.EmptyPayload, err
+ return internal.Payload{}, err
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/plugins/http/response.go b/plugins/http/response.go
index e3ac2756..9700a16c 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -6,7 +6,7 @@ import (
"strings"
"sync"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
)
// Response handles PSR7 response logic.
@@ -23,7 +23,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p roadrunner.Payload) (*Response, error) {
+func NewResponse(p internal.Payload) (*Response, error) {
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
return nil, err
diff --git a/plugins/http/tests/handler_test.go b/plugins/http/tests/handler_test.go
index 0c6a39ef..54a4ae80 100644
--- a/plugins/http/tests/handler_test.go
+++ b/plugins/http/tests/handler_test.go
@@ -10,7 +10,8 @@ import (
"runtime"
"strings"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
@@ -21,10 +22,10 @@ import (
)
func TestHandler_Echo(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -72,10 +73,10 @@ func Test_HandlerErrors(t *testing.T) {
}
func TestHandler_Headers(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "header", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -133,10 +134,10 @@ func TestHandler_Headers(t *testing.T) {
}
func TestHandler_Empty_User_Agent(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -193,10 +194,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}
func TestHandler_User_Agent(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -253,10 +254,10 @@ func TestHandler_User_Agent(t *testing.T) {
}
func TestHandler_Cookies(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "cookie", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -318,10 +319,10 @@ func TestHandler_Cookies(t *testing.T) {
}
func TestHandler_JsonPayload_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -382,10 +383,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
}
func TestHandler_JsonPayload_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -442,10 +443,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
}
func TestHandler_JsonPayload_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -502,10 +503,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
}
func TestHandler_FormData_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -575,10 +576,10 @@ func TestHandler_FormData_POST(t *testing.T) {
}
func TestHandler_FormData_POST_Overwrite(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -648,10 +649,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
}
func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -720,10 +721,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
}
func TestHandler_FormData_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -792,10 +793,10 @@ func TestHandler_FormData_PUT(t *testing.T) {
}
func TestHandler_FormData_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -864,10 +865,10 @@ func TestHandler_FormData_PATCH(t *testing.T) {
}
func TestHandler_Multipart_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -978,10 +979,10 @@ func TestHandler_Multipart_POST(t *testing.T) {
}
func TestHandler_Multipart_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1092,10 +1093,10 @@ func TestHandler_Multipart_PUT(t *testing.T) {
}
func TestHandler_Multipart_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1208,10 +1209,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
}
func TestHandler_Error(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1254,10 +1255,10 @@ func TestHandler_Error(t *testing.T) {
}
func TestHandler_Error2(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error2", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1300,10 +1301,10 @@ func TestHandler_Error2(t *testing.T) {
}
func TestHandler_Error3(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "pid", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1359,10 +1360,10 @@ func TestHandler_Error3(t *testing.T) {
}
func TestHandler_ResponseDuration(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1420,10 +1421,10 @@ func TestHandler_ResponseDuration(t *testing.T) {
}
func TestHandler_ResponseDurationDelayed(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echoDelay", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1480,10 +1481,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
}
func TestHandler_ErrorDuration(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1554,10 +1555,10 @@ func TestHandler_IP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1615,10 +1616,10 @@ func TestHandler_XRealIP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1681,10 +1682,10 @@ func TestHandler_XForwardedFor(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1746,10 +1747,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1794,10 +1795,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
}
func BenchmarkHandler_Listen_Echo(b *testing.B) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index c8dd4b38..2979949d 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -19,6 +19,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -900,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) {
mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1")
mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any())
- mockLogger.EXPECT().Debug("worker event received", "event", roadrunner.EventWorkerLog, "worker state", gomock.Any())
+ mockLogger.EXPECT().Debug("worker event received", "event", worker.EventWorkerLog, "worker state", gomock.Any())
err = cont.RegisterAll(
cfg,
diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go
index 2bfe7d56..a526fe03 100644
--- a/plugins/http/tests/response_test.go
+++ b/plugins/http/tests/response_test.go
@@ -6,7 +6,7 @@ import (
"net/http"
"testing"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
http2 "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
)
@@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)})
+ r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go
index d36d4793..f255ec91 100644
--- a/plugins/http/tests/uploads_test.go
+++ b/plugins/http/tests/uploads_test.go
@@ -16,7 +16,8 @@ import (
"time"
j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
)
@@ -26,10 +27,10 @@ var json = j.ConfigCompatibleWithStandardLibrary
const testFile = "uploads_test.go"
func TestHandler_Upload_File(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -109,10 +110,10 @@ func TestHandler_Upload_File(t *testing.T) {
}
func TestHandler_Upload_NestedFile(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -192,10 +193,10 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
}
func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -275,10 +276,10 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
}
func TestHandler_Upload_File_Forbids(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index f3013394..449be085 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,9 +3,9 @@ package informer
import (
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/informer"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
const PluginName = "informer"
@@ -21,8 +21,8 @@ func (p *Plugin) Init(log log.Logger) error {
return nil
}
-// Workers provides WorkerBase slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) {
+// Workers provides BaseProcess slice with workers for the requested plugin
+func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
const op = errors.Op("get workers")
svc, ok := p.registry[name]
if !ok {
diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go
index 473b6de7..3fdefde3 100644
--- a/plugins/informer/tests/test_plugin.go
+++ b/plugins/informer/tests/test_plugin.go
@@ -4,17 +4,18 @@ import (
"context"
"time"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -48,7 +49,7 @@ func (p1 *Plugin1) Name() string {
return "informer.plugin1"
}
-func (p1 *Plugin1) Workers() []roadrunner.WorkerBase {
+func (p1 *Plugin1) Workers() []worker.BaseProcess {
pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go
index 9f48a43f..1d770e70 100644
--- a/plugins/resetter/tests/test_plugin.go
+++ b/plugins/resetter/tests/test_plugin.go
@@ -4,17 +4,17 @@ import (
"context"
"time"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ea6d42eb..7c91bbcc 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,9 +8,13 @@ import (
"strings"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/socket"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -21,7 +25,7 @@ const PluginName = "server"
type Plugin struct {
cfg Config
log log.Logger
- factory roadrunner.Factory
+ factory worker.Factory
}
// Init application provider.
@@ -93,7 +97,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) {
const op = errors.Op("new worker")
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -111,13 +115,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) {
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, err
}
- p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt)
+ p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt)
if err != nil {
return nil, err
}
@@ -128,10 +132,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf
}
// creates relay and worker factory.
-func (server *Plugin) initFactory() (roadrunner.Factory, error) {
+func (server *Plugin) initFactory() (worker.Factory, error) {
const op = errors.Op("network factory init")
if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
- return roadrunner.NewPipeFactory(), nil
+ return pipe.NewPipeFactory(), nil
}
dsn := strings.Split(server.cfg.Relay, "://")
@@ -147,9 +151,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
case "tcp":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
@@ -165,11 +169,11 @@ func (server *Plugin) setEnv(e server.Env) []string {
}
func (server *Plugin) collectLogs(event interface{}) {
- if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we, ok := event.(worker.Event); ok {
switch we.Event {
- case roadrunner.EventWorkerError:
+ case worker.EventWorkerError:
server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
- case roadrunner.EventWorkerLog:
+ case worker.EventWorkerLog:
server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index fbd37e12..61c9a8f9 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -5,8 +5,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -14,12 +17,12 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{
type Foo struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
@@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error {
const op = errors.Op("serve")
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index 4942d4c5..3b97efff 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo2 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 89757a02..2857dadc 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo3 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh