diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 150 |
1 files changed, 120 insertions, 30 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index b7e41710..6dd55782 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -52,6 +52,8 @@ type Plugin struct { // initial set of the pipelines to consume consume map[string]struct{} + + stopCh chan struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -72,6 +74,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) + p.stopCh = make(chan struct{}, 1) // initial set of pipelines for i := range p.cfg.Pipelines { @@ -145,9 +148,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return false } - // if pipeline initialized to be consumed, call Consume on it + // if pipeline initialized to be consumed, call Run on it if _, ok := p.consume[name]; ok { - err = initializedDriver.Consume(pipe) + err = initializedDriver.Run(pipe) if err != nil { errCh <- errors.E(op, err) return false @@ -171,40 +174,46 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for i := uint8(0); i < p.cfg.NumPollers; i++ { go func() { for { - // get data JOB from the queue - job := p.queue.GetMax() - - ctx, err := job.Context() - if err != nil { - errNack := job.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) + select { + case <-p.stopCh: + p.log.Debug("------> job poller stopped <------") + return + default: + // get data JOB from the queue + job := p.queue.ExtractMin() + + ctx, err := job.Context() + if err != nil { + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + p.log.Error("job marshal context", "error", err) } - p.log.Error("job marshal context", "error", err) - } - - exec := payload.Payload{ - Context: ctx, - Body: job.Body(), - } - _, err = p.workersPool.Exec(exec) - if err != nil { - errNack := job.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) + exec := payload.Payload{ + Context: ctx, + Body: job.Body(), } - p.log.Error("job execute", "error", err) - continue - } + _, err = p.workersPool.Exec(exec) + if err != nil { + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + + p.log.Error("job execute", "error", err) + continue + } - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) + // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- + atomic.AddUint64(&rate, 1) - errAck := job.Ack() - if errAck != nil { - p.log.Error("acknowledge failed", "error", errAck) + errAck := job.Ack() + if errAck != nil { + p.log.Error("acknowledge failed", "error", errAck) + } } } }() @@ -215,6 +224,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } func (p *Plugin) Stop() error { + for k, v := range p.consumers { + err := v.Stop() + if err != nil { + p.log.Error("stop job driver", "driver", k) + continue + } + } + + // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, + // but if not, this is not a problem at all. + // The main target is to stop the drivers + go func() { + for i := uint8(0); i < p.cfg.NumPollers; i++ { + // stop jobs plugin pollers + p.stopCh <- struct{}{} + } + }() + + // just wait pollers for 2 seconds before exit + time.Sleep(time.Second * 5) + return nil } @@ -335,6 +365,66 @@ func (p *Plugin) Resume(pipelines []string) { } } +// Declare a pipeline. +func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { + const op = errors.Op("jobs_plugin_declare") + // driver for the pipeline (ie amqp, ephemeral, etc) + dr := pipeline.Driver() + if dr == "" { + return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name())) + } + + // jobConstructors contains constructors for the drivers + // we need here to initialize these drivers for the pipelines + if c, ok := p.jobConstructors[dr]; ok { + // init the driver from pipeline + initializedDriver, err := c.FromPipeline(pipeline, p.queue) + if err != nil { + return errors.E(op, err) + } + + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers[pipeline.Name()] = initializedDriver + + // register pipeline for the initialized driver + err = initializedDriver.Register(pipeline) + if err != nil { + return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name())) + } + + // if pipeline initialized to be consumed, call Run on it + if _, ok := p.consume[pipeline.Name()]; ok { + err = initializedDriver.Run(pipeline) + if err != nil { + return errors.E(op, err) + } + } + } + + p.pipelines.Store(pipeline.Name(), pipeline) + + return nil +} + +// Destroy pipeline and release all associated resources. +func (p *Plugin) Destroy(pp string) error { + const op = errors.Op("jobs_plugin_destroy") + pipe, ok := p.pipelines.Load(pp) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp)) + } + + // type conversion + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + return d.Stop() +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, |