diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 14:55:24 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 14:55:24 +0300 |
commit | cd07985494b3ebb03fd6553bed9aa1393052ffc5 (patch) | |
tree | ea2c89de3fb7765af84ff9dac7c83881bc8869d2 /plugins | |
parent | d099e47ab28dd044d34e18347a4c714b8af3d612 (diff) |
Implement Pause/Resume/Stop
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 51 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 73 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 2 | ||||
-rw-r--r-- | plugins/jobs/job/job_options_test.go | 16 |
4 files changed, 108 insertions, 34 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index c0f66589..cb7cb4af 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -39,6 +39,9 @@ type JobConsumer struct { prefetch int32 visibilityTimeout int32 + // if user invoke several resume operations + listeners uint32 + // queue optional parameters attributes map[string]string tags map[string]string @@ -147,7 +150,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), j.pack(msg)) + _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl)) if err != nil { return errors.E(op, err) } @@ -171,6 +174,8 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } + atomic.AddUint32(&j.listeners, 1) + // start listener go j.listen() @@ -178,13 +183,47 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { } func (j *JobConsumer) Stop() error { - panic("implement me") + j.pauseCh <- struct{}{} + return nil } -func (j *JobConsumer) Pause(pipeline string) { - panic("implement me") +func (j *JobConsumer) Pause(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop consume + j.pauseCh <- struct{}{} } -func (j *JobConsumer) Resume(pipeline string) { - panic("implement me") +func (j *JobConsumer) Resume(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + + // start listener + go j.listen() + atomic.AddUint32(&j.listeners, 1) } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index ef736be9..815b68c6 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -1,6 +1,7 @@ package sqs import ( + "context" "strconv" "time" @@ -60,6 +61,12 @@ type Options struct { // Maximum number of attempts to receive and process the message MaxAttempts int64 `json:"max_attempts,omitempty"` + + // Private ================ + approxReceiveCount int64 + queue *string + receiptHandler *string + client *sqs.Client } // CanRetry must return true if broker is allowed to re-run the job. @@ -82,30 +89,52 @@ func (o *Options) TimeoutDuration() time.Duration { return time.Second * time.Duration(o.Timeout) } -func (j *Item) ID() string { - return j.Ident +func (i *Item) ID() string { + return i.Ident } -func (j *Item) Priority() int64 { - return j.Options.Priority +func (i *Item) Priority() int64 { + return i.Options.Priority } // Body packs job payload into binary payload. -func (j *Item) Body() []byte { - return utils.AsBytes(j.Payload) +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) } // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead -func (j *Item) Context() ([]byte, error) { +func (i *Item) Context() ([]byte, error) { return nil, nil } -func (j *Item) Ack() error { +func (i *Item) Ack() error { + _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + return nil } -func (j *Item) Nack() error { +func (i *Item) Nack() error { + if i.Options.CanRetry(i.Options.approxReceiveCount) { + return nil + } + + _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + return nil } @@ -124,22 +153,22 @@ func fromJob(job *job.Job) *Item { } } -func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput { +func (i *Item) pack(queue *string) *sqs.SendMessageInput { return &sqs.SendMessageInput{ - MessageBody: aws.String(item.Payload), - QueueUrl: j.outputQ.QueueUrl, - DelaySeconds: int32(item.Options.Delay), + MessageBody: aws.String(i.Payload), + QueueUrl: queue, + DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))}, - job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))}, + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, + job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))}, }, } } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) { +func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -185,6 +214,12 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) { Timeout: int64(to), Priority: int64(priority), MaxAttempts: int64(attempt), + + // private + approxReceiveCount: int64(recCount), + client: client, + queue: queue, + receiptHandler: msg.ReceiptHandle, }, } diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index a10ce5a6..bb6f8c7a 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -32,7 +32,7 @@ func (j *JobConsumer) listen() { for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, attempt, err := j.unpack(&m) + item, attempt, err := unpack(&m, j.outputQ.QueueUrl, j.client) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.outputQ.QueueUrl, diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go index 500d8006..f4b1dc0c 100644 --- a/plugins/jobs/job/job_options_test.go +++ b/plugins/jobs/job/job_options_test.go @@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) { }) assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, uint64(1), opts.Attempts) - assert.Equal(t, uint64(2), opts.Delay) - assert.Equal(t, uint64(1), opts.Timeout) - assert.Equal(t, uint64(1), opts.RetryDelay) + assert.Equal(t, int64(1), opts.Attempts) + assert.Equal(t, int64(2), opts.Delay) + assert.Equal(t, int64(1), opts.Timeout) + assert.Equal(t, int64(1), opts.RetryDelay) } func TestOptions_MergeKeepOriginal(t *testing.T) { @@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { }) assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, uint64(10), opts.Attempts) - assert.Equal(t, uint64(10), opts.Delay) - assert.Equal(t, uint64(10), opts.Timeout) - assert.Equal(t, uint64(10), opts.RetryDelay) + assert.Equal(t, int64(10), opts.Attempts) + assert.Equal(t, int64(10), opts.Delay) + assert.Equal(t, int64(10), opts.Timeout) + assert.Equal(t, int64(10), opts.RetryDelay) } |