summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 20:59:00 +0300
committerValery Piashchynski <[email protected]>2021-07-14 20:59:00 +0300
commit7ea227733e0b1fa59021233e6cd0fd06442fbe50 (patch)
treedcba3e2fddf53898d0f5c82eb128730ab38c723a
parent04fde6d8d1a5a88602f8206e0d2c09c4b8346941 (diff)
Fix incorrect path in the CI. Implement FromPipeline for the sqs.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--.github/workflows/linux.yml2
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go43
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go119
-rw-r--r--plugins/jobs/pipeline/pipeline.go12
4 files changed, 172 insertions, 4 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 583d97f3..2b2809f9 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -77,7 +77,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/structs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 31999e23..20dcef2a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -43,7 +43,8 @@ type JobsConsumer struct {
delayCache map[string]struct{}
- stopCh chan struct{}
+ listeners uint32
+ stopCh chan struct{}
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -336,6 +337,13 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
// run listener
j.listener(deliv)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
@@ -345,6 +353,15 @@ func (j *JobsConsumer) Pause(p string) {
j.log.Error("no such pipeline", "requested pause on: ", p)
}
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
// protect connection (redial)
j.Lock()
defer j.Unlock()
@@ -355,8 +372,17 @@ func (j *JobsConsumer) Pause(p string) {
errCl := j.consumeChan.Close()
if errCl != nil {
j.log.Error("force close failed", "error", err)
+ return
}
+ return
}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (j *JobsConsumer) Resume(p string) {
@@ -369,6 +395,13 @@ func (j *JobsConsumer) Resume(p string) {
j.Lock()
defer j.Unlock()
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {
@@ -399,6 +432,13 @@ func (j *JobsConsumer) Resume(p string) {
// run listener
j.listener(deliv)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (j *JobsConsumer) Stop() error {
@@ -410,7 +450,6 @@ func (j *JobsConsumer) Stop() error {
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
- Elapsed: 0,
})
return nil
}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index cb7cb4af..7e2f229c 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -4,6 +4,7 @@ import (
"context"
"sync"
"sync/atomic"
+ "time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
@@ -127,7 +128,99 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
}
func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- return &JobConsumer{}, nil
+ const op = errors.Op("new_sqs_consumer")
+
+ const (
+ attributes string = "attributes"
+ tags string = "tags"
+ queue string = "queue"
+ pref string = "prefetch"
+ visibility string = "visibility_timeout"
+ waitTime string = "wait_time"
+ )
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ res := make(map[string]interface{})
+ pipe.Map(attributes, res)
+
+ attr := make(map[string]string)
+ // accept only string values
+ for i := range res {
+ if v, ok := res[i].(string); ok {
+ attr[i] = v
+ }
+ }
+
+ // delete all values with map.clear to reuse for the tags
+ for k := range res {
+ delete(res, k)
+ }
+
+ pipe.Map(tags, res)
+
+ tg := make(map[string]string)
+ // accept only string values
+ for i := range res {
+ if v, ok := res[i].(string); ok {
+ attr[i] = v
+ }
+ }
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: attr,
+ tags: tg,
+ queue: pipe.String(queue, "default"),
+ prefetch: int32(pipe.Int(pref, 10)),
+ visibilityTimeout: int32(pipe.Int(visibility, 0)),
+ waitTime: int32(pipe.Int(waitTime, 0)),
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return jb, nil
}
func (j *JobConsumer) Push(jb *job.Job) error {
@@ -184,6 +277,14 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
func (j *JobConsumer) Stop() error {
j.pauseCh <- struct{}{}
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
@@ -206,6 +307,13 @@ func (j *JobConsumer) Pause(p string) {
// stop consume
j.pauseCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (j *JobConsumer) Resume(p string) {
@@ -225,5 +333,14 @@ func (j *JobConsumer) Resume(p string) {
// start listener
go j.listen()
+
+ // increase num of listeners
atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index 90eeb189..2f4671d3 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -66,6 +66,18 @@ func (p Pipeline) Bool(name string, d bool) bool {
return d
}
+// Map must return nested map value or empty config.
+// Here might be sqs attributes or tags for example
+func (p Pipeline) Map(name string, out map[string]interface{}) {
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+}
+
// Priority returns default pipeline priority
func (p Pipeline) Priority() int64 {
if value, ok := p[priority]; ok {