diff options
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 92 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 13 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 44 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 1 |
9 files changed, 122 insertions, 71 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 7befb3c8..2a1aed20 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -2,13 +2,15 @@ package amqp // pipeline rabbitmq info const ( - exchangeKey string = "exchange" - exchangeType string = "exchange-type" - queue string = "queue" - routingKey string = "routing-key" - prefetch string = "prefetch" - exclusive string = "exclusive" - priority string = "priority" + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + multipleAsk string = "multiple_ask" + requeueOnFail string = "requeue_on_fail" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -24,13 +26,15 @@ type GlobalCfg struct { // Config is used to parse pipeline configuration type Config struct { - PrefetchCount int `mapstructure:"pipeline_size"` + Prefetch int `mapstructure:"prefetch"` Queue string `mapstructure:"queue"` Priority int64 `mapstructure:"priority"` Exchange string `mapstructure:"exchange"` ExchangeType string `mapstructure:"exchange_type"` RoutingKey string `mapstructure:"routing_key"` Exclusive bool `mapstructure:"exclusive"` + MultipleAck bool `mapstructure:"multiple_ask"` + RequeueOnFail bool `mapstructure:"requeue_on_fail"` } func (c *Config) InitDefault() { @@ -42,8 +46,8 @@ func (c *Config) InitDefault() { c.Exchange = "default" } - if c.PrefetchCount == 0 { - c.PrefetchCount = 100 + if c.Prefetch == 0 { + c.Prefetch = 100 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 6def138e..d592a17a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -29,17 +29,19 @@ type JobsConsumer struct { conn *amqp.Connection consumeChan *amqp.Channel publishChan *amqp.Channel + consumeID string + connStr string retryTimeout time.Duration - prefetchCount int + prefetch int priority int64 exchangeName string queue string exclusive bool - consumeID string - connStr string exchangeType string routingKey string + multipleAck bool + requeueOnFail bool delayCache map[string]struct{} @@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } - // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) @@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - // PARSE CONFIGURATION ------- + // PARSE CONFIGURATION START ------- var pipeCfg Config var globalCfg GlobalCfg @@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- - jb.routingKey = pipeCfg.RoutingKey - jb.queue = pipeCfg.Queue - jb.exchangeType = pipeCfg.ExchangeType - jb.exchangeName = pipeCfg.Exchange - jb.prefetchCount = pipeCfg.PrefetchCount - jb.exclusive = pipeCfg.Exclusive - jb.priority = pipeCfg.Priority - - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + priority: pipeCfg.Priority, + + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { @@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } // only global section if !cfg.Has(pluginName) { @@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con globalCfg.InitDefault() - jb.routingKey = pipeline.String(routingKey, "") - jb.queue = pipeline.String(queue, "default") - jb.exchangeType = pipeline.String(exchangeType, "direct") - jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") - jb.prefetchCount = pipeline.Int(prefetch, 10) - jb.priority = int64(pipeline.Int(priority, 10)) - jb.exclusive = pipeline.Bool(exclusive, true) - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + + routingKey: pipeline.String(routingKey, ""), + queue: pipeline.String(queue, "default"), + exchangeType: pipeline.String(exchangeType, "direct"), + exchangeName: pipeline.String(exchangeKey, "amqp.default"), + prefetch: pipeline.Int(prefetch, 10), + priority: int64(pipeline.Int(priority, 10)), + exclusive: pipeline.Bool(exclusive, true), + multipleAck: pipeline.Bool(multipleAsk, false), + requeueOnFail: pipeline.Bool(requeueOnFail, false), + } + jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { return nil, errors.E(op, err) @@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { return errors.E(op, err) } @@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) { return } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { j.log.Error("qos set failed", "error", err) return diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b912dde..bc679037 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -50,6 +50,10 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + multipleAsk bool + requeue bool } // DelayDuration returns delay duration in a form of time.Duration. @@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) { } func (j *Item) Ack() error { - return j.AckFunc(false) + return j.AckFunc(j.Options.multipleAsk) } func (j *Item) Nack() error { - return j.NackFunc(false, false) + return j.NackFunc(false, j.Options.requeue) } func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { @@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) { // unpack restores jobs.Options func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + }} if _, ok := d.Headers[job.RRID].(string); !ok { return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 62301bed..fc659902 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,7 +1,7 @@ package beanstalk import ( - "strings" + "net" "sync" "time" @@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint errN := cp.checkAndRedial(err) if errN != nil { return 0, errN + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) } } @@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) } - - return 0, nil, err } return id, body, nil @@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error { err := cp.conn.Delete(id) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return errN + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) } - - return err } return nil } @@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error { return nil } -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} +var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { + + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } - for _, errStr := range connErrors { - if connErr, ok := err.(beanstalk.ConnError); ok { - // if error is related to the broken connection - redial - if strings.Contains(errStr, connErr.Err.Error()) { + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial cp.RUnlock() errR := cp.redial() cp.RLock() @@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error { } } - return nil + // return initial error + return err } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1c2e9781..1490e587 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.stopCh <- struct{}{} + if atomic.LoadUint32(&j.listeners) == 1 { + j.stopCh <- struct{}{} + } j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 2c2873c2..b797fc12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -100,7 +100,7 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - return nil + return i.Options.conn.Delete(i.Options.id) } func fromJob(job *job.Job) *Item { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index ec0b5ca8..0f98312a 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,5 +1,7 @@ package beanstalk +import "github.com/beanstalkd/go-beanstalk" + func (j *JobConsumer) listen() { for { select { @@ -9,6 +11,13 @@ func (j *JobConsumer) listen() { default: id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } // in case of other error - continue j.log.Error("beanstalk reserve", "error", err) continue diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 18546715..43617716 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- @@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 5722c19a..8c5d887e 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit for { select { case <-j.pauseCh: + j.log.Warn("sqs listener stopped") return default: message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ |