summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 19:04:16 +0300
committerValery Piashchynski <[email protected]>2021-09-02 19:04:16 +0300
commitda78c9ed8ee7406aa2de21bc70642928c391852a (patch)
tree34e755cd1f01d9573486cb6cbb386ce43fbd727c /plugins
parentc64005501f92888c10a61481745df91c7c50639f (diff)
Do not use copied driver from the map.
Store driver and pipeline only after run w/o error. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/plugin.go19
-rw-r--r--plugins/jobs/rpc.go6
2 files changed, 12 insertions, 13 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index eb273b93..3aec6acc 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -129,12 +129,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// config key for the particular sub-driver jobs.pipelines.test-local
configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
// init the driver
- initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue)
if err != nil {
errCh <- errors.E(op, err)
return false
@@ -595,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// init the driver from pipeline
- initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
if err != nil {
return errors.E(op, err)
}
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers.Store(pipeline.Name(), initializedDriver)
-
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipeline)
if err != nil {
@@ -621,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
return errors.E(op, err)
}
}
- }
- // save the pipeline
- p.pipelines.Store(pipeline.Name(), pipeline)
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers.Store(pipeline.Name(), initializedDriver)
+ // save the pipeline
+ p.pipelines.Store(pipeline.Name(), pipeline)
+ }
return nil
}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 62186981..d7b93bd1 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
return errors.E(op, errors.Str("empty ID field not allowed"))
}
- err := r.p.Push(r.from(j.GetJob()))
+ err := r.p.Push(from(j.GetJob()))
if err != nil {
return errors.E(op, err)
}
@@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
for i := 0; i < l; i++ {
// convert transport entity into domain
// how we can do this quickly
- batch[i] = r.from(j.GetJobs()[i])
+ batch[i] = from(j.GetJobs()[i])
}
err := r.p.PushBatch(batch)
@@ -137,7 +137,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
+func from(j *jobsv1beta.Job) *job.Job {
headers := make(map[string][]string, len(j.GetHeaders()))
for k, v := range j.GetHeaders() {