summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-20 14:35:03 +0300
committerGitHub <[email protected]>2021-08-20 14:35:03 +0300
commitfe0d1480386378d7102465abded0c4e698c5023b (patch)
tree304fa88ce4e92a0af0524af522fa7a294203d89b
parent7c155577ee9f7a4c19bd8aa04d7b44f908f0b5e7 (diff)
parentd7d753a625764df61adc56f778e6b9fa846851c5 (diff)
#769: fix(jobs, sqs): fix sqs `attributes` and `tags` parser
#769: fix(jobs, sqs): fix sqs `attributes` and `tags` parser
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go26
-rw-r--r--plugins/jobs/pipeline/pipeline.go16
-rw-r--r--tests/plugins/jobs/jobs_sqs_test.go1
3 files changed, 19 insertions, 24 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 5d419b51..17af1caa 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -160,30 +160,16 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
globalCfg.InitDefault()
- res := make(map[string]interface{})
- pipe.Map(attributes, res)
-
attr := make(map[string]string)
- // accept only string values
- for i := range res {
- if v, ok := res[i].(string); ok {
- attr[i] = v
- }
- }
-
- // delete all values with map.clear to reuse for the tags
- for k := range res {
- delete(res, k)
+ err = pipe.Map(attributes, attr)
+ if err != nil {
+ return nil, errors.E(op, err)
}
- pipe.Map(tags, res)
-
tg := make(map[string]string)
- // accept only string values
- for i := range res {
- if v, ok := res[i].(string); ok {
- attr[i] = v
- }
+ err = pipe.Map(tags, tg)
+ if err != nil {
+ return nil, errors.E(op, err)
}
// initialize job consumer
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index 2f4671d3..8a8c1462 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -1,5 +1,10 @@
package pipeline
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
// Pipeline defines pipeline options.
type Pipeline map[string]interface{}
@@ -68,14 +73,17 @@ func (p Pipeline) Bool(name string, d bool) bool {
// Map must return nested map value or empty config.
// Here might be sqs attributes or tags for example
-func (p Pipeline) Map(name string, out map[string]interface{}) {
+func (p Pipeline) Map(name string, out map[string]string) error {
if value, ok := p[name]; ok {
- if m, ok := value.(map[string]interface{}); ok {
- for k, v := range m {
- out[k] = v
+ if m, ok := value.(string); ok {
+ err := json.Unmarshal(utils.AsBytes(m), &out)
+ if err != nil {
+ return err
}
}
}
+
+ return nil
}
// Priority returns default pipeline priority
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index ae15eb89..630a059a 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -494,6 +494,7 @@ func declareSQSPipe(t *testing.T) {
"priority": "3",
"visibility_timeout": "0",
"wait_time_seconds": "3",
+ "tags": `{"key":"value"}`,
}}
er := &jobsv1beta.Empty{}