diff options
-rwxr-xr-x | pkg/pool/static_pool.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 32 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 10 | ||||
-rw-r--r-- | pkg/priority_queue/queue.go | 1 | ||||
-rw-r--r-- | plugins/http/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 8 | ||||
-rw-r--r-- | plugins/jobs/config.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 10 | ||||
-rw-r--r-- | plugins/server/interface.go | 2 | ||||
-rw-r--r-- | plugins/server/plugin.go | 19 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 4 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/http/configs/.rr-env.yaml | 6 | ||||
-rw-r--r-- | tests/plugins/http/handler_test.go | 54 | ||||
-rw-r--r-- | tests/plugins/http/uploads_test.go | 8 | ||||
-rw-r--r-- | tests/plugins/informer/test_plugin.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml (renamed from plugins/jobs/.rr.yaml) | 28 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 10 | ||||
-rw-r--r-- | tests/plugins/resetter/test_plugin.go | 2 | ||||
-rw-r--r-- | tests/plugins/server/plugin_pipes.go | 2 |
20 files changed, 111 insertions, 104 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index ab025fa1..74e06b81 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -26,7 +26,7 @@ type Command func() *exec.Cmd // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - cfg Config + cfg *Config // worker command creator cmd Command @@ -51,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 6f875072..f264c6dc 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfg = Config{ +var cfg = &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } } - var cfg2 = Config{ + var cfg2 = &Config{ NumWorkers: 1, AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -264,7 +264,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, @@ -283,7 +283,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -320,7 +320,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -360,7 +360,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -400,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -422,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -452,7 +452,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -506,7 +506,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: false, NumWorkers: 1, AllocateTimeout: time.Second, @@ -539,7 +539,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -556,7 +556,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -595,7 +595,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -626,7 +626,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index dc307c33..348622c7 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfgSupervised = Config{ +var cfgSupervised = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -82,7 +82,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -123,7 +123,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { } func TestSupervisedPool_Idle(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -171,7 +171,7 @@ func TestSupervisedPool_Idle(t *testing.T) { } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -213,7 +213,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { } func TestSupervisedPool_MaxMemoryReached(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go new file mode 100644 index 00000000..f09b99ff --- /dev/null +++ b/pkg/priority_queue/queue.go @@ -0,0 +1 @@ +package priorityqueue diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index bec01ac3..fb174792 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -323,7 +323,7 @@ func (p *Plugin) Reset() error { p.pool = nil var err error - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 4d444c7b..df0d31be 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,6 +2,10 @@ package ephemeral import "github.com/spiral/roadrunner/v2/plugins/logger" +const ( + PluginName string = "ephemeral" +) + type Plugin struct { log logger.Logger } @@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error { p.log = log return nil } + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1e49b959..74e4a811 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -12,7 +12,7 @@ import ( type Config struct { // Workers configures roadrunner server and worker busy. // Workers *roadrunner.ServerConfig - poolCfg poolImpl.Config + poolCfg *poolImpl.Config // Dispatch defines where and how to match jobs. Dispatch map[string]*structs.Options @@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error { if err != nil { return errors.E(op, err) } + + if c.poolCfg != nil { + c.poolCfg.InitDefaults() + } + return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 072f872a..e7466efb 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + err = p.cfg.InitDefaults() + if err != nil { + return errors.E(op, err) + } + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) if err != nil { return errors.E(op, err) @@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + // initialize sub-plugins + // provide a queue to them + // start consume loop + // start resp loop + return errCh } diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 0424d52d..b0f84a7f 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -19,5 +19,5 @@ type Server interface { // NewWorker return a new worker with provided and attached by the user listeners and environment variables NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration - NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) + NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e2fa0086..42273ed7 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -21,14 +21,14 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -// PluginName for the server -const PluginName = "server" - -// RrRelay env variable key (internal) -const RrRelay = "RR_RELAY" - -// RrRPC env variable key (internal) if the RPC presents -const RrRPC = "RR_RPC" +const ( + // PluginName for the server + PluginName = "server" + // RrRelay env variable key (internal) + RrRelay = "RR_RELAY" + // RrRPC env variable key (internal) if the RPC presents + RrRPC = "RR_RPC" +) // Plugin manages worker type Plugin struct { @@ -140,8 +140,9 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { const op = errors.Op("server_plugin_new_worker_pool") + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index c9a31613..5925a588 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -273,7 +273,7 @@ func (p *Plugin) Reset() error { p.phpPool = nil var err error - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml index d8daa251..66114d64 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml index 99358b04..4ea8ec73 100644 --- a/tests/plugins/http/configs/.rr-env.yaml +++ b/tests/plugins/http/configs/.rr-env.yaml @@ -3,17 +3,13 @@ rpc: server: command: "php ../../http/client.php env pipes" - user: "" - group: "" - env: - "env_key": "ENV_VALUE" relay: "pipes" relay_timeout: "20s" http: address: 127.0.0.1:12084 max_request_size: 1024 - middleware: [ "" ] + middleware: [] env: "RR_HTTP": "true" "env_key": "ENV_VALUE" diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 40e3a720..37d9452c 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index df696668..d02f9eee 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 43335999..62816d02 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -10,7 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/plugins/jobs/.rr.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index 1b84515f..b21f764c 100644 --- a/plugins/jobs/.rr.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -1,5 +1,10 @@ +rpc: + listen: tcp://127.0.0.1:6001 + server: - command: "php worker.php" + command: "php ../../psr-worker-bench.php" + relay: "pipes" + relay_timeout: "20s" jobs: # worker pool configuration @@ -50,24 +55,3 @@ jobs: # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually consume: ["local", "amqp", "beanstalk", "sqs"] - -# monitors rr server(s) -limit: - # check worker state each second - interval: 1 - - # custom watch configuration for each service - services: - # monitor queue workers - jobs: - # maximum allowed memory consumption per worker (soft) - maxMemory: 100 - - # maximum time to live for the worker (soft) - TTL: 0 - - # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft) - idleTTL: 0 - - # max_execution_time (brutal) - execTTL: 60 diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index c4a7f72e..e8b4e83d 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -10,9 +10,11 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" "github.com/stretchr/testify/assert" ) @@ -21,15 +23,17 @@ func TestJobsInit(t *testing.T) { assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-kv-init.yaml", + Path: "configs/.rr-jobs-init.yaml", Prefix: "rr", } err = cont.RegisterAll( cfg, - &memory.Plugin{}, + &server.Plugin{}, &rpcPlugin.Plugin{}, &logger.ZapLogger{}, + &jobs.Plugin{}, + &ephemeral.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go index 61942516..5c26cbd0 100644 --- a/tests/plugins/resetter/test_plugin.go +++ b/tests/plugins/resetter/test_plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = &poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index f1c13734..e813e456 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -15,7 +15,7 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, |