summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 14:55:24 +0300
committerValery Piashchynski <[email protected]>2021-07-14 14:55:24 +0300
commitcd07985494b3ebb03fd6553bed9aa1393052ffc5 (patch)
treeea2c89de3fb7765af84ff9dac7c83881bc8869d2 /plugins
parentd099e47ab28dd044d34e18347a4c714b8af3d612 (diff)
Implement Pause/Resume/Stop
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go51
-rw-r--r--plugins/jobs/drivers/sqs/item.go73
-rw-r--r--plugins/jobs/drivers/sqs/listener.go2
-rw-r--r--plugins/jobs/job/job_options_test.go16
4 files changed, 108 insertions, 34 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index c0f66589..cb7cb4af 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -39,6 +39,9 @@ type JobConsumer struct {
prefetch int32
visibilityTimeout int32
+ // if user invoke several resume operations
+ listeners uint32
+
// queue optional parameters
attributes map[string]string
tags map[string]string
@@ -147,7 +150,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- _, err := j.client.SendMessage(context.Background(), j.pack(msg))
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
if err != nil {
return errors.E(op, err)
}
@@ -171,6 +174,8 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+ atomic.AddUint32(&j.listeners, 1)
+
// start listener
go j.listen()
@@ -178,13 +183,47 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Stop() error {
- panic("implement me")
+ j.pauseCh <- struct{}{}
+ return nil
}
-func (j *JobConsumer) Pause(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Pause(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop consume
+ j.pauseCh <- struct{}{}
}
-func (j *JobConsumer) Resume(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Resume(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+ atomic.AddUint32(&j.listeners, 1)
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index ef736be9..815b68c6 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -1,6 +1,7 @@
package sqs
import (
+ "context"
"strconv"
"time"
@@ -60,6 +61,12 @@ type Options struct {
// Maximum number of attempts to receive and process the message
MaxAttempts int64 `json:"max_attempts,omitempty"`
+
+ // Private ================
+ approxReceiveCount int64
+ queue *string
+ receiptHandler *string
+ client *sqs.Client
}
// CanRetry must return true if broker is allowed to re-run the job.
@@ -82,30 +89,52 @@ func (o *Options) TimeoutDuration() time.Duration {
return time.Second * time.Duration(o.Timeout)
}
-func (j *Item) ID() string {
- return j.Ident
+func (i *Item) ID() string {
+ return i.Ident
}
-func (j *Item) Priority() int64 {
- return j.Options.Priority
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
}
// Body packs job payload into binary payload.
-func (j *Item) Body() []byte {
- return utils.AsBytes(j.Payload)
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
}
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
-func (j *Item) Context() ([]byte, error) {
+func (i *Item) Context() ([]byte, error) {
return nil, nil
}
-func (j *Item) Ack() error {
+func (i *Item) Ack() error {
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
return nil
}
-func (j *Item) Nack() error {
+func (i *Item) Nack() error {
+ if i.Options.CanRetry(i.Options.approxReceiveCount) {
+ return nil
+ }
+
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
return nil
}
@@ -124,22 +153,22 @@ func fromJob(job *job.Job) *Item {
}
}
-func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput {
+func (i *Item) pack(queue *string) *sqs.SendMessageInput {
return &sqs.SendMessageInput{
- MessageBody: aws.String(item.Payload),
- QueueUrl: j.outputQ.QueueUrl,
- DelaySeconds: int32(item.Options.Delay),
+ MessageBody: aws.String(i.Payload),
+ QueueUrl: queue,
+ DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))},
- job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))},
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
},
}
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
+func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -185,6 +214,12 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
Timeout: int64(to),
Priority: int64(priority),
MaxAttempts: int64(attempt),
+
+ // private
+ approxReceiveCount: int64(recCount),
+ client: client,
+ queue: queue,
+ receiptHandler: msg.ReceiptHandle,
},
}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index a10ce5a6..bb6f8c7a 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -32,7 +32,7 @@ func (j *JobConsumer) listen() {
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, attempt, err := j.unpack(&m)
+ item, attempt, err := unpack(&m, j.outputQ.QueueUrl, j.client)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.outputQ.QueueUrl,
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go
index 500d8006..f4b1dc0c 100644
--- a/plugins/jobs/job/job_options_test.go
+++ b/plugins/jobs/job/job_options_test.go
@@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) {
})
assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, uint64(1), opts.Attempts)
- assert.Equal(t, uint64(2), opts.Delay)
- assert.Equal(t, uint64(1), opts.Timeout)
- assert.Equal(t, uint64(1), opts.RetryDelay)
+ assert.Equal(t, int64(1), opts.Attempts)
+ assert.Equal(t, int64(2), opts.Delay)
+ assert.Equal(t, int64(1), opts.Timeout)
+ assert.Equal(t, int64(1), opts.RetryDelay)
}
func TestOptions_MergeKeepOriginal(t *testing.T) {
@@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) {
})
assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, uint64(10), opts.Attempts)
- assert.Equal(t, uint64(10), opts.Delay)
- assert.Equal(t, uint64(10), opts.Timeout)
- assert.Equal(t, uint64(10), opts.RetryDelay)
+ assert.Equal(t, int64(10), opts.Attempts)
+ assert.Equal(t, int64(10), opts.Delay)
+ assert.Equal(t, int64(10), opts.Timeout)
+ assert.Equal(t, int64(10), opts.RetryDelay)
}