summaryrefslogtreecommitdiff
path: root/plugins/jobs/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/pipeline.go')
-rw-r--r--plugins/jobs/pipeline.go169
1 files changed, 169 insertions, 0 deletions
diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go
new file mode 100644
index 00000000..58b76e33
--- /dev/null
+++ b/plugins/jobs/pipeline.go
@@ -0,0 +1,169 @@
+package jobs
+
+import (
+ "fmt"
+ "time"
+)
+
+// Pipelines is list of Pipeline.
+type Pipelines []*Pipeline
+
+func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+ out := make(Pipelines, 0)
+
+ for name, pipe := range pipes {
+ if pipe.Broker() == "" {
+ return nil, fmt.Errorf("found the pipeline without defined broker")
+ }
+
+ p := pipe.With("name", name)
+ out = append(out, &p)
+ }
+
+ return out, nil
+}
+
+// Reverse returns pipelines in reversed order.
+func (ps Pipelines) Reverse() Pipelines {
+ out := make(Pipelines, len(ps))
+
+ for i, p := range ps {
+ out[len(ps)-i-1] = p
+ }
+
+ return out
+}
+
+// Broker return pipelines associated with specific broker.
+func (ps Pipelines) Broker(broker string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, p := range ps {
+ if p.Broker() != broker {
+ continue
+ }
+
+ out = append(out, p)
+ }
+
+ return out
+}
+
+// Names returns only pipelines with specified names.
+func (ps Pipelines) Names(only ...string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, name := range only {
+ for _, p := range ps {
+ if p.Name() == name {
+ out = append(out, p)
+ }
+ }
+ }
+
+ return out
+}
+
+// Get returns pipeline by it'svc name.
+func (ps Pipelines) Get(name string) *Pipeline {
+ // possibly optimize
+ for _, p := range ps {
+ if p.Name() == name {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+// With pipeline value. Immutable.
+func (p Pipeline) With(name string, value interface{}) Pipeline {
+ out := make(map[string]interface{})
+ for k, v := range p {
+ out[k] = v
+ }
+ out[name] = value
+
+ return Pipeline(out)
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String("name", "")
+}
+
+// Broker associated with the pipeline.
+func (p Pipeline) Broker() string {
+ return p.String("broker", "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// Map must return nested map value or empty config.
+func (p Pipeline) Map(name string) Pipeline {
+ out := make(map[string]interface{})
+
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+
+ return Pipeline(out)
+}
+
+// Bool must return option value as string or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if b, ok := value.(bool); ok {
+ return b
+ }
+ }
+
+ return d
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Integer must return option value as string or return default value.
+func (p Pipeline) Integer(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Duration must return option value as time.Duration (seconds) or return default value.
+func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return time.Second * time.Duration(str)
+ }
+ }
+
+ return d
+}