summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
committerValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
commitb2da831f47284974551710d2767a7bdde0efa51d (patch)
tree7d8fee59cdb307110d2fcd872635437e0203321b
parent50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff)
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation. Complete redia for the beanstalk. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod1
-rw-r--r--plugins/jobs/drivers/amqp/item.go16
-rw-r--r--plugins/jobs/drivers/amqp/redial.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go100
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go77
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go17
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go56
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go34
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go8
-rw-r--r--plugins/jobs/drivers/sqs/config.go8
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
-rw-r--r--plugins/jobs/drivers/sqs/item.go17
-rw-r--r--plugins/jobs/drivers/sqs/listener.go41
-rw-r--r--plugins/jobs/plugin.go23
-rw-r--r--tests/env/Dockerfile-beanstalkd.yaml12
-rw-r--r--tests/env/docker-compose.yaml4
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-test.yaml100
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go4
19 files changed, 425 insertions, 142 deletions
diff --git a/go.mod b/go.mod
index 273796fe..b518c14a 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.4.1
github.com/aws/aws-sdk-go-v2/credentials v1.3.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0
+ github.com/aws/smithy-go v1.5.0
github.com/beanstalkd/go-beanstalk v0.1.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cenkalti/backoff/v4 v4.1.1
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 7c300c88..6b912dde 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -82,7 +82,21 @@ func (j *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the amqp, amqp.Table used instead
func (j *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (j *Item) Ack() error {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index 4f04484e..532aadb4 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -38,7 +38,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
expb.MaxElapsedTime = j.retryTimeout
- op := func() error {
+ operation := func() error {
j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
j.conn, dialErr = amqp.Dial(j.connStr)
@@ -90,7 +90,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
return nil
}
- retryErr := backoff.Retry(op, expb)
+ retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
j.Unlock()
j.log.Error("backoff failed", "error", retryErr)
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index f05ee122..6a8bda1d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -30,7 +30,7 @@ func (c *Config) InitDefault() {
}
if c.ReserveTimeout == 0 {
- c.ReserveTimeout = time.Second * 5
+ c.ReserveTimeout = time.Second * 1
}
if c.PipePriority == 0 {
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
new file mode 100644
index 00000000..fd7a3902
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -0,0 +1,100 @@
+package beanstalk
+
+import (
+ "sync"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/errors"
+)
+
+type ConnPool struct {
+ sync.RWMutex
+ conn *beanstalk.Conn
+ connT *beanstalk.Conn
+ ts *beanstalk.TubeSet
+ t *beanstalk.Tube
+
+ network string
+ address string
+ tName string
+ tout time.Duration
+}
+
+func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
+ connT, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ connTS, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ tube := beanstalk.NewTube(connT, tName)
+ ts := beanstalk.NewTubeSet(connTS, tName)
+
+ return &ConnPool{
+ network: network,
+ address: address,
+ tName: tName,
+ tout: tout,
+ conn: connTS,
+ connT: connT,
+ ts: ts,
+ t: tube,
+ }, nil
+}
+
+func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.t.Put(body, pri, delay, ttr)
+}
+
+// Reserve reserves and returns a job from one of the tubes in t. If no
+// job is available before time timeout has passed, Reserve returns a
+// ConnError recording ErrTimeout.
+//
+// Typically, a client will reserve a job, perform some work, then delete
+// the job with Conn.Delete.
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.ts.Reserve(reserveTimeout)
+}
+
+func (cp *ConnPool) Delete(id uint64) error {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.conn.Delete(id)
+}
+
+func (cp *ConnPool) Redial() error {
+ const op = errors.Op("connection_pool_redial")
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.Lock()
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+ cp.Unlock()
+ return nil
+}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 27d453f4..cce85c99 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,11 +3,9 @@ package beanstalk
import (
"bytes"
"strings"
- "sync"
"sync/atomic"
"time"
- "github.com/beanstalkd/go-beanstalk"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -18,7 +16,6 @@ import (
)
type JobConsumer struct {
- sync.Mutex
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -27,11 +24,9 @@ type JobConsumer struct {
listeners uint32
// beanstalk
+ pool *ConnPool
addr string
network string
- conn *beanstalk.Conn
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
reserveTimeout time.Duration
reconnectCh chan struct{}
tout time.Duration
@@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
// initialize job consumer
jc := &JobConsumer{
pq: pq,
log: log,
eh: e,
+ pool: cPool,
network: dsn[0],
addr: dsn[1],
tout: globalCfg.Timeout,
- reserveTimeout: pipeCfg.ReserveTimeout,
tName: pipeCfg.Tube,
+ reserveTimeout: pipeCfg.ReserveTimeout,
tubePriority: pipeCfg.TubePriority,
priority: pipeCfg.PipePriority,
@@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}),
}
- jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout)
- if err != nil {
- return nil, err
- }
-
- jc.tube = beanstalk.NewTube(jc.conn, jc.tName)
- jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName)
-
// start redial listener
go jc.redial()
@@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
globalCfg.InitDefault()
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, ""),
- reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- stopCh: make(chan struct{}),
- reconnectCh: make(chan struct{}),
- }
-
// PARSE CONFIGURATION -------
dsn := strings.Split(globalCfg.Addr, "://")
@@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, "default"),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ tubePriority: uint32(pipe.Int(tube, 10)),
+ priority: pipe.Priority(),
+
+ // buffered with two because jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ reconnectCh: make(chan struct{}, 2),
}
// start redial listener
@@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- // reconnect protection
- j.Lock()
- defer j.Unlock()
-
item := fromJob(jb)
bb := new(bytes.Buffer)
@@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
- errD := j.conn.Delete(id)
+ // TODO check for the connection error
+ errD := j.pool.Delete(id)
if errD != nil {
return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
}
@@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- j.Lock()
- defer j.Unlock()
-
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 329d4c8d..2c2873c2 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/beanstalkd/go-beanstalk"
+ json "github.com/json-iterator/go"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -77,7 +78,21 @@ func (i *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
func (i *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (i *Item) Ack() error {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index 873930d5..33dd4fe5 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,35 +1,69 @@
package beanstalk
-func (j *JobConsumer) listen() {
+import (
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+)
+
+func (j *JobConsumer) listen() { //nolint:gocognit
+ const op = errors.Op("beanstalk_listen")
for {
select {
case <-j.stopCh:
j.log.Warn("beanstalk listener stopped")
return
-
default:
- // lock used here to prevent consume from the broken connection
- j.Lock()
-
- id, body, err := j.tubeSet.Reserve(j.reserveTimeout)
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
+ // reserve timeout
+ if connErr, ok := err.(beanstalk.ConnError); ok {
+ switch connErr.Err {
+ case beanstalk.ErrTimeout:
+ j.log.Warn("timeout expired", "warn", connErr.Error())
+ continue
+ default:
+ j.log.Error("beanstalk connection error", "error", connErr.Error())
+
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ operation := func() error {
+ errR := j.pool.Redial()
+ if errR != nil {
+ return errors.E(op, errR)
+ }
+
+ j.log.Info("beanstalk redial was successful")
+ // reassign a pool
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr)
+ return
+ }
+ continue
+ }
+ }
j.log.Error("beanstalk reserve", "error", err)
- j.Unlock()
continue
}
item := &Item{}
- err = unpack(id, body, j.conn, item)
+ err = unpack(id, body, j.pool.conn, item)
if err != nil {
j.log.Error("beanstalk unpack item", "error", err)
- j.Unlock()
continue
}
// insert job into the priority queue
j.pq.Insert(item)
-
- j.Unlock()
}
}
}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
index e1922517..784337ad 100644
--- a/plugins/jobs/drivers/beanstalk/redial.go
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -2,30 +2,40 @@ package beanstalk
import (
"sync/atomic"
+ "time"
- "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
)
func (j *JobConsumer) redial() {
for range j.reconnectCh {
// backoff here
-
- j.Lock()
-
- var err error
- j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout)
- if err != nil {
- panic(err)
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ op := func() error {
+ err := j.pool.Redial()
+ if err != nil {
+ return err
+ }
+
+ j.log.Info("beanstalk redial was successful")
+ // reassign a pool
+ return nil
}
- j.tube = beanstalk.NewTube(j.conn, j.tName)
- j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName)
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.log.Error("beanstalk backoff failed", "error", retryErr)
+ continue
+ }
// restart listener
if atomic.LoadUint32(&j.listeners) == 1 {
+ // stop previous listener
+ j.stopCh <- struct{}{}
go j.listen()
}
-
- j.Unlock()
}
}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index cd08ca7a..c49a23c1 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -166,7 +166,13 @@ func (j *JobBroker) Resume(pipeline string) {
}
// Run is no-op for the ephemeral
-func (j *JobBroker) Run(_ *pipeline.Pipeline) error {
+func (j *JobBroker) Run(pipe *pipeline.Pipeline) error {
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
index 0b4e8157..af5b1cfb 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -1,5 +1,7 @@
package sqs
+import "github.com/aws/aws-sdk-go-v2/aws"
+
type GlobalCfg struct {
Key string `mapstructure:"key"`
Secret string `mapstructure:"secret"`
@@ -36,7 +38,7 @@ type Config struct {
// Queue URLs and names are case-sensitive.
//
// This member is required.
- Queue string `mapstructure:"queue"`
+ Queue *string `mapstructure:"queue"`
// A map of attributes with their corresponding values. The following lists the
// names, descriptions, and values of the special request parameters that the
@@ -81,8 +83,8 @@ func (c *GlobalCfg) InitDefault() {
}
func (c *Config) InitDefault() {
- if c.Queue == "" {
- c.Queue = "default"
+ if c.Queue == nil {
+ c.Queue = aws.String("default")
}
if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 7e1f6d56..18546715 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -34,7 +34,7 @@ type JobConsumer struct {
sessionToken string
region string
endpoint string
- queue string
+ queue *string
messageGroupID string
waitTime int32
prefetch int32
@@ -47,8 +47,8 @@ type JobConsumer struct {
attributes map[string]string
tags map[string]string
- client *sqs.Client
- outputQ *sqs.CreateQueueOutput
+ client *sqs.Client
+ queueURL *string
pauseCh chan struct{}
}
@@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
messageGroupID: uuid.NewString(),
attributes: attr,
tags: tg,
- queue: pipe.String(queue, "default"),
+ queue: aws.String(pipe.String(queue, "default")),
prefetch: int32(pipe.Int(pref, 10)),
visibilityTimeout: int32(pipe.Int(visibility, 0)),
waitTime: int32(pipe.Int(waitTime, 0)),
@@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL))
if err != nil {
return errors.E(op, err)
}
@@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
// start listener
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 0f03cd20..9dd0aa5f 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -8,6 +8,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ json "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
@@ -108,7 +109,21 @@ func (i *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
func (i *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (i *Item) Ack() error {
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index ded79ae7..5722c19a 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -2,40 +2,71 @@ package sqs
import (
"context"
+ "time"
+ "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/aws/smithy-go"
)
const (
+ // All - get all message attribute names
All string = "All"
+
+ // NonExistentQueue AWS error code
+ NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen() {
+func (j *JobConsumer) listen() { //nolint:gocognit
for {
select {
case <-j.pauseCh:
return
default:
message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
MaxNumberOfMessages: j.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
VisibilityTimeout: j.visibilityTimeout,
WaitTimeSeconds: j.waitTime,
})
+
if err != nil {
+ if oErr, ok := (err).(*smithy.OperationError); ok {
+ if rErr, ok := oErr.Err.(*http.ResponseError); ok {
+ if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
+ // in case of NonExistentQueue - recreate the queue
+ if apiErr.Code == NonExistentQueue {
+ j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ if err != nil {
+ j.log.Error("create queue", "error", err)
+ }
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+ continue
+ }
+ }
+ }
+ }
+
j.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := unpack(&m, j.outputQ.QueueUrl, j.client)
+ item, err := unpack(&m, j.queueURL, j.client)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
@@ -52,7 +83,7 @@ func (j *JobConsumer) listen() {
}
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 98e7ebf8..47d31d99 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -20,6 +20,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -194,11 +195,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return
default:
// get data JOB from the queue
- job := p.queue.ExtractMin()
+ jb := p.queue.ExtractMin()
- ctx, err := job.Context()
+ ctx, err := jb.Context()
if err != nil {
- errNack := job.Nack()
+ errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
@@ -208,14 +209,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
exec := payload.Payload{
Context: ctx,
- Body: job.Body(),
+ Body: jb.Body(),
}
// protect from the pool reset
p.RLock()
- _, err = p.workersPool.Exec(exec)
+
+ // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+
+ resp, err := p.workersPool.Exec(exec)
if err != nil {
- errNack := job.Nack()
+ errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
@@ -226,9 +231,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
p.RUnlock()
- errAck := job.Ack()
+ // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
+
+ errAck := jb.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
+ continue
}
// TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
atomic.AddUint64(&rate, 1)
diff --git a/tests/env/Dockerfile-beanstalkd.yaml b/tests/env/Dockerfile-beanstalkd.yaml
new file mode 100644
index 00000000..cb81aafa
--- /dev/null
+++ b/tests/env/Dockerfile-beanstalkd.yaml
@@ -0,0 +1,12 @@
+FROM ubuntu:latest
+
+RUN apt-get update && apt-get install -y curl build-essential
+
+RUN curl -sL https://github.com/kr/beanstalkd/archive/v1.12.tar.gz | tar xvz -C /tmp
+
+WORKDIR /tmp/beanstalkd-1.12
+RUN make
+RUN cp beanstalkd /usr/bin
+
+EXPOSE 11300
+ENTRYPOINT ["/usr/bin/beanstalkd"]
diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml
index a407fed4..67a927c4 100644
--- a/tests/env/docker-compose.yaml
+++ b/tests/env/docker-compose.yaml
@@ -15,7 +15,9 @@ services:
- "6378:6379"
beanstalk:
- image: schickling/beanstalkd
+ build:
+ context: .
+ dockerfile: Dockerfile-beanstalkd.yaml
ports:
- "11300:11300"
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
index e3c0b017..8213d72a 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -43,47 +43,47 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
- test-local:
- driver: ephemeral
- priority: 10
- pipeline_size: 10000
-
- test-local-2:
- driver: ephemeral
- priority: 1
- pipeline_size: 10000
-
- test-local-3:
- driver: ephemeral
- priority: 2
- pipeline_size: 10000
-
- test-1:
- driver: amqp
- priority: 1
- pipeline_size: 1000000
- queue: test-1-queue
- exchange: default
- exchange_type: direct
- routing_key: test
-
- test-4:
- driver: amqp
- priority: 1
- pipeline_size: 1000000
- queue: test-1-queue
- exchange: default
- exchange_type: direct
- routing_key: test
-
- test-2-amqp:
- driver: amqp
- priority: 2
- pipeline_size: 100000
- queue: test-2-queue
- exchange: default
- exchange_type: direct
- routing_key: test-2
+# test-local:
+# driver: ephemeral
+# priority: 10
+# pipeline_size: 10000
+#
+# test-local-2:
+# driver: ephemeral
+# priority: 1
+# pipeline_size: 10000
+#
+# test-local-3:
+# driver: ephemeral
+# priority: 2
+# pipeline_size: 10000
+#
+# test-1:
+# driver: amqp
+# priority: 1
+# pipeline_size: 1000000
+# queue: test-1-queue
+# exchange: default
+# exchange_type: direct
+# routing_key: test
+#
+# test-4:
+# driver: amqp
+# priority: 1
+# pipeline_size: 1000000
+# queue: test-1-queue
+# exchange: default
+# exchange_type: direct
+# routing_key: test
+#
+# test-2-amqp:
+# driver: amqp
+# priority: 2
+# pipeline_size: 100000
+# queue: test-2-queue
+# exchange: default
+# exchange_type: direct
+# routing_key: test-2
test-2:
driver: beanstalk
@@ -92,15 +92,15 @@ jobs:
pipeline_size: 1000000
reserve_timeout: 10s
- test-3:
- driver: sqs
- pipeline_size: 1000000
- queue: default
- attributes:
- MessageRetentionPeriod: 86400
- tags:
- test: "tag"
+# test-3:
+# driver: sqs
+# pipeline_size: 1000000
+# queue: default
+# attributes:
+# MessageRetentionPeriod: 86400
+# tags:
+# test: "tag"
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3", "test-2" ]
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 59f55f3d..f9f19e0b 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -17,6 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk"
"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -49,6 +50,7 @@ func TestTEMP_INTI(t *testing.T) {
&ephemeral.Plugin{},
&sqs.Plugin{},
&amqp.Plugin{},
+ &beanstalk.Plugin{},
)
assert.NoError(t, err)
@@ -97,7 +99,7 @@ func TestTEMP_INTI(t *testing.T) {
}
}()
- time.Sleep(time.Second * 3)
+ time.Sleep(time.Second * 3000)
stopCh <- struct{}{}
wg.Wait()
}