summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-12 08:47:33 +0300
committerValery Piashchynski <[email protected]>2021-07-12 08:47:33 +0300
commite82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (patch)
tree098a9827f51255916f99160b02098153f8d0238e /plugins
parent0f70f1e2311640236d74a0a237536779d8d44223 (diff)
Finish dynamic declaration of the pipelines. Fix issue with
configuration parsing in the AMQP consumer. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go79
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go2
-rw-r--r--plugins/jobs/brokers/amqp/redial.go4
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go26
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go4
-rw-r--r--plugins/jobs/pipeline/pipeline.go11
6 files changed, 106 insertions, 20 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 22eee2dc..a7916f7e 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -21,6 +21,7 @@ const (
exchangeType string = "exchange-type"
queue string = "queue"
routingKey string = "routing-key"
+ prefetch string = "prefetch"
dlx string = "x-dead-letter-exchange"
dlxRoutingKey string = "x-dead-letter-routing-key"
@@ -76,10 +77,11 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
jb := &JobsConsumer{
- log: log,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
+ log: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
retryTimeout: time.Minute * 5,
delayCache: make(map[string]struct{}, 100),
}
@@ -105,7 +107,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
pipeCfg.InitDefault()
- err = cfg.UnmarshalKey(configKey, &globalCfg)
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
}
@@ -125,7 +127,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, err)
}
- // assign address
+ // save address
jb.connStr = globalCfg.Addr
err = jb.initRabbitMQ()
@@ -144,12 +146,65 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) {
- _ = exchangeType
- _ = exchangeKey
- _ = queue
- _ = routingKey
- panic("not implemented")
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) {
+ const op = errors.Op("new_amqp_consumer_from_pipeline")
+ // we need to obtain two parts of the amqp information here.
+ // firs part - address to connect, it is located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeline.String(routingKey, "")
+ jb.queue = pipeline.String(queue, "default")
+ jb.exchangeType = pipeline.String(exchangeType, "direct")
+ jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
+ jb.prefetchCount = pipeline.Int(prefetch, 10)
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
}
func (j *JobsConsumer) Push(job *structs.Job) error {
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 6743dc2f..ca972c5b 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -35,5 +35,5 @@ func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.C
// FromPipeline constructs AMQP driver from pipeline
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, pq)
+ return FromPipeline(pipe, p.log, p.cfg, pq)
}
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
index 16071b78..277e75b7 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -39,7 +39,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
// re-init connection
errInit := j.initRabbitMQ()
if errInit != nil {
- j.log.Error("error while redialing", "error", errInit)
+ j.log.Error("rabbitmq dial", "error", errInit)
return errInit
}
@@ -74,7 +74,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
// restart listener
j.listener(deliv)
- j.log.Info("queues and subscribers redeclare succeed")
+ j.log.Info("queues and subscribers redeclared successfully")
return nil
}
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 9d79221c..b51af322 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -12,6 +12,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+const (
+ pipelineSize string = "pipeline_size"
+)
+
type Config struct {
PipelineSize uint64 `mapstructure:"pipeline_size"`
}
@@ -26,12 +30,12 @@ type JobBroker struct {
stopCh chan struct{}
}
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobBroker, error) {
const op = errors.Op("new_ephemeral_pipeline")
jb := &JobBroker{
log: log,
- pq: q,
+ pq: pq,
stopCh: make(chan struct{}, 1),
}
@@ -53,8 +57,22 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q
return jb, nil
}
-func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) {
- panic("not implemented")
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, pq priorityqueue.Queue) (*JobBroker, error) {
+ jb := &JobBroker{
+ log: log,
+ pq: pq,
+ stopCh: make(chan struct{}, 1),
+ }
+
+ jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000))
+
+ // initialize a local queue
+ jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+
+ // consume from the queue
+ go jb.consume()
+
+ return jb, nil
}
func (j *JobBroker) Push(job *structs.Job) error {
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 75012873..bfe2d6ac 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -29,10 +29,12 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
+// JobsConstruct creates new ephemeral consumer from the configuration
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
return NewJobBroker(configKey, p.log, p.cfg, pq)
}
+// FromPipeline creates new ephemeral consumer from the provided pipeline
func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipeline, pq)
+ return FromPipeline(pipeline, p.log, pq)
}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index e87204f9..91898178 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -44,6 +44,17 @@ func (p Pipeline) String(name string, d string) string {
return d
}
+// Int must return option value as string or return default value.
+func (p Pipeline) Int(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(int); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
// Priority returns default pipeline priority
func (p Pipeline) Priority() uint64 {
if value, ok := p[priority]; ok {