diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/.rr.yaml | 73 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 8 | ||||
-rw-r--r-- | plugins/jobs/config.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 10 |
4 files changed, 24 insertions, 74 deletions
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml deleted file mode 100644 index 1b84515f..00000000 --- a/plugins/jobs/.rr.yaml +++ /dev/null @@ -1,73 +0,0 @@ -server: - command: "php worker.php" - -jobs: - # worker pool configuration - pool: - num_workers: 4 - - # rabbitmq and similar servers - amqp: - addr: amqp://guest:guest@localhost:5672/ - - # beanstalk configuration - beanstalk: - addr: tcp://localhost:11300 - - # amazon sqs configuration - sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - - # job destinations and options - dispatch: - spiral-jobs-tests-amqp-*.pipeline: amqp - spiral-jobs-tests-local-*.pipeline: local - spiral-jobs-tests-beanstalk-*.pipeline: beanstalk - spiral-jobs-tests-sqs-*.pipeline: sqs - - # list of broker pipelines associated with endpoints - pipelines: - local: - broker: ephemeral - - amqp: - broker: amqp - queue: default - - beanstalk: - broker: beanstalk - tube: default - - sqs: - broker: sqs - queue: default - declare: - MessageRetentionPeriod: 86400 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: ["local", "amqp", "beanstalk", "sqs"] - - -# monitors rr server(s) -limit: - # check worker state each second - interval: 1 - - # custom watch configuration for each service - services: - # monitor queue workers - jobs: - # maximum allowed memory consumption per worker (soft) - maxMemory: 100 - - # maximum time to live for the worker (soft) - TTL: 0 - - # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft) - idleTTL: 0 - - # max_execution_time (brutal) - execTTL: 60 diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 4d444c7b..df0d31be 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,6 +2,10 @@ package ephemeral import "github.com/spiral/roadrunner/v2/plugins/logger" +const ( + PluginName string = "ephemeral" +) + type Plugin struct { log logger.Logger } @@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error { p.log = log return nil } + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1e49b959..74e4a811 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -12,7 +12,7 @@ import ( type Config struct { // Workers configures roadrunner server and worker busy. // Workers *roadrunner.ServerConfig - poolCfg poolImpl.Config + poolCfg *poolImpl.Config // Dispatch defines where and how to match jobs. Dispatch map[string]*structs.Options @@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error { if err != nil { return errors.E(op, err) } + + if c.poolCfg != nil { + c.poolCfg.InitDefaults() + } + return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 072f872a..e7466efb 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + err = p.cfg.InitDefaults() + if err != nil { + return errors.E(op, err) + } + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) if err != nil { return errors.E(op, err) @@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + // initialize sub-plugins + // provide a queue to them + // start consume loop + // start resp loop + return errCh } |