summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go51
1 files changed, 45 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index c0f66589..cb7cb4af 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -39,6 +39,9 @@ type JobConsumer struct {
prefetch int32
visibilityTimeout int32
+ // if user invoke several resume operations
+ listeners uint32
+
// queue optional parameters
attributes map[string]string
tags map[string]string
@@ -147,7 +150,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- _, err := j.client.SendMessage(context.Background(), j.pack(msg))
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
if err != nil {
return errors.E(op, err)
}
@@ -171,6 +174,8 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+ atomic.AddUint32(&j.listeners, 1)
+
// start listener
go j.listen()
@@ -178,13 +183,47 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Stop() error {
- panic("implement me")
+ j.pauseCh <- struct{}{}
+ return nil
}
-func (j *JobConsumer) Pause(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Pause(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop consume
+ j.pauseCh <- struct{}{}
}
-func (j *JobConsumer) Resume(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Resume(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+ atomic.AddUint32(&j.listeners, 1)
}