diff options
author | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
commit | 0f70f1e2311640236d74a0a237536779d8d44223 (patch) | |
tree | 8b2e9dc32b5b6bafe418083c33cce3dbb8f277c7 /plugins/jobs/brokers/amqp/plugin.go | |
parent | 240b114e1ea3c1414bcd9f4d2c050d56c467222f (diff) |
Update JOBS interface, Renamed Consume -> Run.
Add DYNAMIC declaration of the pipelines. Update Jobs constructor
interface, add FromPipeline method to construct jobs driver (unique)
via the `Declare` RPC call.
Add `Stop` method to gracefully stop all consumers.
Binary heaps `GetMax` to canonical `ExtractMin`.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/plugin.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 27 |
1 files changed, 7 insertions, 20 deletions
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 7b6562c7..6743dc2f 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1,11 +1,10 @@ package amqp import ( - "sync/atomic" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -16,27 +15,11 @@ const ( type Plugin struct { log logger.Logger cfg config.Configurer - - numConsumers uint32 - stopCh chan struct{} } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log p.cfg = cfg - p.stopCh = make(chan struct{}) - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - // send stop to the all consumers delivery - for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ { - p.stopCh <- struct{}{} - } return nil } @@ -47,6 +30,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - atomic.AddUint32(&p.numConsumers, 1) - return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq) + return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, pq) } |