summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/jobs/.rr.yaml73
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go8
-rw-r--r--plugins/jobs/config.go7
-rw-r--r--plugins/jobs/plugin.go10
-rw-r--r--plugins/server/interface.go2
-rw-r--r--plugins/server/plugin.go19
-rw-r--r--plugins/websockets/plugin.go4
8 files changed, 39 insertions, 88 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index bec01ac3..fb174792 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -323,7 +323,7 @@ func (p *Plugin) Reset() error {
p.pool = nil
var err error
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml
deleted file mode 100644
index 1b84515f..00000000
--- a/plugins/jobs/.rr.yaml
+++ /dev/null
@@ -1,73 +0,0 @@
-server:
- command: "php worker.php"
-
-jobs:
- # worker pool configuration
- pool:
- num_workers: 4
-
- # rabbitmq and similar servers
- amqp:
- addr: amqp://guest:guest@localhost:5672/
-
- # beanstalk configuration
- beanstalk:
- addr: tcp://localhost:11300
-
- # amazon sqs configuration
- sqs:
- key: api-key
- secret: api-secret
- region: us-west-1
- endpoint: http://localhost:9324
-
- # job destinations and options
- dispatch:
- spiral-jobs-tests-amqp-*.pipeline: amqp
- spiral-jobs-tests-local-*.pipeline: local
- spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
- spiral-jobs-tests-sqs-*.pipeline: sqs
-
- # list of broker pipelines associated with endpoints
- pipelines:
- local:
- broker: ephemeral
-
- amqp:
- broker: amqp
- queue: default
-
- beanstalk:
- broker: beanstalk
- tube: default
-
- sqs:
- broker: sqs
- queue: default
- declare:
- MessageRetentionPeriod: 86400
-
- # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: ["local", "amqp", "beanstalk", "sqs"]
-
-
-# monitors rr server(s)
-limit:
- # check worker state each second
- interval: 1
-
- # custom watch configuration for each service
- services:
- # monitor queue workers
- jobs:
- # maximum allowed memory consumption per worker (soft)
- maxMemory: 100
-
- # maximum time to live for the worker (soft)
- TTL: 0
-
- # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
- idleTTL: 0
-
- # max_execution_time (brutal)
- execTTL: 60
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 4d444c7b..df0d31be 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -2,6 +2,10 @@ package ephemeral
import "github.com/spiral/roadrunner/v2/plugins/logger"
+const (
+ PluginName string = "ephemeral"
+)
+
type Plugin struct {
log logger.Logger
}
@@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error {
p.log = log
return nil
}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 1e49b959..74e4a811 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -12,7 +12,7 @@ import (
type Config struct {
// Workers configures roadrunner server and worker busy.
// Workers *roadrunner.ServerConfig
- poolCfg poolImpl.Config
+ poolCfg *poolImpl.Config
// Dispatch defines where and how to match jobs.
Dispatch map[string]*structs.Options
@@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error {
if err != nil {
return errors.E(op, err)
}
+
+ if c.poolCfg != nil {
+ c.poolCfg.InitDefaults()
+ }
+
return nil
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 072f872a..e7466efb 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
+ err = p.cfg.InitDefaults()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
if err != nil {
return errors.E(op, err)
@@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ // initialize sub-plugins
+ // provide a queue to them
+ // start consume loop
+ // start resp loop
+
return errCh
}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 0424d52d..b0f84a7f 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -19,5 +19,5 @@ type Server interface {
// NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
// NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
- NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
+ NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e2fa0086..42273ed7 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -21,14 +21,14 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-// PluginName for the server
-const PluginName = "server"
-
-// RrRelay env variable key (internal)
-const RrRelay = "RR_RELAY"
-
-// RrRPC env variable key (internal) if the RPC presents
-const RrRPC = "RR_RPC"
+const (
+ // PluginName for the server
+ PluginName = "server"
+ // RrRelay env variable key (internal)
+ RrRelay = "RR_RELAY"
+ // RrRPC env variable key (internal) if the RPC presents
+ RrRPC = "RR_RPC"
+)
// Plugin manages worker
type Plugin struct {
@@ -140,8 +140,9 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
const op = errors.Op("server_plugin_new_worker_pool")
+
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index c9a31613..5925a588 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -273,7 +273,7 @@ func (p *Plugin) Reset() error {
p.phpPool = nil
var err error
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,