summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go150
1 files changed, 120 insertions, 30 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index b7e41710..6dd55782 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -52,6 +52,8 @@ type Plugin struct {
// initial set of the pipelines to consume
consume map[string]struct{}
+
+ stopCh chan struct{}
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -72,6 +74,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.jobConstructors = make(map[string]jobs.Constructor)
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
+ p.stopCh = make(chan struct{}, 1)
// initial set of pipelines
for i := range p.cfg.Pipelines {
@@ -145,9 +148,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return false
}
- // if pipeline initialized to be consumed, call Consume on it
+ // if pipeline initialized to be consumed, call Run on it
if _, ok := p.consume[name]; ok {
- err = initializedDriver.Consume(pipe)
+ err = initializedDriver.Run(pipe)
if err != nil {
errCh <- errors.E(op, err)
return false
@@ -171,40 +174,46 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for i := uint8(0); i < p.cfg.NumPollers; i++ {
go func() {
for {
- // get data JOB from the queue
- job := p.queue.GetMax()
-
- ctx, err := job.Context()
- if err != nil {
- errNack := job.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
+ select {
+ case <-p.stopCh:
+ p.log.Debug("------> job poller stopped <------")
+ return
+ default:
+ // get data JOB from the queue
+ job := p.queue.ExtractMin()
+
+ ctx, err := job.Context()
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+ p.log.Error("job marshal context", "error", err)
}
- p.log.Error("job marshal context", "error", err)
- }
-
- exec := payload.Payload{
- Context: ctx,
- Body: job.Body(),
- }
- _, err = p.workersPool.Exec(exec)
- if err != nil {
- errNack := job.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
+ exec := payload.Payload{
+ Context: ctx,
+ Body: job.Body(),
}
- p.log.Error("job execute", "error", err)
- continue
- }
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+
+ p.log.Error("job execute", "error", err)
+ continue
+ }
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
- errAck := job.Ack()
- if errAck != nil {
- p.log.Error("acknowledge failed", "error", errAck)
+ errAck := job.Ack()
+ if errAck != nil {
+ p.log.Error("acknowledge failed", "error", errAck)
+ }
}
}
}()
@@ -215,6 +224,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
+ for k, v := range p.consumers {
+ err := v.Stop()
+ if err != nil {
+ p.log.Error("stop job driver", "driver", k)
+ continue
+ }
+ }
+
+ // this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
+ // but if not, this is not a problem at all.
+ // The main target is to stop the drivers
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ // stop jobs plugin pollers
+ p.stopCh <- struct{}{}
+ }
+ }()
+
+ // just wait pollers for 2 seconds before exit
+ time.Sleep(time.Second * 5)
+
return nil
}
@@ -335,6 +365,66 @@ func (p *Plugin) Resume(pipelines []string) {
}
}
+// Declare a pipeline.
+func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("jobs_plugin_declare")
+ // driver for the pipeline (ie amqp, ephemeral, etc)
+ dr := pipeline.Driver()
+ if dr == "" {
+ return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name()))
+ }
+
+ // jobConstructors contains constructors for the drivers
+ // we need here to initialize these drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // init the driver from pipeline
+ initializedDriver, err := c.FromPipeline(pipeline, p.queue)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[pipeline.Name()] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(pipeline)
+ if err != nil {
+ return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ if _, ok := p.consume[pipeline.Name()]; ok {
+ err = initializedDriver.Run(pipeline)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ p.pipelines.Store(pipeline.Name(), pipeline)
+
+ return nil
+}
+
+// Destroy pipeline and release all associated resources.
+func (p *Plugin) Destroy(pp string) error {
+ const op = errors.Op("jobs_plugin_destroy")
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ return d.Stop()
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{
log: p.log,