diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 127 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 66 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/listener.go | 25 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 27 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit_init.go (renamed from plugins/jobs/brokers/amqp/rabbit.go) | 30 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 37 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 42 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 150 |
9 files changed, 319 insertions, 194 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 5b549874..22eee2dc 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -46,8 +45,8 @@ type Config struct { type JobsConsumer struct { sync.RWMutex - logger logger.Logger - pq priorityqueue.Queue + log logger.Logger + pq priorityqueue.Queue pipelines sync.Map @@ -67,19 +66,20 @@ type JobsConsumer struct { delayCache map[string]struct{} - stop chan struct{} + stopCh chan struct{} } -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) { +// NewAMQPConsumer initializes rabbitmq pipeline +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - logger: log, + log: log, pq: pq, consumeID: uuid.NewString(), - stop: stopCh, + stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, delayCache: make(map[string]struct{}, 100), } @@ -144,6 +144,14 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } +func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) { + _ = exchangeType + _ = exchangeKey + _ = queue + _ = routingKey + panic("not implemented") +} + func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -157,7 +165,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error { defer j.Unlock() // convert - msg := FromJob(job) + msg := fromJob(job) p, err := pack(job.Ident, msg) if err != nil { return errors.E(op, err) @@ -245,12 +253,12 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.pipelines.Store(pipeline.Name(), true) + j.pipelines.Store(pipeline.Name(), struct{}{}) return nil } -func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") if _, ok := j.pipelines.Load(pipeline.Name()); !ok { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) @@ -304,77 +312,74 @@ func (j *JobsConsumer) List() []string { } func (j *JobsConsumer) Pause(pipeline string) { - if q, ok := j.pipelines.Load(pipeline); ok { - if q == true { - // mark pipeline as turned off - j.pipelines.Store(pipeline, false) - } + if _, ok := j.pipelines.Load(pipeline); !ok { + j.log.Error("no such pipeline", "requested pause on", pipeline) + return } // protect connection (redial) j.Lock() defer j.Unlock() - err := j.publishChan.Cancel(j.consumeID, true) + err := j.consumeChan.Cancel(j.consumeID, true) if err != nil { - j.logger.Error("cancel publish channel, forcing close", "error", err) - errCl := j.publishChan.Close() + j.log.Error("cancel publish channel, forcing close", "error", err) + errCl := j.consumeChan.Close() if errCl != nil { - j.logger.Error("force close failed", "error", err) + j.log.Error("force close failed", "error", err) } } } func (j *JobsConsumer) Resume(pipeline string) { - if q, ok := j.pipelines.Load(pipeline); ok { - if q == false { - // mark pipeline as turned off - j.pipelines.Store(pipeline, true) - } - - // protect connection (redial) - j.Lock() - defer j.Unlock() + if _, ok := j.pipelines.Load(pipeline); !ok { + j.log.Error("no such pipeline", "requested pause on", pipeline) + return + } - var err error - j.consumeChan, err = j.conn.Channel() - if err != nil { - j.logger.Error("create channel on rabbitmq connection", "error", err) - return - } + // protect connection (redial) + j.Lock() + defer j.Unlock() - err = j.consumeChan.Qos(j.prefetchCount, 0, false) - if err != nil { - j.logger.Error("qos set failed", "error", err) - return - } + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.log.Error("create channel on rabbitmq connection", "error", err) + return + } - // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - j.logger.Error("consume operation failed", "error", err) - return - } + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.log.Error("qos set failed", "error", err) + return + } - // run listener - j.listener(deliv) + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.log.Error("consume operation failed", "error", err) + return } + + // run listener + j.listener(deliv) } -// Declare used to dynamically declare a pipeline -func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error { - pipeline.String(exchangeKey, "") - pipeline.String(queue, "") - pipeline.String(routingKey, "") - pipeline.String(exchangeType, "direct") +func (j *JobsConsumer) Stop() error { + j.stopCh <- struct{}{} + j.pipelines.Range(func(key, _ interface{}) bool { + j.pipelines.Delete(key) + return true + }) + return nil } diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 2e8a30af..a46f1ca2 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -20,39 +20,6 @@ const ( rrRetryDelay string = "rr_retry_delay" ) -func FromDelivery(d amqp.Delivery) (*Item, error) { - const op = errors.Op("from_delivery_convert") - item, err := unpack(d) - if err != nil { - return nil, errors.E(op, err) - } - return &Item{ - Job: item.Job, - Ident: item.Ident, - Payload: item.Payload, - Headers: item.Headers, - Options: item.Options, - AckFunc: d.Ack, - NackFunc: d.Nack, - }, nil -} - -func FromJob(job *structs.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: uint32(job.Options.Priority), - Pipeline: job.Options.Pipeline, - Delay: int32(job.Options.Delay), - Attempts: int32(job.Options.Attempts), - RetryDelay: int32(job.Options.RetryDelay), - Timeout: int32(job.Options.Timeout), - }, - } -} - type Item struct { // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` @@ -154,6 +121,39 @@ func (j *Item) Nack() error { return j.NackFunc(false, false) } +func fromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + item, err := unpack(d) + if err != nil { + return nil, errors.E(op, err) + } + return &Item{ + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + AckFunc: d.Ack, + NackFunc: d.Nack, + }, nil +} + +func fromJob(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: uint32(job.Options.Priority), + Pipeline: job.Options.Pipeline, + Delay: int32(job.Options.Delay), + Attempts: int32(job.Options.Attempts), + RetryDelay: int32(job.Options.RetryDelay), + Timeout: int32(job.Options.Timeout), + }, + } +} + // pack job metadata into headers func pack(id string, j *Item) (amqp.Table, error) { headers, err := json.Marshal(j.Headers) diff --git a/plugins/jobs/brokers/amqp/listener.go b/plugins/jobs/brokers/amqp/listener.go new file mode 100644 index 00000000..2b994fc5 --- /dev/null +++ b/plugins/jobs/brokers/amqp/listener.go @@ -0,0 +1,25 @@ +package amqp + +import "github.com/streadway/amqp" + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := fromDelivery(msg) + if err != nil { + j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 7b6562c7..6743dc2f 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1,11 +1,10 @@ package amqp import ( - "sync/atomic" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -16,27 +15,11 @@ const ( type Plugin struct { log logger.Logger cfg config.Configurer - - numConsumers uint32 - stopCh chan struct{} } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log p.cfg = cfg - p.stopCh = make(chan struct{}) - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - // send stop to the all consumers delivery - for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ { - p.stopCh <- struct{}{} - } return nil } @@ -47,6 +30,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - atomic.AddUint32(&p.numConsumers, 1) - return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq) + return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, pq) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit_init.go index b5e73a9d..cb9f2dc4 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit_init.go @@ -2,7 +2,6 @@ package amqp import ( "github.com/spiral/errors" - "github.com/streadway/amqp" ) func (j *JobsConsumer) initRabbitMQ() error { @@ -15,11 +14,6 @@ func (j *JobsConsumer) initRabbitMQ() error { return errors.E(op, err) } - err = channel.Qos(j.prefetchCount, 0, false) - if err != nil { - return errors.E(op, err) - } - // declare an exchange (idempotent operation) err = channel.ExchangeDeclare( j.exchangeName, @@ -61,27 +55,3 @@ func (j *JobsConsumer) initRabbitMQ() error { return channel.Close() } - -func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { - go func() { - for { - select { - case msg, ok := <-deliv: - if !ok { - j.logger.Info("delivery channel closed, leaving the rabbit listener") - return - } - - d, err := FromDelivery(msg) - if err != nil { - j.logger.Error("amqp delivery convert", "error", err) - continue - } - // insert job into the main priority queue - j.pq.Insert(d) - case <-j.stop: - return - } - } - }() -} diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 874e68c4..16071b78 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -12,29 +12,34 @@ import ( func (j *JobsConsumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") - for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { - if err != nil { - j.Lock() - j.logger.Error("connection closed, reconnecting", "error", err) + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + j.log.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) expb.MaxElapsedTime = j.retryTimeout op := func() error { - j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) if dialErr != nil { return fmt.Errorf("fail to dial server endpoint: %v", dialErr) } - j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection errInit := j.initRabbitMQ() if errInit != nil { - j.logger.Error("error while redialing", "error", errInit) + j.log.Error("error while redialing", "error", errInit) return errInit } @@ -69,18 +74,32 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit // restart listener j.listener(deliv) - j.logger.Info("queues and subscribers redeclare succeed") + j.log.Info("queues and subscribers redeclare succeed") return nil } retryErr := backoff.Retry(op, expb) if retryErr != nil { j.Unlock() - j.logger.Error("backoff failed", "error", retryErr) + j.log.Error("backoff failed", "error", retryErr) return } j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } } } }() diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 8f6f4b5f..9d79221c 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -22,14 +22,17 @@ type JobBroker struct { queues sync.Map pq priorityqueue.Queue localQueue chan *Item + + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ - log: log, - pq: q, + log: log, + pq: q, + stopCh: make(chan struct{}, 1), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -50,6 +53,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q return jb, nil } +func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) { + panic("not implemented") +} + func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") @@ -82,8 +89,13 @@ func (j *JobBroker) Push(job *structs.Job) error { func (j *JobBroker) consume() { // redirect - for item := range j.localQueue { - j.pq.Insert(item) + for { + select { + case item := <-j.localQueue: + j.pq.Insert(item) + case <-j.stopCh: + return + } } } @@ -98,11 +110,6 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } -// Consume is no-op for the ephemeral -func (j *JobBroker) Consume(_ *pipeline.Pipeline) error { - return nil -} - func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { @@ -132,3 +139,20 @@ func (j *JobBroker) List() []string { return out } + +// Run is no-op for the ephemeral +func (j *JobBroker) Run(_ *pipeline.Pipeline) error { + return nil +} + +func (j *JobBroker) Stop() error { + j.queues.Range(func(key, _ interface{}) bool { + j.queues.Delete(key) + return true + }) + + // return from the consumer + j.stopCh <- struct{}{} + + return nil +} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 9910d857..75012873 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -4,6 +4,7 @@ import ( "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -28,6 +29,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} -func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, q) +func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, pq) +} + +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipeline, pq) } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index b7e41710..6dd55782 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -52,6 +52,8 @@ type Plugin struct { // initial set of the pipelines to consume consume map[string]struct{} + + stopCh chan struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -72,6 +74,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) + p.stopCh = make(chan struct{}, 1) // initial set of pipelines for i := range p.cfg.Pipelines { @@ -145,9 +148,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return false } - // if pipeline initialized to be consumed, call Consume on it + // if pipeline initialized to be consumed, call Run on it if _, ok := p.consume[name]; ok { - err = initializedDriver.Consume(pipe) + err = initializedDriver.Run(pipe) if err != nil { errCh <- errors.E(op, err) return false @@ -171,40 +174,46 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for i := uint8(0); i < p.cfg.NumPollers; i++ { go func() { for { - // get data JOB from the queue - job := p.queue.GetMax() - - ctx, err := job.Context() - if err != nil { - errNack := job.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) + select { + case <-p.stopCh: + p.log.Debug("------> job poller stopped <------") + return + default: + // get data JOB from the queue + job := p.queue.ExtractMin() + + ctx, err := job.Context() + if err != nil { + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + p.log.Error("job marshal context", "error", err) } - p.log.Error("job marshal context", "error", err) - } - - exec := payload.Payload{ - Context: ctx, - Body: job.Body(), - } - _, err = p.workersPool.Exec(exec) - if err != nil { - errNack := job.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) + exec := payload.Payload{ + Context: ctx, + Body: job.Body(), } - p.log.Error("job execute", "error", err) - continue - } + _, err = p.workersPool.Exec(exec) + if err != nil { + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + + p.log.Error("job execute", "error", err) + continue + } - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) + // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- + atomic.AddUint64(&rate, 1) - errAck := job.Ack() - if errAck != nil { - p.log.Error("acknowledge failed", "error", errAck) + errAck := job.Ack() + if errAck != nil { + p.log.Error("acknowledge failed", "error", errAck) + } } } }() @@ -215,6 +224,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } func (p *Plugin) Stop() error { + for k, v := range p.consumers { + err := v.Stop() + if err != nil { + p.log.Error("stop job driver", "driver", k) + continue + } + } + + // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, + // but if not, this is not a problem at all. + // The main target is to stop the drivers + go func() { + for i := uint8(0); i < p.cfg.NumPollers; i++ { + // stop jobs plugin pollers + p.stopCh <- struct{}{} + } + }() + + // just wait pollers for 2 seconds before exit + time.Sleep(time.Second * 5) + return nil } @@ -335,6 +365,66 @@ func (p *Plugin) Resume(pipelines []string) { } } +// Declare a pipeline. +func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { + const op = errors.Op("jobs_plugin_declare") + // driver for the pipeline (ie amqp, ephemeral, etc) + dr := pipeline.Driver() + if dr == "" { + return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name())) + } + + // jobConstructors contains constructors for the drivers + // we need here to initialize these drivers for the pipelines + if c, ok := p.jobConstructors[dr]; ok { + // init the driver from pipeline + initializedDriver, err := c.FromPipeline(pipeline, p.queue) + if err != nil { + return errors.E(op, err) + } + + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers[pipeline.Name()] = initializedDriver + + // register pipeline for the initialized driver + err = initializedDriver.Register(pipeline) + if err != nil { + return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name())) + } + + // if pipeline initialized to be consumed, call Run on it + if _, ok := p.consume[pipeline.Name()]; ok { + err = initializedDriver.Run(pipeline) + if err != nil { + return errors.E(op, err) + } + } + } + + p.pipelines.Store(pipeline.Name(), pipeline) + + return nil +} + +// Destroy pipeline and release all associated resources. +func (p *Plugin) Destroy(pp string) error { + const op = errors.Op("jobs_plugin_destroy") + pipe, ok := p.pipelines.Load(pp) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp)) + } + + // type conversion + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + return d.Stop() +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, |