summaryrefslogtreecommitdiff
path: root/plugins/server
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/server')
-rw-r--r--plugins/server/plugin.go28
-rw-r--r--plugins/server/tests/plugin_pipes.go15
-rw-r--r--plugins/server/tests/plugin_sockets.go10
-rw-r--r--plugins/server/tests/plugin_tcp.go10
4 files changed, 37 insertions, 26 deletions
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ea6d42eb..7c91bbcc 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,9 +8,13 @@ import (
"strings"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/socket"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -21,7 +25,7 @@ const PluginName = "server"
type Plugin struct {
cfg Config
log log.Logger
- factory roadrunner.Factory
+ factory worker.Factory
}
// Init application provider.
@@ -93,7 +97,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) {
const op = errors.Op("new worker")
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -111,13 +115,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) {
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, err
}
- p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt)
+ p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt)
if err != nil {
return nil, err
}
@@ -128,10 +132,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf
}
// creates relay and worker factory.
-func (server *Plugin) initFactory() (roadrunner.Factory, error) {
+func (server *Plugin) initFactory() (worker.Factory, error) {
const op = errors.Op("network factory init")
if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
- return roadrunner.NewPipeFactory(), nil
+ return pipe.NewPipeFactory(), nil
}
dsn := strings.Split(server.cfg.Relay, "://")
@@ -147,9 +151,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
case "tcp":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
@@ -165,11 +169,11 @@ func (server *Plugin) setEnv(e server.Env) []string {
}
func (server *Plugin) collectLogs(event interface{}) {
- if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we, ok := event.(worker.Event); ok {
switch we.Event {
- case roadrunner.EventWorkerError:
+ case worker.EventWorkerError:
server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
- case roadrunner.EventWorkerLog:
+ case worker.EventWorkerLog:
server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index fbd37e12..61c9a8f9 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -5,8 +5,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -14,12 +17,12 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{
type Foo struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
@@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error {
const op = errors.Op("serve")
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index 4942d4c5..3b97efff 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo2 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 89757a02..2857dadc 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo3 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh