summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/sqs
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go26
-rw-r--r--plugins/jobs/drivers/sqs/item.go2
-rw-r--r--plugins/jobs/drivers/sqs/listener.go2
3 files changed, 15 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 17af1caa..23203190 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -24,7 +24,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-type JobConsumer struct {
+type consumer struct {
sync.Mutex
pq priorityqueue.Queue
log logger.Logger
@@ -56,7 +56,7 @@ type JobConsumer struct {
pauseCh chan struct{}
}
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no such key - error
@@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
globalCfg.InitDefault()
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
return jb, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no global section
@@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
}
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
@@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
QueueUrl: j.queueURL,
@@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
j.Lock()
@@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.pauseCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
d, err := msg.pack(j.queueURL)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index df72b2e5..996adf6c 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
+func (j *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 9efef90d..a4280af2 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -18,7 +18,7 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
+func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
case <-j.pauseCh: