diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
commit | 7ea227733e0b1fa59021233e6cd0fd06442fbe50 (patch) | |
tree | dcba3e2fddf53898d0f5c82eb128730ab38c723a /plugins/jobs/drivers/sqs | |
parent | 04fde6d8d1a5a88602f8206e0d2c09c4b8346941 (diff) |
Fix incorrect path in the CI. Implement FromPipeline for the sqs.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 119 |
1 files changed, 118 insertions, 1 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index cb7cb4af..7e2f229c 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -4,6 +4,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" @@ -127,7 +128,99 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure } func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - return &JobConsumer{}, nil + const op = errors.Op("new_sqs_consumer") + + const ( + attributes string = "attributes" + tags string = "tags" + queue string = "queue" + pref string = "prefetch" + visibility string = "visibility_timeout" + waitTime string = "wait_time" + ) + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + res := make(map[string]interface{}) + pipe.Map(attributes, res) + + attr := make(map[string]string) + // accept only string values + for i := range res { + if v, ok := res[i].(string); ok { + attr[i] = v + } + } + + // delete all values with map.clear to reuse for the tags + for k := range res { + delete(res, k) + } + + pipe.Map(tags, res) + + tg := make(map[string]string) + // accept only string values + for i := range res { + if v, ok := res[i].(string); ok { + attr[i] = v + } + } + + // initialize job consumer + jb := &JobConsumer{ + pq: pq, + log: log, + eh: e, + messageGroupID: uuid.NewString(), + attributes: attr, + tags: tg, + queue: pipe.String(queue, "default"), + prefetch: int32(pipe.Int(pref, 10)), + visibilityTimeout: int32(pipe.Int(visibility, 0)), + waitTime: int32(pipe.Int(waitTime, 0)), + region: globalCfg.Region, + key: globalCfg.Key, + sessionToken: globalCfg.SessionToken, + secret: globalCfg.Secret, + endpoint: globalCfg.Endpoint, + } + + // PARSE CONFIGURATION ------- + + awsConf, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(globalCfg.Region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) + if err != nil { + return nil, errors.E(op, err) + } + + // config with retries + jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { + o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { + opts.MaxAttempts = 60 + }) + }) + + jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + if err != nil { + return nil, errors.E(op, err) + } + + return jb, nil } func (j *JobConsumer) Push(jb *job.Job) error { @@ -184,6 +277,14 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { j.pauseCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) return nil } @@ -206,6 +307,13 @@ func (j *JobConsumer) Pause(p string) { // stop consume j.pauseCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobConsumer) Resume(p string) { @@ -225,5 +333,14 @@ func (j *JobConsumer) Resume(p string) { // start listener go j.listen() + + // increase num of listeners atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } |