diff options
author | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
commit | b2da831f47284974551710d2767a7bdde0efa51d (patch) | |
tree | 7d8fee59cdb307110d2fcd872635437e0203321b | |
parent | 50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff) |
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation.
Complete redia for the beanstalk.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 16 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 100 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 77 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 56 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 34 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 43 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 41 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 23 | ||||
-rw-r--r-- | tests/env/Dockerfile-beanstalkd.yaml | 12 | ||||
-rw-r--r-- | tests/env/docker-compose.yaml | 4 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-test.yaml | 100 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 4 |
19 files changed, 425 insertions, 142 deletions
@@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.4.1 github.com/aws/aws-sdk-go-v2/credentials v1.3.0 github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0 + github.com/aws/smithy-go v1.5.0 github.com/beanstalkd/go-beanstalk v0.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cenkalti/backoff/v4 v4.1.1 diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 7c300c88..6b912dde 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -82,7 +82,21 @@ func (j *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the amqp, amqp.Table used instead func (j *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (j *Item) Ack() error { diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index 4f04484e..532aadb4 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -38,7 +38,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) expb.MaxElapsedTime = j.retryTimeout - op := func() error { + operation := func() error { j.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) @@ -90,7 +90,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit return nil } - retryErr := backoff.Retry(op, expb) + retryErr := backoff.Retry(operation, expb) if retryErr != nil { j.Unlock() j.log.Error("backoff failed", "error", retryErr) diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index f05ee122..6a8bda1d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -30,7 +30,7 @@ func (c *Config) InitDefault() { } if c.ReserveTimeout == 0 { - c.ReserveTimeout = time.Second * 5 + c.ReserveTimeout = time.Second * 1 } if c.PipePriority == 0 { diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go new file mode 100644 index 00000000..fd7a3902 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -0,0 +1,100 @@ +package beanstalk + +import ( + "sync" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/errors" +) + +type ConnPool struct { + sync.RWMutex + conn *beanstalk.Conn + connT *beanstalk.Conn + ts *beanstalk.TubeSet + t *beanstalk.Tube + + network string + address string + tName string + tout time.Duration +} + +func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) { + connT, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + connTS, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + tube := beanstalk.NewTube(connT, tName) + ts := beanstalk.NewTubeSet(connTS, tName) + + return &ConnPool{ + network: network, + address: address, + tName: tName, + tout: tout, + conn: connTS, + connT: connT, + ts: ts, + t: tube, + }, nil +} + +func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { + cp.RLock() + defer cp.RUnlock() + return cp.t.Put(body, pri, delay, ttr) +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) { + cp.RLock() + defer cp.RUnlock() + return cp.ts.Reserve(reserveTimeout) +} + +func (cp *ConnPool) Delete(id uint64) error { + cp.RLock() + defer cp.RUnlock() + return cp.conn.Delete(id) +} + +func (cp *ConnPool) Redial() error { + const op = errors.Op("connection_pool_redial") + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.Lock() + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + cp.Unlock() + return nil +} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 27d453f4..cce85c99 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,11 +3,9 @@ package beanstalk import ( "bytes" "strings" - "sync" "sync/atomic" "time" - "github.com/beanstalkd/go-beanstalk" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -18,7 +16,6 @@ import ( ) type JobConsumer struct { - sync.Mutex log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -27,11 +24,9 @@ type JobConsumer struct { listeners uint32 // beanstalk + pool *ConnPool addr string network string - conn *beanstalk.Conn - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet reserveTimeout time.Duration reconnectCh chan struct{} tout time.Duration @@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout) + if err != nil { + return nil, errors.E(op, err) + } + // initialize job consumer jc := &JobConsumer{ pq: pq, log: log, eh: e, + pool: cPool, network: dsn[0], addr: dsn[1], tout: globalCfg.Timeout, - reserveTimeout: pipeCfg.ReserveTimeout, tName: pipeCfg.Tube, + reserveTimeout: pipeCfg.ReserveTimeout, tubePriority: pipeCfg.TubePriority, priority: pipeCfg.PipePriority, @@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}), } - jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout) - if err != nil { - return nil, err - } - - jc.tube = beanstalk.NewTube(jc.conn, jc.tName) - jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName) - // start redial listener go jc.redial() @@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu globalCfg.InitDefault() - // initialize job consumer - jc := &JobConsumer{ - pq: pq, - log: log, - eh: e, - tout: globalCfg.Timeout, - tName: pipe.String(tube, ""), - reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - stopCh: make(chan struct{}), - reconnectCh: make(chan struct{}), - } - // PARSE CONFIGURATION ------- dsn := strings.Split(globalCfg.Addr, "://") @@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout) if err != nil { - return nil, err + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipe.String(tube, "default"), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + tubePriority: uint32(pipe.Int(tube, 10)), + priority: pipe.Priority(), + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + reconnectCh: make(chan struct{}, 2), } // start redial listener @@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - // reconnect protection - j.Lock() - defer j.Unlock() - item := fromJob(jb) bb := new(bytes.Buffer) @@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - errD := j.conn.Delete(id) + // TODO check for the connection error + errD := j.pool.Delete(id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) } @@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - j.Lock() - defer j.Unlock() - go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 329d4c8d..2c2873c2 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -6,6 +6,7 @@ import ( "time" "github.com/beanstalkd/go-beanstalk" + json "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -77,7 +78,21 @@ func (i *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead func (i *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (i *Item) Ack() error { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 873930d5..33dd4fe5 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,35 +1,69 @@ package beanstalk -func (j *JobConsumer) listen() { +import ( + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" +) + +func (j *JobConsumer) listen() { //nolint:gocognit + const op = errors.Op("beanstalk_listen") for { select { case <-j.stopCh: j.log.Warn("beanstalk listener stopped") return - default: - // lock used here to prevent consume from the broken connection - j.Lock() - - id, body, err := j.tubeSet.Reserve(j.reserveTimeout) + id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + // reserve timeout + if connErr, ok := err.(beanstalk.ConnError); ok { + switch connErr.Err { + case beanstalk.ErrTimeout: + j.log.Warn("timeout expired", "warn", connErr.Error()) + continue + default: + j.log.Error("beanstalk connection error", "error", connErr.Error()) + + // backoff here + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + operation := func() error { + errR := j.pool.Redial() + if errR != nil { + return errors.E(op, errR) + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr) + return + } + continue + } + } j.log.Error("beanstalk reserve", "error", err) - j.Unlock() continue } item := &Item{} - err = unpack(id, body, j.conn, item) + err = unpack(id, body, j.pool.conn, item) if err != nil { j.log.Error("beanstalk unpack item", "error", err) - j.Unlock() continue } // insert job into the priority queue j.pq.Insert(item) - - j.Unlock() } } } diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go index e1922517..784337ad 100644 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -2,30 +2,40 @@ package beanstalk import ( "sync/atomic" + "time" - "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" ) func (j *JobConsumer) redial() { for range j.reconnectCh { // backoff here - - j.Lock() - - var err error - j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout) - if err != nil { - panic(err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + op := func() error { + err := j.pool.Redial() + if err != nil { + return err + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil } - j.tube = beanstalk.NewTube(j.conn, j.tName) - j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName) + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed", "error", retryErr) + continue + } // restart listener if atomic.LoadUint32(&j.listeners) == 1 { + // stop previous listener + j.stopCh <- struct{}{} go j.listen() } - - j.Unlock() } } diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index cd08ca7a..c49a23c1 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -166,7 +166,13 @@ func (j *JobBroker) Resume(pipeline string) { } // Run is no-op for the ephemeral -func (j *JobBroker) Run(_ *pipeline.Pipeline) error { +func (j *JobBroker) Run(pipe *pipeline.Pipeline) error { + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) return nil } diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go index 0b4e8157..af5b1cfb 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/jobs/drivers/sqs/config.go @@ -1,5 +1,7 @@ package sqs +import "github.com/aws/aws-sdk-go-v2/aws" + type GlobalCfg struct { Key string `mapstructure:"key"` Secret string `mapstructure:"secret"` @@ -36,7 +38,7 @@ type Config struct { // Queue URLs and names are case-sensitive. // // This member is required. - Queue string `mapstructure:"queue"` + Queue *string `mapstructure:"queue"` // A map of attributes with their corresponding values. The following lists the // names, descriptions, and values of the special request parameters that the @@ -81,8 +83,8 @@ func (c *GlobalCfg) InitDefault() { } func (c *Config) InitDefault() { - if c.Queue == "" { - c.Queue = "default" + if c.Queue == nil { + c.Queue = aws.String("default") } if c.PrefetchCount == 0 || c.PrefetchCount > 10 { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 7e1f6d56..18546715 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -34,7 +34,7 @@ type JobConsumer struct { sessionToken string region string endpoint string - queue string + queue *string messageGroupID string waitTime int32 prefetch int32 @@ -47,8 +47,8 @@ type JobConsumer struct { attributes map[string]string tags map[string]string - client *sqs.Client - outputQ *sqs.CreateQueueOutput + client *sqs.Client + queueURL *string pauseCh chan struct{} } @@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf messageGroupID: uuid.NewString(), attributes: attr, tags: tg, - queue: pipe.String(queue, "default"), + queue: aws.String(pipe.String(queue, "default")), prefetch: int32(pipe.Int(pref, 10)), visibilityTimeout: int32(pipe.Int(visibility, 0)), waitTime: int32(pipe.Int(waitTime, 0)), @@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl)) + _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL)) if err != nil { return errors.E(op, err) } @@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { // start listener go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 0f03cd20..9dd0aa5f 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + json "github.com/json-iterator/go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" @@ -108,7 +109,21 @@ func (i *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead func (i *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (i *Item) Ack() error { diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index ded79ae7..5722c19a 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -2,40 +2,71 @@ package sqs import ( "context" + "time" + "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/smithy-go" ) const ( + // All - get all message attribute names All string = "All" + + // NonExistentQueue AWS error code + NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen() { +func (j *JobConsumer) listen() { //nolint:gocognit for { select { case <-j.pauseCh: return default: message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, MaxNumberOfMessages: j.prefetch, AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, MessageAttributeNames: []string{All}, VisibilityTimeout: j.visibilityTimeout, WaitTimeSeconds: j.waitTime, }) + if err != nil { + if oErr, ok := (err).(*smithy.OperationError); ok { + if rErr, ok := oErr.Err.(*http.ResponseError); ok { + if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { + // in case of NonExistentQueue - recreate the queue + if apiErr.Code == NonExistentQueue { + j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + if err != nil { + j.log.Error("create queue", "error", err) + } + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + continue + } + } + } + } + j.log.Error("receive message", "error", err) continue } for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := unpack(&m, j.outputQ.QueueUrl, j.client) + item, err := unpack(&m, j.queueURL, j.client) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { @@ -52,7 +83,7 @@ func (j *JobConsumer) listen() { } _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 98e7ebf8..47d31d99 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -20,6 +20,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -194,11 +195,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return default: // get data JOB from the queue - job := p.queue.ExtractMin() + jb := p.queue.ExtractMin() - ctx, err := job.Context() + ctx, err := jb.Context() if err != nil { - errNack := job.Nack() + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } @@ -208,14 +209,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit exec := payload.Payload{ Context: ctx, - Body: job.Body(), + Body: jb.Body(), } // protect from the pool reset p.RLock() - _, err = p.workersPool.Exec(exec) + + // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + + resp, err := p.workersPool.Exec(exec) if err != nil { - errNack := job.Nack() + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } @@ -226,9 +231,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } p.RUnlock() - errAck := job.Ack() + // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) + + errAck := jb.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) + continue } // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) diff --git a/tests/env/Dockerfile-beanstalkd.yaml b/tests/env/Dockerfile-beanstalkd.yaml new file mode 100644 index 00000000..cb81aafa --- /dev/null +++ b/tests/env/Dockerfile-beanstalkd.yaml @@ -0,0 +1,12 @@ +FROM ubuntu:latest + +RUN apt-get update && apt-get install -y curl build-essential + +RUN curl -sL https://github.com/kr/beanstalkd/archive/v1.12.tar.gz | tar xvz -C /tmp + +WORKDIR /tmp/beanstalkd-1.12 +RUN make +RUN cp beanstalkd /usr/bin + +EXPOSE 11300 +ENTRYPOINT ["/usr/bin/beanstalkd"] diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml index a407fed4..67a927c4 100644 --- a/tests/env/docker-compose.yaml +++ b/tests/env/docker-compose.yaml @@ -15,7 +15,9 @@ services: - "6378:6379" beanstalk: - image: schickling/beanstalkd + build: + context: . + dockerfile: Dockerfile-beanstalkd.yaml ports: - "11300:11300" diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml index e3c0b017..8213d72a 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml @@ -43,47 +43,47 @@ jobs: # list of broker pipelines associated with endpoints pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-4: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 +# test-local: +# driver: ephemeral +# priority: 10 +# pipeline_size: 10000 +# +# test-local-2: +# driver: ephemeral +# priority: 1 +# pipeline_size: 10000 +# +# test-local-3: +# driver: ephemeral +# priority: 2 +# pipeline_size: 10000 +# +# test-1: +# driver: amqp +# priority: 1 +# pipeline_size: 1000000 +# queue: test-1-queue +# exchange: default +# exchange_type: direct +# routing_key: test +# +# test-4: +# driver: amqp +# priority: 1 +# pipeline_size: 1000000 +# queue: test-1-queue +# exchange: default +# exchange_type: direct +# routing_key: test +# +# test-2-amqp: +# driver: amqp +# priority: 2 +# pipeline_size: 100000 +# queue: test-2-queue +# exchange: default +# exchange_type: direct +# routing_key: test-2 test-2: driver: beanstalk @@ -92,15 +92,15 @@ jobs: pipeline_size: 1000000 reserve_timeout: 10s - test-3: - driver: sqs - pipeline_size: 1000000 - queue: default - attributes: - MessageRetentionPeriod: 86400 - tags: - test: "tag" +# test-3: +# driver: sqs +# pipeline_size: 1000000 +# queue: default +# attributes: +# MessageRetentionPeriod: 86400 +# tags: +# test: "tag" # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ] + consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3", "test-2" ] diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index 59f55f3d..f9f19e0b 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -17,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -49,6 +50,7 @@ func TestTEMP_INTI(t *testing.T) { &ephemeral.Plugin{}, &sqs.Plugin{}, &amqp.Plugin{}, + &beanstalk.Plugin{}, ) assert.NoError(t, err) @@ -97,7 +99,7 @@ func TestTEMP_INTI(t *testing.T) { } }() - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 3000) stopCh <- struct{}{} wg.Wait() } |