diff options
Diffstat (limited to 'plugins/jobs/brokers')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 127 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 66 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/listener.go | 25 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 27 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit_init.go (renamed from plugins/jobs/brokers/amqp/rabbit.go) | 30 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 37 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 42 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 |
8 files changed, 199 insertions, 164 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 5b549874..22eee2dc 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -46,8 +45,8 @@ type Config struct { type JobsConsumer struct { sync.RWMutex - logger logger.Logger - pq priorityqueue.Queue + log logger.Logger + pq priorityqueue.Queue pipelines sync.Map @@ -67,19 +66,20 @@ type JobsConsumer struct { delayCache map[string]struct{} - stop chan struct{} + stopCh chan struct{} } -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) { +// NewAMQPConsumer initializes rabbitmq pipeline +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - logger: log, + log: log, pq: pq, consumeID: uuid.NewString(), - stop: stopCh, + stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, delayCache: make(map[string]struct{}, 100), } @@ -144,6 +144,14 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } +func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) { + _ = exchangeType + _ = exchangeKey + _ = queue + _ = routingKey + panic("not implemented") +} + func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -157,7 +165,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error { defer j.Unlock() // convert - msg := FromJob(job) + msg := fromJob(job) p, err := pack(job.Ident, msg) if err != nil { return errors.E(op, err) @@ -245,12 +253,12 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.pipelines.Store(pipeline.Name(), true) + j.pipelines.Store(pipeline.Name(), struct{}{}) return nil } -func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") if _, ok := j.pipelines.Load(pipeline.Name()); !ok { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) @@ -304,77 +312,74 @@ func (j *JobsConsumer) List() []string { } func (j *JobsConsumer) Pause(pipeline string) { - if q, ok := j.pipelines.Load(pipeline); ok { - if q == true { - // mark pipeline as turned off - j.pipelines.Store(pipeline, false) - } + if _, ok := j.pipelines.Load(pipeline); !ok { + j.log.Error("no such pipeline", "requested pause on", pipeline) + return } // protect connection (redial) j.Lock() defer j.Unlock() - err := j.publishChan.Cancel(j.consumeID, true) + err := j.consumeChan.Cancel(j.consumeID, true) if err != nil { - j.logger.Error("cancel publish channel, forcing close", "error", err) - errCl := j.publishChan.Close() + j.log.Error("cancel publish channel, forcing close", "error", err) + errCl := j.consumeChan.Close() if errCl != nil { - j.logger.Error("force close failed", "error", err) + j.log.Error("force close failed", "error", err) } } } func (j *JobsConsumer) Resume(pipeline string) { - if q, ok := j.pipelines.Load(pipeline); ok { - if q == false { - // mark pipeline as turned off - j.pipelines.Store(pipeline, true) - } - - // protect connection (redial) - j.Lock() - defer j.Unlock() + if _, ok := j.pipelines.Load(pipeline); !ok { + j.log.Error("no such pipeline", "requested pause on", pipeline) + return + } - var err error - j.consumeChan, err = j.conn.Channel() - if err != nil { - j.logger.Error("create channel on rabbitmq connection", "error", err) - return - } + // protect connection (redial) + j.Lock() + defer j.Unlock() - err = j.consumeChan.Qos(j.prefetchCount, 0, false) - if err != nil { - j.logger.Error("qos set failed", "error", err) - return - } + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.log.Error("create channel on rabbitmq connection", "error", err) + return + } - // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - j.logger.Error("consume operation failed", "error", err) - return - } + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.log.Error("qos set failed", "error", err) + return + } - // run listener - j.listener(deliv) + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.log.Error("consume operation failed", "error", err) + return } + + // run listener + j.listener(deliv) } -// Declare used to dynamically declare a pipeline -func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error { - pipeline.String(exchangeKey, "") - pipeline.String(queue, "") - pipeline.String(routingKey, "") - pipeline.String(exchangeType, "direct") +func (j *JobsConsumer) Stop() error { + j.stopCh <- struct{}{} + j.pipelines.Range(func(key, _ interface{}) bool { + j.pipelines.Delete(key) + return true + }) + return nil } diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 2e8a30af..a46f1ca2 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -20,39 +20,6 @@ const ( rrRetryDelay string = "rr_retry_delay" ) -func FromDelivery(d amqp.Delivery) (*Item, error) { - const op = errors.Op("from_delivery_convert") - item, err := unpack(d) - if err != nil { - return nil, errors.E(op, err) - } - return &Item{ - Job: item.Job, - Ident: item.Ident, - Payload: item.Payload, - Headers: item.Headers, - Options: item.Options, - AckFunc: d.Ack, - NackFunc: d.Nack, - }, nil -} - -func FromJob(job *structs.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: uint32(job.Options.Priority), - Pipeline: job.Options.Pipeline, - Delay: int32(job.Options.Delay), - Attempts: int32(job.Options.Attempts), - RetryDelay: int32(job.Options.RetryDelay), - Timeout: int32(job.Options.Timeout), - }, - } -} - type Item struct { // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` @@ -154,6 +121,39 @@ func (j *Item) Nack() error { return j.NackFunc(false, false) } +func fromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + item, err := unpack(d) + if err != nil { + return nil, errors.E(op, err) + } + return &Item{ + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + AckFunc: d.Ack, + NackFunc: d.Nack, + }, nil +} + +func fromJob(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: uint32(job.Options.Priority), + Pipeline: job.Options.Pipeline, + Delay: int32(job.Options.Delay), + Attempts: int32(job.Options.Attempts), + RetryDelay: int32(job.Options.RetryDelay), + Timeout: int32(job.Options.Timeout), + }, + } +} + // pack job metadata into headers func pack(id string, j *Item) (amqp.Table, error) { headers, err := json.Marshal(j.Headers) diff --git a/plugins/jobs/brokers/amqp/listener.go b/plugins/jobs/brokers/amqp/listener.go new file mode 100644 index 00000000..2b994fc5 --- /dev/null +++ b/plugins/jobs/brokers/amqp/listener.go @@ -0,0 +1,25 @@ +package amqp + +import "github.com/streadway/amqp" + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := fromDelivery(msg) + if err != nil { + j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 7b6562c7..6743dc2f 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1,11 +1,10 @@ package amqp import ( - "sync/atomic" - "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -16,27 +15,11 @@ const ( type Plugin struct { log logger.Logger cfg config.Configurer - - numConsumers uint32 - stopCh chan struct{} } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log p.cfg = cfg - p.stopCh = make(chan struct{}) - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - // send stop to the all consumers delivery - for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ { - p.stopCh <- struct{}{} - } return nil } @@ -47,6 +30,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - atomic.AddUint32(&p.numConsumers, 1) - return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq) + return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, pq) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit_init.go index b5e73a9d..cb9f2dc4 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit_init.go @@ -2,7 +2,6 @@ package amqp import ( "github.com/spiral/errors" - "github.com/streadway/amqp" ) func (j *JobsConsumer) initRabbitMQ() error { @@ -15,11 +14,6 @@ func (j *JobsConsumer) initRabbitMQ() error { return errors.E(op, err) } - err = channel.Qos(j.prefetchCount, 0, false) - if err != nil { - return errors.E(op, err) - } - // declare an exchange (idempotent operation) err = channel.ExchangeDeclare( j.exchangeName, @@ -61,27 +55,3 @@ func (j *JobsConsumer) initRabbitMQ() error { return channel.Close() } - -func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { - go func() { - for { - select { - case msg, ok := <-deliv: - if !ok { - j.logger.Info("delivery channel closed, leaving the rabbit listener") - return - } - - d, err := FromDelivery(msg) - if err != nil { - j.logger.Error("amqp delivery convert", "error", err) - continue - } - // insert job into the main priority queue - j.pq.Insert(d) - case <-j.stop: - return - } - } - }() -} diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 874e68c4..16071b78 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -12,29 +12,34 @@ import ( func (j *JobsConsumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") - for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { - if err != nil { - j.Lock() - j.logger.Error("connection closed, reconnecting", "error", err) + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + j.log.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) expb.MaxElapsedTime = j.retryTimeout op := func() error { - j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) if dialErr != nil { return fmt.Errorf("fail to dial server endpoint: %v", dialErr) } - j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection errInit := j.initRabbitMQ() if errInit != nil { - j.logger.Error("error while redialing", "error", errInit) + j.log.Error("error while redialing", "error", errInit) return errInit } @@ -69,18 +74,32 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit // restart listener j.listener(deliv) - j.logger.Info("queues and subscribers redeclare succeed") + j.log.Info("queues and subscribers redeclare succeed") return nil } retryErr := backoff.Retry(op, expb) if retryErr != nil { j.Unlock() - j.logger.Error("backoff failed", "error", retryErr) + j.log.Error("backoff failed", "error", retryErr) return } j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } } } }() diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 8f6f4b5f..9d79221c 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -22,14 +22,17 @@ type JobBroker struct { queues sync.Map pq priorityqueue.Queue localQueue chan *Item + + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ - log: log, - pq: q, + log: log, + pq: q, + stopCh: make(chan struct{}, 1), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -50,6 +53,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q return jb, nil } +func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) { + panic("not implemented") +} + func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") @@ -82,8 +89,13 @@ func (j *JobBroker) Push(job *structs.Job) error { func (j *JobBroker) consume() { // redirect - for item := range j.localQueue { - j.pq.Insert(item) + for { + select { + case item := <-j.localQueue: + j.pq.Insert(item) + case <-j.stopCh: + return + } } } @@ -98,11 +110,6 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } -// Consume is no-op for the ephemeral -func (j *JobBroker) Consume(_ *pipeline.Pipeline) error { - return nil -} - func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { @@ -132,3 +139,20 @@ func (j *JobBroker) List() []string { return out } + +// Run is no-op for the ephemeral +func (j *JobBroker) Run(_ *pipeline.Pipeline) error { + return nil +} + +func (j *JobBroker) Stop() error { + j.queues.Range(func(key, _ interface{}) bool { + j.queues.Delete(key) + return true + }) + + // return from the consumer + j.stopCh <- struct{}{} + + return nil +} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 9910d857..75012873 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -4,6 +4,7 @@ import ( "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -28,6 +29,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} -func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, q) +func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, pq) +} + +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipeline, pq) } |