diff options
Diffstat (limited to 'plugins/server')
-rw-r--r-- | plugins/server/plugin.go | 28 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 15 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go | 10 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go | 10 |
4 files changed, 37 insertions, 26 deletions
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index ea6d42eb..7c91bbcc 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,9 +8,13 @@ import ( "strings" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/socket" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) @@ -21,7 +25,7 @@ const PluginName = "server" type Plugin struct { cfg Config log log.Logger - factory roadrunner.Factory + factory worker.Factory } // Init application provider. @@ -93,7 +97,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) { const op = errors.Op("new worker") spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -111,13 +115,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) { spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) + p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } @@ -128,10 +132,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf } // creates relay and worker factory. -func (server *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (worker.Factory, error) { const op = errors.Op("network factory init") if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { - return roadrunner.NewPipeFactory(), nil + return pipe.NewPipeFactory(), nil } dsn := strings.Split(server.cfg.Relay, "://") @@ -147,9 +151,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } @@ -165,11 +169,11 @@ func (server *Plugin) setEnv(e server.Env) []string { } func (server *Plugin) collectLogs(event interface{}) { - if we, ok := event.(roadrunner.WorkerEvent); ok { + if we, ok := event.(worker.Event); ok { switch we.Event { - case roadrunner.EventWorkerError: + case worker.EventWorkerError: server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) - case roadrunner.EventWorkerLog: + case worker.EventWorkerLog: server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index fbd37e12..61c9a8f9 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -5,8 +5,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -14,12 +17,12 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{ type Foo struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error { @@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 4942d4c5..3b97efff 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo2 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 89757a02..2857dadc 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo3 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh |