diff options
author | Valery Piashchynski <[email protected]> | 2021-08-20 14:35:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-20 14:35:03 +0300 |
commit | fe0d1480386378d7102465abded0c4e698c5023b (patch) | |
tree | 304fa88ce4e92a0af0524af522fa7a294203d89b | |
parent | 7c155577ee9f7a4c19bd8aa04d7b44f908f0b5e7 (diff) | |
parent | d7d753a625764df61adc56f778e6b9fa846851c5 (diff) |
#769: fix(jobs, sqs): fix sqs `attributes` and `tags` parser
#769: fix(jobs, sqs): fix sqs `attributes` and `tags` parser
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 26 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 16 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 1 |
3 files changed, 19 insertions, 24 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 5d419b51..17af1caa 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -160,30 +160,16 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf globalCfg.InitDefault() - res := make(map[string]interface{}) - pipe.Map(attributes, res) - attr := make(map[string]string) - // accept only string values - for i := range res { - if v, ok := res[i].(string); ok { - attr[i] = v - } - } - - // delete all values with map.clear to reuse for the tags - for k := range res { - delete(res, k) + err = pipe.Map(attributes, attr) + if err != nil { + return nil, errors.E(op, err) } - pipe.Map(tags, res) - tg := make(map[string]string) - // accept only string values - for i := range res { - if v, ok := res[i].(string); ok { - attr[i] = v - } + err = pipe.Map(tags, tg) + if err != nil { + return nil, errors.E(op, err) } // initialize job consumer diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 2f4671d3..8a8c1462 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -1,5 +1,10 @@ package pipeline +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + // Pipeline defines pipeline options. type Pipeline map[string]interface{} @@ -68,14 +73,17 @@ func (p Pipeline) Bool(name string, d bool) bool { // Map must return nested map value or empty config. // Here might be sqs attributes or tags for example -func (p Pipeline) Map(name string, out map[string]interface{}) { +func (p Pipeline) Map(name string, out map[string]string) error { if value, ok := p[name]; ok { - if m, ok := value.(map[string]interface{}); ok { - for k, v := range m { - out[k] = v + if m, ok := value.(string); ok { + err := json.Unmarshal(utils.AsBytes(m), &out) + if err != nil { + return err } } } + + return nil } // Priority returns default pipeline priority diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index ae15eb89..630a059a 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -494,6 +494,7 @@ func declareSQSPipe(t *testing.T) { "priority": "3", "visibility_timeout": "0", "wait_time_seconds": "3", + "tags": `{"key":"value"}`, }} er := &jobsv1beta.Empty{} |