diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 92 |
1 files changed, 49 insertions, 43 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 6def138e..d592a17a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -29,17 +29,19 @@ type JobsConsumer struct { conn *amqp.Connection consumeChan *amqp.Channel publishChan *amqp.Channel + consumeID string + connStr string retryTimeout time.Duration - prefetchCount int + prefetch int priority int64 exchangeName string queue string exclusive bool - consumeID string - connStr string exchangeType string routingKey string + multipleAck bool + requeueOnFail bool delayCache map[string]struct{} @@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } - // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) @@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - // PARSE CONFIGURATION ------- + // PARSE CONFIGURATION START ------- var pipeCfg Config var globalCfg GlobalCfg @@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- - jb.routingKey = pipeCfg.RoutingKey - jb.queue = pipeCfg.Queue - jb.exchangeType = pipeCfg.ExchangeType - jb.exchangeName = pipeCfg.Exchange - jb.prefetchCount = pipeCfg.PrefetchCount - jb.exclusive = pipeCfg.Exclusive - jb.priority = pipeCfg.Priority - - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + priority: pipeCfg.Priority, + + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { @@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } // only global section if !cfg.Has(pluginName) { @@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con globalCfg.InitDefault() - jb.routingKey = pipeline.String(routingKey, "") - jb.queue = pipeline.String(queue, "default") - jb.exchangeType = pipeline.String(exchangeType, "direct") - jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") - jb.prefetchCount = pipeline.Int(prefetch, 10) - jb.priority = int64(pipeline.Int(priority, 10)) - jb.exclusive = pipeline.Bool(exclusive, true) - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + + routingKey: pipeline.String(routingKey, ""), + queue: pipeline.String(queue, "default"), + exchangeType: pipeline.String(exchangeType, "direct"), + exchangeName: pipeline.String(exchangeKey, "amqp.default"), + prefetch: pipeline.Int(prefetch, 10), + priority: int64(pipeline.Int(priority, 10)), + exclusive: pipeline.Bool(exclusive, true), + multipleAck: pipeline.Bool(multipleAsk, false), + requeueOnFail: pipeline.Bool(requeueOnFail, false), + } + jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { return nil, errors.E(op, err) @@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { return errors.E(op, err) } @@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) { return } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { j.log.Error("qos set failed", "error", err) return |