diff options
author | Valery Piashchynski <[email protected]> | 2021-07-12 08:47:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-12 08:47:33 +0300 |
commit | e82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (patch) | |
tree | 098a9827f51255916f99160b02098153f8d0238e /plugins | |
parent | 0f70f1e2311640236d74a0a237536779d8d44223 (diff) |
Finish dynamic declaration of the pipelines. Fix issue with
configuration parsing in the AMQP consumer.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 79 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 26 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 11 |
6 files changed, 106 insertions, 20 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 22eee2dc..a7916f7e 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -21,6 +21,7 @@ const ( exchangeType string = "exchange-type" queue string = "queue" routingKey string = "routing-key" + prefetch string = "prefetch" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -76,10 +77,11 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - log: log, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), + log: log, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config retryTimeout: time.Minute * 5, delayCache: make(map[string]struct{}, 100), } @@ -105,7 +107,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pipeCfg.InitDefault() - err = cfg.UnmarshalKey(configKey, &globalCfg) + err = cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) } @@ -125,7 +127,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, err) } - // assign address + // save address jb.connStr = globalCfg.Addr err = jb.initRabbitMQ() @@ -144,12 +146,65 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) { - _ = exchangeType - _ = exchangeKey - _ = queue - _ = routingKey - panic("not implemented") +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer_from_pipeline") + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeline.String(routingKey, "") + jb.queue = pipeline.String(queue, "default") + jb.exchangeType = pipeline.String(exchangeType, "direct") + jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") + jb.prefetchCount = pipeline.Int(prefetch, 10) + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // run redialer for the connection + jb.redialer() + + return jb, nil } func (j *JobsConsumer) Push(job *structs.Job) error { diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 6743dc2f..ca972c5b 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -35,5 +35,5 @@ func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.C // FromPipeline constructs AMQP driver from pipeline func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, pq) + return FromPipeline(pipe, p.log, p.cfg, pq) } diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 16071b78..277e75b7 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -39,7 +39,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit // re-init connection errInit := j.initRabbitMQ() if errInit != nil { - j.log.Error("error while redialing", "error", errInit) + j.log.Error("rabbitmq dial", "error", errInit) return errInit } @@ -74,7 +74,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit // restart listener j.listener(deliv) - j.log.Info("queues and subscribers redeclare succeed") + j.log.Info("queues and subscribers redeclared successfully") return nil } diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 9d79221c..b51af322 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -12,6 +12,10 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) +const ( + pipelineSize string = "pipeline_size" +) + type Config struct { PipelineSize uint64 `mapstructure:"pipeline_size"` } @@ -26,12 +30,12 @@ type JobBroker struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ log: log, - pq: q, + pq: pq, stopCh: make(chan struct{}, 1), } @@ -53,8 +57,22 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q return jb, nil } -func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) { - panic("not implemented") +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, pq priorityqueue.Queue) (*JobBroker, error) { + jb := &JobBroker{ + log: log, + pq: pq, + stopCh: make(chan struct{}, 1), + } + + jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000)) + + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + + return jb, nil } func (j *JobBroker) Push(job *structs.Job) error { diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 75012873..bfe2d6ac 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -29,10 +29,12 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} +// JobsConstruct creates new ephemeral consumer from the configuration func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(configKey, p.log, p.cfg, pq) } +// FromPipeline creates new ephemeral consumer from the provided pipeline func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, pq) + return FromPipeline(pipeline, p.log, pq) } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index e87204f9..91898178 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -44,6 +44,17 @@ func (p Pipeline) String(name string, d string) string { return d } +// Int must return option value as string or return default value. +func (p Pipeline) Int(name string, d int) int { + if value, ok := p[name]; ok { + if i, ok := value.(int); ok { + return i + } + } + + return d +} + // Priority returns default pipeline priority func (p Pipeline) Priority() uint64 { if value, ok := p[priority]; ok { |