summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
committerValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
commit05660fcd256963eac94ada90f7baa409344f9e73 (patch)
tree72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/drivers/beanstalk
parent182199a6449677a620813e3a8157cd0406095435 (diff)
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go44
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go9
4 files changed, 46 insertions, 13 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 62301bed..fc659902 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,7 +1,7 @@
package beanstalk
import (
- "strings"
+ "net"
"sync"
"time"
@@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
+ } else {
+ // retry put only when we redialed
+ return cp.t.Put(body, pri, delay, ttr)
}
}
@@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
+ } else {
+ // retry Reserve only when we redialed
+ return cp.ts.Reserve(reserveTimeout)
}
-
- return 0, nil, err
}
return id, body, nil
@@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return errN
+ } else {
+ // retry Delete only when we redialed
+ return cp.conn.Delete(id)
}
-
- return err
}
return nil
}
@@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error {
return nil
}
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+var connErrors = map[string]struct{}{"EOF": {}}
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) {
+
+ // check if the error
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
- for _, errStr := range connErrors {
- if connErr, ok := err.(beanstalk.ConnError); ok {
- // if error is related to the broken connection - redial
- if strings.Contains(errStr, connErr.Err.Error()) {
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
@@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error {
}
}
- return nil
+ // return initial error
+ return err
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 1c2e9781..1490e587 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
func (j *JobConsumer) Stop() error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.stopCh <- struct{}{}
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ j.stopCh <- struct{}{}
+ }
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 2c2873c2..b797fc12 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -100,7 +100,7 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- return nil
+ return i.Options.conn.Delete(i.Options.id)
}
func fromJob(job *job.Job) *Item {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index ec0b5ca8..0f98312a 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,5 +1,7 @@
package beanstalk
+import "github.com/beanstalkd/go-beanstalk"
+
func (j *JobConsumer) listen() {
for {
select {
@@ -9,6 +11,13 @@ func (j *JobConsumer) listen() {
default:
id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
+ if errB, ok := err.(beanstalk.ConnError); ok {
+ switch errB.Err {
+ case beanstalk.ErrTimeout:
+ j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
+ continue
+ }
+ }
// in case of other error - continue
j.log.Error("beanstalk reserve", "error", err)
continue