diff options
author | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
commit | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch) | |
tree | 8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/sqs | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) |
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 26 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 2 |
3 files changed, 15 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 17af1caa..23203190 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -24,7 +24,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -type JobConsumer struct { +type consumer struct { sync.Mutex pq priorityqueue.Queue log logger.Logger @@ -56,7 +56,7 @@ type JobConsumer struct { pauseCh chan struct{} } -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no such key - error @@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure globalCfg.InitDefault() // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure return jb, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no global section @@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf } // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered @@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ QueueUrl: j.queueURL, @@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") j.Lock() @@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { j.pauseCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { d, err := msg.pack(j.queueURL) if err != nil { return err diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index df72b2e5..996adf6c 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { +func (j *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 9efef90d..a4280af2 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -18,7 +18,7 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit +func (j *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { case <-j.pauseCh: |