From 035e432af9a059e9e5187bd03f2e7864ed94c054 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 22 Jun 2021 17:33:55 +0300 Subject: - Folders struct - Initial ephemeral broker commit - Initial RPC Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 172 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 plugins/jobs/pipeline/pipeline.go (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go new file mode 100644 index 00000000..f27f6ede --- /dev/null +++ b/plugins/jobs/pipeline/pipeline.go @@ -0,0 +1,172 @@ +package pipeline + +import ( + "time" + + "github.com/spiral/errors" +) + +// Pipelines is list of Pipeline. + +type Pipelines []*Pipeline + +func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + const op = errors.Op("pipeline_init") + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker return pipelines associated with specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by it'svc name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return out +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") +} + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if value presented in pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return out +} + +// Bool must return option value as string or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as string or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} -- cgit v1.2.3 From 207739f7346c98e16087547bc510e1f909671260 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 5 Jul 2021 18:44:29 +0300 Subject: - Update PQ - Update ephemeral plugin, complete Push - Add Jobs full configuration Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index f27f6ede..987f6826 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -15,7 +15,7 @@ func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { out := make(Pipelines, 0) for name, pipe := range pipes { - if pipe.Broker() == "" { + if pipe.Driver() == "" { return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) } @@ -42,7 +42,7 @@ func (ps Pipelines) Broker(broker string) Pipelines { out := make(Pipelines, 0) for _, p := range ps { - if p.Broker() != broker { + if p.Driver() != broker { continue } @@ -98,9 +98,9 @@ func (p Pipeline) Name() string { return p.String("name", "") } -// Broker associated with the pipeline. -func (p Pipeline) Broker() string { - return p.String("broker", "") +// Driver associated with the pipeline. +func (p Pipeline) Driver() string { + return p.String("driver", "") } // Has checks if value presented in pipeline. -- cgit v1.2.3 From 60c229c8506df465586434309af5acd1f84e2406 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 7 Jul 2021 18:33:04 +0300 Subject: Updated ephemeral plugin, PQ and protobuf... Implement core of the root jobs plugin with a proper drivers/pipelines handling mechanism. Add delayed jobs for the ephemeral plugin. Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with a slice of the pipelines. Other small improvements. Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 148 +++++--------------------------------- 1 file changed, 16 insertions(+), 132 deletions(-) (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 987f6826..e87204f9 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -1,106 +1,27 @@ package pipeline -import ( - "time" - - "github.com/spiral/errors" -) - -// Pipelines is list of Pipeline. - -type Pipelines []*Pipeline - -func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { - const op = errors.Op("pipeline_init") - out := make(Pipelines, 0) - - for name, pipe := range pipes { - if pipe.Driver() == "" { - return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) - } - - p := pipe.With("name", name) - out = append(out, &p) - } - - return out, nil -} - -// Reverse returns pipelines in reversed order. -func (ps Pipelines) Reverse() Pipelines { - out := make(Pipelines, len(ps)) - - for i, p := range ps { - out[len(ps)-i-1] = p - } - - return out -} - -// Broker return pipelines associated with specific broker. -func (ps Pipelines) Broker(broker string) Pipelines { - out := make(Pipelines, 0) - - for _, p := range ps { - if p.Driver() != broker { - continue - } - - out = append(out, p) - } - - return out -} - -// Names returns only pipelines with specified names. -func (ps Pipelines) Names(only ...string) Pipelines { - out := make(Pipelines, 0) - - for _, name := range only { - for _, p := range ps { - if p.Name() == name { - out = append(out, p) - } - } - } - - return out -} - -// Get returns pipeline by it'svc name. -func (ps Pipelines) Get(name string) *Pipeline { - // possibly optimize - for _, p := range ps { - if p.Name() == name { - return p - } - } - - return nil -} - // Pipeline defines pipeline options. type Pipeline map[string]interface{} -// With pipeline value. Immutable. -func (p Pipeline) With(name string, value interface{}) Pipeline { - out := make(map[string]interface{}) - for k, v := range p { - out[k] = v - } - out[name] = value +const ( + priority string = "priority" + driver string = "driver" + name string = "name" +) - return out +// With pipeline value +func (p *Pipeline) With(name string, value interface{}) { + (*p)[name] = value } // Name returns pipeline name. func (p Pipeline) Name() string { - return p.String("name", "") + return p.String(name, "") } // Driver associated with the pipeline. func (p Pipeline) Driver() string { - return p.String("driver", "") + return p.String(driver, "") } // Has checks if value presented in pipeline. @@ -112,32 +33,6 @@ func (p Pipeline) Has(name string) bool { return false } -// Map must return nested map value or empty config. -func (p Pipeline) Map(name string) Pipeline { - out := make(map[string]interface{}) - - if value, ok := p[name]; ok { - if m, ok := value.(map[string]interface{}); ok { - for k, v := range m { - out[k] = v - } - } - } - - return out -} - -// Bool must return option value as string or return default value. -func (p Pipeline) Bool(name string, d bool) bool { - if value, ok := p[name]; ok { - if b, ok := value.(bool); ok { - return b - } - } - - return d -} - // String must return option value as string or return default value. func (p Pipeline) String(name string, d string) string { if value, ok := p[name]; ok { @@ -149,24 +44,13 @@ func (p Pipeline) String(name string, d string) string { return d } -// Integer must return option value as string or return default value. -func (p Pipeline) Integer(name string, d int) int { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return str +// Priority returns default pipeline priority +func (p Pipeline) Priority() uint64 { + if value, ok := p[priority]; ok { + if v, ok := value.(uint64); ok { + return v } } - return d -} - -// Duration must return option value as time.Duration (seconds) or return default value. -func (p Pipeline) Duration(name string, d time.Duration) time.Duration { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return time.Second * time.Duration(str) - } - } - - return d + return 10 } -- cgit v1.2.3 From e82e9248bb1afd5e571f465ac79ac7f5f79b81f1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 12 Jul 2021 08:47:33 +0300 Subject: Finish dynamic declaration of the pipelines. Fix issue with configuration parsing in the AMQP consumer. Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index e87204f9..91898178 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -44,6 +44,17 @@ func (p Pipeline) String(name string, d string) string { return d } +// Int must return option value as string or return default value. +func (p Pipeline) Int(name string, d int) int { + if value, ok := p[name]; ok { + if i, ok := value.(int); ok { + return i + } + } + + return d +} + // Priority returns default pipeline priority func (p Pipeline) Priority() uint64 { if value, ok := p[priority]; ok { -- cgit v1.2.3 From d099e47ab28dd044d34e18347a4c714b8af3d612 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 11:35:12 +0300 Subject: SQS driver. Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 91898178..90eeb189 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -55,10 +55,21 @@ func (p Pipeline) Int(name string, d int) int { return d } +// Bool must return option value as bool or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if i, ok := value.(bool); ok { + return i + } + } + + return d +} + // Priority returns default pipeline priority -func (p Pipeline) Priority() uint64 { +func (p Pipeline) Priority() int64 { if value, ok := p[priority]; ok { - if v, ok := value.(uint64); ok { + if v, ok := value.(int64); ok { return v } } -- cgit v1.2.3 From 7ea227733e0b1fa59021233e6cd0fd06442fbe50 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 20:59:00 +0300 Subject: Fix incorrect path in the CI. Implement FromPipeline for the sqs. Signed-off-by: Valery Piashchynski --- plugins/jobs/pipeline/pipeline.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'plugins/jobs/pipeline/pipeline.go') diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 90eeb189..2f4671d3 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -66,6 +66,18 @@ func (p Pipeline) Bool(name string, d bool) bool { return d } +// Map must return nested map value or empty config. +// Here might be sqs attributes or tags for example +func (p Pipeline) Map(name string, out map[string]interface{}) { + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } +} + // Priority returns default pipeline priority func (p Pipeline) Priority() int64 { if value, ok := p[priority]; ok { -- cgit v1.2.3