diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 92 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 13 |
3 files changed, 73 insertions, 56 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 7befb3c8..2a1aed20 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -2,13 +2,15 @@ package amqp // pipeline rabbitmq info const ( - exchangeKey string = "exchange" - exchangeType string = "exchange-type" - queue string = "queue" - routingKey string = "routing-key" - prefetch string = "prefetch" - exclusive string = "exclusive" - priority string = "priority" + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + multipleAsk string = "multiple_ask" + requeueOnFail string = "requeue_on_fail" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -24,13 +26,15 @@ type GlobalCfg struct { // Config is used to parse pipeline configuration type Config struct { - PrefetchCount int `mapstructure:"pipeline_size"` + Prefetch int `mapstructure:"prefetch"` Queue string `mapstructure:"queue"` Priority int64 `mapstructure:"priority"` Exchange string `mapstructure:"exchange"` ExchangeType string `mapstructure:"exchange_type"` RoutingKey string `mapstructure:"routing_key"` Exclusive bool `mapstructure:"exclusive"` + MultipleAck bool `mapstructure:"multiple_ask"` + RequeueOnFail bool `mapstructure:"requeue_on_fail"` } func (c *Config) InitDefault() { @@ -42,8 +46,8 @@ func (c *Config) InitDefault() { c.Exchange = "default" } - if c.PrefetchCount == 0 { - c.PrefetchCount = 100 + if c.Prefetch == 0 { + c.Prefetch = 100 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 6def138e..d592a17a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -29,17 +29,19 @@ type JobsConsumer struct { conn *amqp.Connection consumeChan *amqp.Channel publishChan *amqp.Channel + consumeID string + connStr string retryTimeout time.Duration - prefetchCount int + prefetch int priority int64 exchangeName string queue string exclusive bool - consumeID string - connStr string exchangeType string routingKey string + multipleAck bool + requeueOnFail bool delayCache map[string]struct{} @@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } - // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) @@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - // PARSE CONFIGURATION ------- + // PARSE CONFIGURATION START ------- var pipeCfg Config var globalCfg GlobalCfg @@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- - jb.routingKey = pipeCfg.RoutingKey - jb.queue = pipeCfg.Queue - jb.exchangeType = pipeCfg.ExchangeType - jb.exchangeName = pipeCfg.Exchange - jb.prefetchCount = pipeCfg.PrefetchCount - jb.exclusive = pipeCfg.Exclusive - jb.priority = pipeCfg.Priority - - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + priority: pipeCfg.Priority, + + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { @@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } // only global section if !cfg.Has(pluginName) { @@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con globalCfg.InitDefault() - jb.routingKey = pipeline.String(routingKey, "") - jb.queue = pipeline.String(queue, "default") - jb.exchangeType = pipeline.String(exchangeType, "direct") - jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") - jb.prefetchCount = pipeline.Int(prefetch, 10) - jb.priority = int64(pipeline.Int(priority, 10)) - jb.exclusive = pipeline.Bool(exclusive, true) - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + + routingKey: pipeline.String(routingKey, ""), + queue: pipeline.String(queue, "default"), + exchangeType: pipeline.String(exchangeType, "direct"), + exchangeName: pipeline.String(exchangeKey, "amqp.default"), + prefetch: pipeline.Int(prefetch, 10), + priority: int64(pipeline.Int(priority, 10)), + exclusive: pipeline.Bool(exclusive, true), + multipleAck: pipeline.Bool(multipleAsk, false), + requeueOnFail: pipeline.Bool(requeueOnFail, false), + } + jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { return nil, errors.E(op, err) @@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { return errors.E(op, err) } @@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) { return } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { j.log.Error("qos set failed", "error", err) return diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b912dde..bc679037 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -50,6 +50,10 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + multipleAsk bool + requeue bool } // DelayDuration returns delay duration in a form of time.Duration. @@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) { } func (j *Item) Ack() error { - return j.AckFunc(false) + return j.AckFunc(j.Options.multipleAsk) } func (j *Item) Nack() error { - return j.NackFunc(false, false) + return j.NackFunc(false, j.Options.requeue) } func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { @@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) { // unpack restores jobs.Options func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + }} if _, ok := d.Headers[job.RRID].(string); !ok { return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) |