summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/config.go24
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go92
-rw-r--r--plugins/jobs/drivers/amqp/item.go13
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go44
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go9
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go4
-rw-r--r--plugins/jobs/drivers/sqs/listener.go1
9 files changed, 122 insertions, 71 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 7befb3c8..2a1aed20 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -2,13 +2,15 @@ package amqp
// pipeline rabbitmq info
const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange-type"
- queue string = "queue"
- routingKey string = "routing-key"
- prefetch string = "prefetch"
- exclusive string = "exclusive"
- priority string = "priority"
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+ multipleAsk string = "multiple_ask"
+ requeueOnFail string = "requeue_on_fail"
dlx string = "x-dead-letter-exchange"
dlxRoutingKey string = "x-dead-letter-routing-key"
@@ -24,13 +26,15 @@ type GlobalCfg struct {
// Config is used to parse pipeline configuration
type Config struct {
- PrefetchCount int `mapstructure:"pipeline_size"`
+ Prefetch int `mapstructure:"prefetch"`
Queue string `mapstructure:"queue"`
Priority int64 `mapstructure:"priority"`
Exchange string `mapstructure:"exchange"`
ExchangeType string `mapstructure:"exchange_type"`
RoutingKey string `mapstructure:"routing_key"`
Exclusive bool `mapstructure:"exclusive"`
+ MultipleAck bool `mapstructure:"multiple_ask"`
+ RequeueOnFail bool `mapstructure:"requeue_on_fail"`
}
func (c *Config) InitDefault() {
@@ -42,8 +46,8 @@ func (c *Config) InitDefault() {
c.Exchange = "default"
}
- if c.PrefetchCount == 0 {
- c.PrefetchCount = 100
+ if c.Prefetch == 0 {
+ c.Prefetch = 100
}
if c.Priority == 0 {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 6def138e..d592a17a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -29,17 +29,19 @@ type JobsConsumer struct {
conn *amqp.Connection
consumeChan *amqp.Channel
publishChan *amqp.Channel
+ consumeID string
+ connStr string
retryTimeout time.Duration
- prefetchCount int
+ prefetch int
priority int64
exchangeName string
queue string
exclusive bool
- consumeID string
- connStr string
exchangeType string
routingKey string
+ multipleAck bool
+ requeueOnFail bool
delayCache map[string]struct{}
@@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
-
// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
@@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- // PARSE CONFIGURATION -------
+ // PARSE CONFIGURATION START -------
var pipeCfg Config
var globalCfg GlobalCfg
@@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
- jb.routingKey = pipeCfg.RoutingKey
- jb.queue = pipeCfg.Queue
- jb.exchangeType = pipeCfg.ExchangeType
- jb.exchangeName = pipeCfg.Exchange
- jb.prefetchCount = pipeCfg.PrefetchCount
- jb.exclusive = pipeCfg.Exclusive
- jb.priority = pipeCfg.Priority
-
- // PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ priority: pipeCfg.Priority,
+
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
@@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
// only global section
if !cfg.Has(pluginName) {
@@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
globalCfg.InitDefault()
- jb.routingKey = pipeline.String(routingKey, "")
- jb.queue = pipeline.String(queue, "default")
- jb.exchangeType = pipeline.String(exchangeType, "direct")
- jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
- jb.prefetchCount = pipeline.Int(prefetch, 10)
- jb.priority = int64(pipeline.Int(priority, 10))
- jb.exclusive = pipeline.Bool(exclusive, true)
-
// PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, true),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
return nil, errors.E(op, err)
@@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
@@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) {
return
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
j.log.Error("qos set failed", "error", err)
return
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b912dde..bc679037 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -50,6 +50,10 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ multipleAsk bool
+ requeue bool
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) {
}
func (j *Item) Ack() error {
- return j.AckFunc(false)
+ return j.AckFunc(j.Options.multipleAsk)
}
func (j *Item) Nack() error {
- return j.NackFunc(false, false)
+ return j.NackFunc(false, j.Options.requeue)
}
func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
@@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) {
// unpack restores jobs.Options
func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
- item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: j.multipleAck,
+ requeue: j.requeueOnFail,
+ }}
if _, ok := d.Headers[job.RRID].(string); !ok {
return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 62301bed..fc659902 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,7 +1,7 @@
package beanstalk
import (
- "strings"
+ "net"
"sync"
"time"
@@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
+ } else {
+ // retry put only when we redialed
+ return cp.t.Put(body, pri, delay, ttr)
}
}
@@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
+ } else {
+ // retry Reserve only when we redialed
+ return cp.ts.Reserve(reserveTimeout)
}
-
- return 0, nil, err
}
return id, body, nil
@@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return errN
+ } else {
+ // retry Delete only when we redialed
+ return cp.conn.Delete(id)
}
-
- return err
}
return nil
}
@@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error {
return nil
}
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+var connErrors = map[string]struct{}{"EOF": {}}
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) {
+
+ // check if the error
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
- for _, errStr := range connErrors {
- if connErr, ok := err.(beanstalk.ConnError); ok {
- // if error is related to the broken connection - redial
- if strings.Contains(errStr, connErr.Err.Error()) {
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
@@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error {
}
}
- return nil
+ // return initial error
+ return err
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 1c2e9781..1490e587 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
func (j *JobConsumer) Stop() error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.stopCh <- struct{}{}
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ j.stopCh <- struct{}{}
+ }
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 2c2873c2..b797fc12 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -100,7 +100,7 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- return nil
+ return i.Options.conn.Delete(i.Options.id)
}
func fromJob(job *job.Job) *Item {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index ec0b5ca8..0f98312a 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,5 +1,7 @@
package beanstalk
+import "github.com/beanstalkd/go-beanstalk"
+
func (j *JobConsumer) listen() {
for {
select {
@@ -9,6 +11,13 @@ func (j *JobConsumer) listen() {
default:
id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
+ if errB, ok := err.(beanstalk.ConnError); ok {
+ switch errB.Err {
+ case beanstalk.ErrTimeout:
+ j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
+ continue
+ }
+ }
// in case of other error - continue
j.log.Error("beanstalk reserve", "error", err)
continue
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 18546715..43617716 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
@@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 5722c19a..8c5d887e 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit
for {
select {
case <-j.pauseCh:
+ j.log.Warn("sqs listener stopped")
return
default:
message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{