summaryrefslogtreecommitdiff
path: root/plugins/jobs/pipeline/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/pipeline/pipeline.go')
-rw-r--r--plugins/jobs/pipeline/pipeline.go148
1 files changed, 16 insertions, 132 deletions
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index 987f6826..e87204f9 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -1,106 +1,27 @@
package pipeline
-import (
- "time"
-
- "github.com/spiral/errors"
-)
-
-// Pipelines is list of Pipeline.
-
-type Pipelines []*Pipeline
-
-func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
- const op = errors.Op("pipeline_init")
- out := make(Pipelines, 0)
-
- for name, pipe := range pipes {
- if pipe.Driver() == "" {
- return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
- }
-
- p := pipe.With("name", name)
- out = append(out, &p)
- }
-
- return out, nil
-}
-
-// Reverse returns pipelines in reversed order.
-func (ps Pipelines) Reverse() Pipelines {
- out := make(Pipelines, len(ps))
-
- for i, p := range ps {
- out[len(ps)-i-1] = p
- }
-
- return out
-}
-
-// Broker return pipelines associated with specific broker.
-func (ps Pipelines) Broker(broker string) Pipelines {
- out := make(Pipelines, 0)
-
- for _, p := range ps {
- if p.Driver() != broker {
- continue
- }
-
- out = append(out, p)
- }
-
- return out
-}
-
-// Names returns only pipelines with specified names.
-func (ps Pipelines) Names(only ...string) Pipelines {
- out := make(Pipelines, 0)
-
- for _, name := range only {
- for _, p := range ps {
- if p.Name() == name {
- out = append(out, p)
- }
- }
- }
-
- return out
-}
-
-// Get returns pipeline by it'svc name.
-func (ps Pipelines) Get(name string) *Pipeline {
- // possibly optimize
- for _, p := range ps {
- if p.Name() == name {
- return p
- }
- }
-
- return nil
-}
-
// Pipeline defines pipeline options.
type Pipeline map[string]interface{}
-// With pipeline value. Immutable.
-func (p Pipeline) With(name string, value interface{}) Pipeline {
- out := make(map[string]interface{})
- for k, v := range p {
- out[k] = v
- }
- out[name] = value
+const (
+ priority string = "priority"
+ driver string = "driver"
+ name string = "name"
+)
- return out
+// With pipeline value
+func (p *Pipeline) With(name string, value interface{}) {
+ (*p)[name] = value
}
// Name returns pipeline name.
func (p Pipeline) Name() string {
- return p.String("name", "")
+ return p.String(name, "")
}
// Driver associated with the pipeline.
func (p Pipeline) Driver() string {
- return p.String("driver", "")
+ return p.String(driver, "")
}
// Has checks if value presented in pipeline.
@@ -112,32 +33,6 @@ func (p Pipeline) Has(name string) bool {
return false
}
-// Map must return nested map value or empty config.
-func (p Pipeline) Map(name string) Pipeline {
- out := make(map[string]interface{})
-
- if value, ok := p[name]; ok {
- if m, ok := value.(map[string]interface{}); ok {
- for k, v := range m {
- out[k] = v
- }
- }
- }
-
- return out
-}
-
-// Bool must return option value as string or return default value.
-func (p Pipeline) Bool(name string, d bool) bool {
- if value, ok := p[name]; ok {
- if b, ok := value.(bool); ok {
- return b
- }
- }
-
- return d
-}
-
// String must return option value as string or return default value.
func (p Pipeline) String(name string, d string) string {
if value, ok := p[name]; ok {
@@ -149,24 +44,13 @@ func (p Pipeline) String(name string, d string) string {
return d
}
-// Integer must return option value as string or return default value.
-func (p Pipeline) Integer(name string, d int) int {
- if value, ok := p[name]; ok {
- if str, ok := value.(int); ok {
- return str
+// Priority returns default pipeline priority
+func (p Pipeline) Priority() uint64 {
+ if value, ok := p[priority]; ok {
+ if v, ok := value.(uint64); ok {
+ return v
}
}
- return d
-}
-
-// Duration must return option value as time.Duration (seconds) or return default value.
-func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
- if value, ok := p[name]; ok {
- if str, ok := value.(int); ok {
- return time.Second * time.Duration(str)
- }
- }
-
- return d
+ return 10
}