diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 19:04:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-02 19:04:16 +0300 |
commit | da78c9ed8ee7406aa2de21bc70642928c391852a (patch) | |
tree | 34e755cd1f01d9573486cb6cbb386ce43fbd727c /plugins/jobs | |
parent | c64005501f92888c10a61481745df91c7c50639f (diff) |
Do not use copied driver from the map.
Store driver and pipeline only after run w/o error.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/plugin.go | 19 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 6 |
2 files changed, 12 insertions, 13 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index eb273b93..3aec6acc 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -129,12 +129,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // config key for the particular sub-driver jobs.pipelines.test-local configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false @@ -595,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(pipeline.Name(), initializedDriver) - // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipeline) if err != nil { @@ -621,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { return errors.E(op, err) } } - } - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers.Store(pipeline.Name(), initializedDriver) + // save the pipeline + p.pipelines.Store(pipeline.Name(), pipeline) + } return nil } diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 62186981..d7b93bd1 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { return errors.E(op, errors.Str("empty ID field not allowed")) } - err := r.p.Push(r.from(j.GetJob())) + err := r.p.Push(from(j.GetJob())) if err != nil { return errors.E(op, err) } @@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - batch[i] = r.from(j.GetJobs()[i]) + batch[i] = from(j.GetJobs()[i]) } err := r.p.PushBatch(batch) @@ -137,7 +137,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { } // from converts from transport entity to domain -func (r *rpc) from(j *jobsv1beta.Job) *job.Job { +func from(j *jobsv1beta.Job) *job.Job { headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { |