diff options
author | Valery Piashchynski <[email protected]> | 2021-06-23 15:02:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-23 15:02:16 +0300 |
commit | 7fc09959619e9e400ecafcffcd63e38812f397a6 (patch) | |
tree | e6629f40cda53988facfb455ed460dbd05bbdc29 /plugins | |
parent | b0e7ac1aa40e9dbb688f88ac21b10a321a02c252 (diff) |
- Swithc from value to pointer to the pool configuration. Interface
value changed.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/.rr.yaml | 73 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 8 | ||||
-rw-r--r-- | plugins/jobs/config.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 10 | ||||
-rw-r--r-- | plugins/server/interface.go | 2 | ||||
-rw-r--r-- | plugins/server/plugin.go | 19 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 4 |
8 files changed, 39 insertions, 88 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index bec01ac3..fb174792 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -323,7 +323,7 @@ func (p *Plugin) Reset() error { p.pool = nil var err error - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml deleted file mode 100644 index 1b84515f..00000000 --- a/plugins/jobs/.rr.yaml +++ /dev/null @@ -1,73 +0,0 @@ -server: - command: "php worker.php" - -jobs: - # worker pool configuration - pool: - num_workers: 4 - - # rabbitmq and similar servers - amqp: - addr: amqp://guest:guest@localhost:5672/ - - # beanstalk configuration - beanstalk: - addr: tcp://localhost:11300 - - # amazon sqs configuration - sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - - # job destinations and options - dispatch: - spiral-jobs-tests-amqp-*.pipeline: amqp - spiral-jobs-tests-local-*.pipeline: local - spiral-jobs-tests-beanstalk-*.pipeline: beanstalk - spiral-jobs-tests-sqs-*.pipeline: sqs - - # list of broker pipelines associated with endpoints - pipelines: - local: - broker: ephemeral - - amqp: - broker: amqp - queue: default - - beanstalk: - broker: beanstalk - tube: default - - sqs: - broker: sqs - queue: default - declare: - MessageRetentionPeriod: 86400 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: ["local", "amqp", "beanstalk", "sqs"] - - -# monitors rr server(s) -limit: - # check worker state each second - interval: 1 - - # custom watch configuration for each service - services: - # monitor queue workers - jobs: - # maximum allowed memory consumption per worker (soft) - maxMemory: 100 - - # maximum time to live for the worker (soft) - TTL: 0 - - # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft) - idleTTL: 0 - - # max_execution_time (brutal) - execTTL: 60 diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 4d444c7b..df0d31be 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,6 +2,10 @@ package ephemeral import "github.com/spiral/roadrunner/v2/plugins/logger" +const ( + PluginName string = "ephemeral" +) + type Plugin struct { log logger.Logger } @@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error { p.log = log return nil } + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1e49b959..74e4a811 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -12,7 +12,7 @@ import ( type Config struct { // Workers configures roadrunner server and worker busy. // Workers *roadrunner.ServerConfig - poolCfg poolImpl.Config + poolCfg *poolImpl.Config // Dispatch defines where and how to match jobs. Dispatch map[string]*structs.Options @@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error { if err != nil { return errors.E(op, err) } + + if c.poolCfg != nil { + c.poolCfg.InitDefaults() + } + return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 072f872a..e7466efb 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + err = p.cfg.InitDefaults() + if err != nil { + return errors.E(op, err) + } + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) if err != nil { return errors.E(op, err) @@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + // initialize sub-plugins + // provide a queue to them + // start consume loop + // start resp loop + return errCh } diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 0424d52d..b0f84a7f 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -19,5 +19,5 @@ type Server interface { // NewWorker return a new worker with provided and attached by the user listeners and environment variables NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration - NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) + NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e2fa0086..42273ed7 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -21,14 +21,14 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -// PluginName for the server -const PluginName = "server" - -// RrRelay env variable key (internal) -const RrRelay = "RR_RELAY" - -// RrRPC env variable key (internal) if the RPC presents -const RrRPC = "RR_RPC" +const ( + // PluginName for the server + PluginName = "server" + // RrRelay env variable key (internal) + RrRelay = "RR_RELAY" + // RrRPC env variable key (internal) if the RPC presents + RrRPC = "RR_RPC" +) // Plugin manages worker type Plugin struct { @@ -140,8 +140,9 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { const op = errors.Op("server_plugin_new_worker_pool") + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index c9a31613..5925a588 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -273,7 +273,7 @@ func (p *Plugin) Reset() error { p.phpPool = nil var err error - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, |