summaryrefslogtreecommitdiff
path: root/plugins/sqs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
committerValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
commitc7d9385f135853539100430521042f7e7e2ae005 (patch)
tree588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins/sqs/consumer.go
parentf6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff)
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/sqs/consumer.go')
-rw-r--r--plugins/sqs/consumer.go84
1 files changed, 43 insertions, 41 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..dfbda154 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != jb.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
@@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
+ attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: c.queueURL,
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeNameApproximateNumberOfMessages,
types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
@@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
out := &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Queue: *c.queueURL,
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.pauseCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error {
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop consume
- j.pauseCh <- struct{}{}
+ c.pauseCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ c.log.Warn("sqs listener already in the active state")
return
}
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
// increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(c.queueURL)
if err != nil {
return err
}
- _, err = j.client.SendMessage(ctx, d)
+ _, err = c.client.SendMessage(ctx, d)
if err != nil {
return err
}