summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/static_pool_test.go32
-rw-r--r--pkg/pool/supervisor_test.go10
-rw-r--r--pkg/priority_queue/queue.go1
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go8
-rw-r--r--plugins/jobs/config.go7
-rw-r--r--plugins/jobs/plugin.go10
-rw-r--r--plugins/server/interface.go2
-rw-r--r--plugins/server/plugin.go19
-rw-r--r--plugins/websockets/plugin.go4
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml2
-rw-r--r--tests/plugins/http/configs/.rr-env.yaml6
-rw-r--r--tests/plugins/http/handler_test.go54
-rw-r--r--tests/plugins/http/uploads_test.go8
-rw-r--r--tests/plugins/informer/test_plugin.go2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml (renamed from plugins/jobs/.rr.yaml)28
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go10
-rw-r--r--tests/plugins/resetter/test_plugin.go2
-rw-r--r--tests/plugins/server/plugin_pipes.go2
20 files changed, 111 insertions, 104 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index ab025fa1..74e06b81 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg Config
+ cfg *Config
// worker command creator
cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..f264c6dc 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg2 = Config{
+ var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -264,7 +264,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -283,7 +283,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -320,7 +320,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -360,7 +360,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -400,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -422,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -452,7 +452,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -506,7 +506,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: false,
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -539,7 +539,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -556,7 +556,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -595,7 +595,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -626,7 +626,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index dc307c33..348622c7 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -82,7 +82,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -123,7 +123,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
}
func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -171,7 +171,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -213,7 +213,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
}
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go
new file mode 100644
index 00000000..f09b99ff
--- /dev/null
+++ b/pkg/priority_queue/queue.go
@@ -0,0 +1 @@
+package priorityqueue
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index bec01ac3..fb174792 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -323,7 +323,7 @@ func (p *Plugin) Reset() error {
p.pool = nil
var err error
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 4d444c7b..df0d31be 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -2,6 +2,10 @@ package ephemeral
import "github.com/spiral/roadrunner/v2/plugins/logger"
+const (
+ PluginName string = "ephemeral"
+)
+
type Plugin struct {
log logger.Logger
}
@@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error {
p.log = log
return nil
}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 1e49b959..74e4a811 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -12,7 +12,7 @@ import (
type Config struct {
// Workers configures roadrunner server and worker busy.
// Workers *roadrunner.ServerConfig
- poolCfg poolImpl.Config
+ poolCfg *poolImpl.Config
// Dispatch defines where and how to match jobs.
Dispatch map[string]*structs.Options
@@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error {
if err != nil {
return errors.E(op, err)
}
+
+ if c.poolCfg != nil {
+ c.poolCfg.InitDefaults()
+ }
+
return nil
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 072f872a..e7466efb 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
+ err = p.cfg.InitDefaults()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
if err != nil {
return errors.E(op, err)
@@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ // initialize sub-plugins
+ // provide a queue to them
+ // start consume loop
+ // start resp loop
+
return errCh
}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 0424d52d..b0f84a7f 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -19,5 +19,5 @@ type Server interface {
// NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
// NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
- NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
+ NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e2fa0086..42273ed7 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -21,14 +21,14 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-// PluginName for the server
-const PluginName = "server"
-
-// RrRelay env variable key (internal)
-const RrRelay = "RR_RELAY"
-
-// RrRPC env variable key (internal) if the RPC presents
-const RrRPC = "RR_RPC"
+const (
+ // PluginName for the server
+ PluginName = "server"
+ // RrRelay env variable key (internal)
+ RrRelay = "RR_RELAY"
+ // RrRPC env variable key (internal) if the RPC presents
+ RrRPC = "RR_RPC"
+)
// Plugin manages worker
type Plugin struct {
@@ -140,8 +140,9 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
const op = errors.Op("server_plugin_new_worker_pool")
+
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index c9a31613..5925a588 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -273,7 +273,7 @@ func (p *Plugin) Reset() error {
p.phpPool = nil
var err error
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
index d8daa251..66114d64 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml
index 99358b04..4ea8ec73 100644
--- a/tests/plugins/http/configs/.rr-env.yaml
+++ b/tests/plugins/http/configs/.rr-env.yaml
@@ -3,17 +3,13 @@ rpc:
server:
command: "php ../../http/client.php env pipes"
- user: ""
- group: ""
- env:
- "env_key": "ENV_VALUE"
relay: "pipes"
relay_timeout: "20s"
http:
address: 127.0.0.1:12084
max_request_size: 1024
- middleware: [ "" ]
+ middleware: []
env:
"RR_HTTP": "true"
"env_key": "ENV_VALUE"
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 40e3a720..37d9452c 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index df696668..d02f9eee 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 43335999..62816d02 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -10,7 +10,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/plugins/jobs/.rr.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 1b84515f..b21f764c 100644
--- a/plugins/jobs/.rr.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -1,5 +1,10 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
server:
- command: "php worker.php"
+ command: "php ../../psr-worker-bench.php"
+ relay: "pipes"
+ relay_timeout: "20s"
jobs:
# worker pool configuration
@@ -50,24 +55,3 @@ jobs:
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
consume: ["local", "amqp", "beanstalk", "sqs"]
-
-# monitors rr server(s)
-limit:
- # check worker state each second
- interval: 1
-
- # custom watch configuration for each service
- services:
- # monitor queue workers
- jobs:
- # maximum allowed memory consumption per worker (soft)
- maxMemory: 100
-
- # maximum time to live for the worker (soft)
- TTL: 0
-
- # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
- idleTTL: 0
-
- # max_execution_time (brutal)
- execTTL: 60
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index c4a7f72e..e8b4e83d 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -10,9 +10,11 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/memory"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
"github.com/stretchr/testify/assert"
)
@@ -21,15 +23,17 @@ func TestJobsInit(t *testing.T) {
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-kv-init.yaml",
+ Path: "configs/.rr-jobs-init.yaml",
Prefix: "rr",
}
err = cont.RegisterAll(
cfg,
- &memory.Plugin{},
+ &server.Plugin{},
&rpcPlugin.Plugin{},
&logger.ZapLogger{},
+ &jobs.Plugin{},
+ &ephemeral.Plugin{},
)
assert.NoError(t, err)
diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go
index 61942516..5c26cbd0 100644
--- a/tests/plugins/resetter/test_plugin.go
+++ b/tests/plugins/resetter/test_plugin.go
@@ -9,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = poolImpl.Config{
+var testPoolConfig = &poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index f1c13734..e813e456 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -15,7 +15,7 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,