summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-23 15:02:16 +0300
committerValery Piashchynski <[email protected]>2021-06-23 15:02:16 +0300
commit7fc09959619e9e400ecafcffcd63e38812f397a6 (patch)
treee6629f40cda53988facfb455ed460dbd05bbdc29 /plugins/jobs
parentb0e7ac1aa40e9dbb688f88ac21b10a321a02c252 (diff)
- Swithc from value to pointer to the pool configuration. Interface
value changed. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/.rr.yaml73
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go8
-rw-r--r--plugins/jobs/config.go7
-rw-r--r--plugins/jobs/plugin.go10
4 files changed, 24 insertions, 74 deletions
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml
deleted file mode 100644
index 1b84515f..00000000
--- a/plugins/jobs/.rr.yaml
+++ /dev/null
@@ -1,73 +0,0 @@
-server:
- command: "php worker.php"
-
-jobs:
- # worker pool configuration
- pool:
- num_workers: 4
-
- # rabbitmq and similar servers
- amqp:
- addr: amqp://guest:guest@localhost:5672/
-
- # beanstalk configuration
- beanstalk:
- addr: tcp://localhost:11300
-
- # amazon sqs configuration
- sqs:
- key: api-key
- secret: api-secret
- region: us-west-1
- endpoint: http://localhost:9324
-
- # job destinations and options
- dispatch:
- spiral-jobs-tests-amqp-*.pipeline: amqp
- spiral-jobs-tests-local-*.pipeline: local
- spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
- spiral-jobs-tests-sqs-*.pipeline: sqs
-
- # list of broker pipelines associated with endpoints
- pipelines:
- local:
- broker: ephemeral
-
- amqp:
- broker: amqp
- queue: default
-
- beanstalk:
- broker: beanstalk
- tube: default
-
- sqs:
- broker: sqs
- queue: default
- declare:
- MessageRetentionPeriod: 86400
-
- # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: ["local", "amqp", "beanstalk", "sqs"]
-
-
-# monitors rr server(s)
-limit:
- # check worker state each second
- interval: 1
-
- # custom watch configuration for each service
- services:
- # monitor queue workers
- jobs:
- # maximum allowed memory consumption per worker (soft)
- maxMemory: 100
-
- # maximum time to live for the worker (soft)
- TTL: 0
-
- # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
- idleTTL: 0
-
- # max_execution_time (brutal)
- execTTL: 60
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 4d444c7b..df0d31be 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -2,6 +2,10 @@ package ephemeral
import "github.com/spiral/roadrunner/v2/plugins/logger"
+const (
+ PluginName string = "ephemeral"
+)
+
type Plugin struct {
log logger.Logger
}
@@ -10,3 +14,7 @@ func (p *Plugin) Init(log logger.Logger) error {
p.log = log
return nil
}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 1e49b959..74e4a811 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -12,7 +12,7 @@ import (
type Config struct {
// Workers configures roadrunner server and worker busy.
// Workers *roadrunner.ServerConfig
- poolCfg poolImpl.Config
+ poolCfg *poolImpl.Config
// Dispatch defines where and how to match jobs.
Dispatch map[string]*structs.Options
@@ -35,6 +35,11 @@ func (c *Config) InitDefaults() error {
if err != nil {
return errors.E(op, err)
}
+
+ if c.poolCfg != nil {
+ c.poolCfg.InitDefaults()
+ }
+
return nil
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 072f872a..e7466efb 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -45,6 +45,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
+ err = p.cfg.InitDefaults()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
if err != nil {
return errors.E(op, err)
@@ -60,6 +65,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ // initialize sub-plugins
+ // provide a queue to them
+ // start consume loop
+ // start resp loop
+
return errCh
}