diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 51 |
1 files changed, 45 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index c0f66589..cb7cb4af 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -39,6 +39,9 @@ type JobConsumer struct { prefetch int32 visibilityTimeout int32 + // if user invoke several resume operations + listeners uint32 + // queue optional parameters attributes map[string]string tags map[string]string @@ -147,7 +150,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), j.pack(msg)) + _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl)) if err != nil { return errors.E(op, err) } @@ -171,6 +174,8 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } + atomic.AddUint32(&j.listeners, 1) + // start listener go j.listen() @@ -178,13 +183,47 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { } func (j *JobConsumer) Stop() error { - panic("implement me") + j.pauseCh <- struct{}{} + return nil } -func (j *JobConsumer) Pause(pipeline string) { - panic("implement me") +func (j *JobConsumer) Pause(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop consume + j.pauseCh <- struct{}{} } -func (j *JobConsumer) Resume(pipeline string) { - panic("implement me") +func (j *JobConsumer) Resume(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + + // start listener + go j.listen() + atomic.AddUint32(&j.listeners, 1) } |