diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 96 |
1 files changed, 55 insertions, 41 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..3aec6acc 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -41,7 +41,7 @@ type Plugin struct { server server.Server jobConstructors map[string]jobs.Constructor - consumers map[string]jobs.Consumer + consumers sync.Map // map[string]jobs.Consumer // events handler events events.Handler @@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events.AddListener(p.collectJobsEvents) p.jobConstructors = make(map[string]jobs.Constructor) - p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) @@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // config key for the particular sub-driver jobs.pipelines.test-local configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false } // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[name] = initializedDriver + p.consumers.Store(name, initializedDriver) // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipe) @@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } func (p *Plugin) Stop() error { - for k, v := range p.consumers { + // range over all consumers and call stop + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := v.Stop(ctx) + err := consumer.Stop(ctx) if err != nil { cancel() - p.log.Error("stop job driver", "driver", k) - continue + p.log.Error("stop job driver", "driver", key) + return true } cancel() - } + return true + }) // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, // but if not, this is not a problem at all. @@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State { func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { const op = errors.Op("jobs_plugin_drivers_state") - jst := make([]*jobState.State, 0, len(p.consumers)) - for k := range p.consumers { - d := p.consumers[k] + jst := make([]*jobState.State, 0, 2) + var err error + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) - state, err := d.State(newCtx) + + var state *jobState.State + state, err = consumer.State(newCtx) if err != nil { cancel() - return nil, errors.E(op, err) + return false } jst = append(jst, state) cancel() + return true + }) + + if err != nil { + return nil, errors.E(op, err) } return jst, nil } @@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() - err := d.Push(ctx, j) + err := d.(jobs.Consumer).Push(ctx, j) if err != nil { p.events.Push(events.JobEvent{ Event: events.EventPushError, ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } @@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.Push(ctx, j[i]) + err := d.(jobs.Consumer).Push(ctx, j[i]) if err != nil { cancel() p.events.Push(events.JobEvent{ @@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Pause(ctx, ppl.Name()) + d.(jobs.Consumer).Pause(ctx, ppl.Name()) } func (p *Plugin) Resume(pp string) { @@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Resume(ctx, ppl.Name()) + d.(jobs.Consumer).Resume(ctx, ppl.Name()) } // Declare a pipeline. @@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[pipeline.Name()] = initializedDriver - // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipeline) if err != nil { @@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { return errors.E(op, err) } } - } - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers.Store(pipeline.Name(), initializedDriver) + // save the pipeline + p.pipelines.Store(pipeline.Name(), pipeline) + } return nil } @@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + // delete consumer + d, ok := p.consumers.LoadAndDelete(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } - // delete consumer - delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.(jobs.Consumer).Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + cancel() + return nil } func (p *Plugin) List() []string { |