diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
commit | 7ea227733e0b1fa59021233e6cd0fd06442fbe50 (patch) | |
tree | dcba3e2fddf53898d0f5c82eb128730ab38c723a | |
parent | 04fde6d8d1a5a88602f8206e0d2c09c4b8346941 (diff) |
Fix incorrect path in the CI. Implement FromPipeline for the sqs.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | .github/workflows/linux.yml | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 43 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 119 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 12 |
4 files changed, 172 insertions, 4 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 583d97f3..2b2809f9 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -77,7 +77,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/structs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 31999e23..20dcef2a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -43,7 +43,8 @@ type JobsConsumer struct { delayCache map[string]struct{} - stopCh chan struct{} + listeners uint32 + stopCh chan struct{} } // NewAMQPConsumer initializes rabbitmq pipeline @@ -336,6 +337,13 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { // run listener j.listener(deliv) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } @@ -345,6 +353,15 @@ func (j *JobsConsumer) Pause(p string) { j.log.Error("no such pipeline", "requested pause on: ", p) } + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + // protect connection (redial) j.Lock() defer j.Unlock() @@ -355,8 +372,17 @@ func (j *JobsConsumer) Pause(p string) { errCl := j.consumeChan.Close() if errCl != nil { j.log.Error("force close failed", "error", err) + return } + return } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobsConsumer) Resume(p string) { @@ -369,6 +395,13 @@ func (j *JobsConsumer) Resume(p string) { j.Lock() defer j.Unlock() + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + var err error j.consumeChan, err = j.conn.Channel() if err != nil { @@ -399,6 +432,13 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobsConsumer) Stop() error { @@ -410,7 +450,6 @@ func (j *JobsConsumer) Stop() error { Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), - Elapsed: 0, }) return nil } diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index cb7cb4af..7e2f229c 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -4,6 +4,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" @@ -127,7 +128,99 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure } func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - return &JobConsumer{}, nil + const op = errors.Op("new_sqs_consumer") + + const ( + attributes string = "attributes" + tags string = "tags" + queue string = "queue" + pref string = "prefetch" + visibility string = "visibility_timeout" + waitTime string = "wait_time" + ) + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + res := make(map[string]interface{}) + pipe.Map(attributes, res) + + attr := make(map[string]string) + // accept only string values + for i := range res { + if v, ok := res[i].(string); ok { + attr[i] = v + } + } + + // delete all values with map.clear to reuse for the tags + for k := range res { + delete(res, k) + } + + pipe.Map(tags, res) + + tg := make(map[string]string) + // accept only string values + for i := range res { + if v, ok := res[i].(string); ok { + attr[i] = v + } + } + + // initialize job consumer + jb := &JobConsumer{ + pq: pq, + log: log, + eh: e, + messageGroupID: uuid.NewString(), + attributes: attr, + tags: tg, + queue: pipe.String(queue, "default"), + prefetch: int32(pipe.Int(pref, 10)), + visibilityTimeout: int32(pipe.Int(visibility, 0)), + waitTime: int32(pipe.Int(waitTime, 0)), + region: globalCfg.Region, + key: globalCfg.Key, + sessionToken: globalCfg.SessionToken, + secret: globalCfg.Secret, + endpoint: globalCfg.Endpoint, + } + + // PARSE CONFIGURATION ------- + + awsConf, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(globalCfg.Region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) + if err != nil { + return nil, errors.E(op, err) + } + + // config with retries + jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { + o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { + opts.MaxAttempts = 60 + }) + }) + + jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + if err != nil { + return nil, errors.E(op, err) + } + + return jb, nil } func (j *JobConsumer) Push(jb *job.Job) error { @@ -184,6 +277,14 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { j.pauseCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) return nil } @@ -206,6 +307,13 @@ func (j *JobConsumer) Pause(p string) { // stop consume j.pauseCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobConsumer) Resume(p string) { @@ -225,5 +333,14 @@ func (j *JobConsumer) Resume(p string) { // start listener go j.listen() + + // increase num of listeners atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 90eeb189..2f4671d3 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -66,6 +66,18 @@ func (p Pipeline) Bool(name string, d bool) bool { return d } +// Map must return nested map value or empty config. +// Here might be sqs attributes or tags for example +func (p Pipeline) Map(name string, out map[string]interface{}) { + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } +} + // Priority returns default pipeline priority func (p Pipeline) Priority() int64 { if value, ok := p[priority]; ok { |