diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/plugin.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 27 |
1 files changed, 7 insertions, 20 deletions
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 7b6562c7..6743dc2f 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1,11 +1,10 @@ package amqp import ( - "sync/atomic" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -16,27 +15,11 @@ const ( type Plugin struct { log logger.Logger cfg config.Configurer - - numConsumers uint32 - stopCh chan struct{} } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log p.cfg = cfg - p.stopCh = make(chan struct{}) - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - // send stop to the all consumers delivery - for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ { - p.stopCh <- struct{}{} - } return nil } @@ -47,6 +30,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - atomic.AddUint32(&p.numConsumers, 1) - return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq) + return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, pq) } |