diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 67 |
1 files changed, 27 insertions, 40 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index a7916f7e..481e102a 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -3,10 +3,12 @@ package amqp import ( "fmt" "sync" + "sync/atomic" "time" "github.com/google/uuid" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -48,8 +50,9 @@ type JobsConsumer struct { sync.RWMutex log logger.Logger pq priorityqueue.Queue + eh events.Handler - pipelines sync.Map + pipeline atomic.Value // amqp connection conn *amqp.Connection @@ -71,7 +74,7 @@ type JobsConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -79,6 +82,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, jb := &JobsConsumer{ log: log, pq: pq, + eh: e, consumeID: uuid.NewString(), stopCh: make(chan struct{}), // TODO to config @@ -146,13 +150,14 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ log: log, + eh: e, pq: pq, consumeID: uuid.NewString(), stopCh: make(chan struct{}), @@ -210,8 +215,11 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered - if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != job.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } // lock needed here to protect redial concurrent operation @@ -303,20 +311,16 @@ func (j *JobsConsumer) Push(job *structs.Job) error { } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { - const op = errors.Op("rabbitmq_register") - if _, ok := j.pipelines.Load(pipeline.Name()); ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) - } - - j.pipelines.Store(pipeline.Name(), struct{}{}) - + j.pipeline.Store(pipeline) return nil } -func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") - if _, ok := j.pipelines.Load(pipeline.Name()); !ok { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } // protect connection (redial) @@ -354,22 +358,10 @@ func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobsConsumer) List() []string { - out := make([]string, 0, 2) - - j.pipelines.Range(func(key, value interface{}) bool { - pipe := key.(string) - out = append(out, pipe) - return true - }) - - return out -} - -func (j *JobsConsumer) Pause(pipeline string) { - if _, ok := j.pipelines.Load(pipeline); !ok { - j.log.Error("no such pipeline", "requested pause on", pipeline) - return +func (j *JobsConsumer) Pause(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) } // protect connection (redial) @@ -386,10 +378,10 @@ func (j *JobsConsumer) Pause(pipeline string) { } } -func (j *JobsConsumer) Resume(pipeline string) { - if _, ok := j.pipelines.Load(pipeline); !ok { - j.log.Error("no such pipeline", "requested pause on", pipeline) - return +func (j *JobsConsumer) Resume(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) } // protect connection (redial) @@ -430,11 +422,6 @@ func (j *JobsConsumer) Resume(pipeline string) { func (j *JobsConsumer) Stop() error { j.stopCh <- struct{}{} - j.pipelines.Range(func(key, _ interface{}) bool { - j.pipelines.Delete(key) - return true - }) - return nil } |