diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
commit | 4566f88004e81d3229222d82614c15346ac2e47d (patch) | |
tree | 05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a /plugins | |
parent | 5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff) |
AMQP update...
Add redialer, consumer, rabbit queues initializer.
Update configuration (.rr.yaml).
Add ack/nack for the jobs main plugin with error handling.
Add Qos, queues bining and declaration.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 79 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 31 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 97 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 58 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 6 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 16 |
6 files changed, 256 insertions, 31 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 0b8a5a5b..f91b71e7 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1,12 +1,17 @@ package amqp import ( + "sync" + "time" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/streadway/amqp" ) type Config struct { @@ -15,35 +20,89 @@ type Config struct { } type JobsConsumer struct { - log logger.Logger - pq priorityqueue.Queue + sync.RWMutex + logger logger.Logger + pq priorityqueue.Queue + + pipelines sync.Map + + // amqp connection + conn *amqp.Connection + retryTimeout time.Duration + prefetchCount int + exchangeName string + connStr string + exchangeType string + routingKey string + + stop chan struct{} } func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp name + // second part - queues and other pipeline information jb := &JobsConsumer{ - log: log, - pq: pq, + logger: log, + pq: pq, } + d, err := jb.initRabbitMQ() + if err != nil { + return nil, err + } + + // run listener + jb.listener(d) + + // run redialer + jb.redialer() + return jb, nil } -func (j JobsConsumer) Push(job *structs.Job) error { - panic("implement me") +func (j *JobsConsumer) Push(job *structs.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.pipelines.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + } + + // handle timeouts + if job.Options.Timeout > 0 { + go func(jj *structs.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // TODO push + + // send the item after timeout expired + }(job) + + return nil + } + + // insert to the local, limited pipeline + + return nil + } + + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } -func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { panic("implement me") } -func (j JobsConsumer) List() []*pipeline.Pipeline { +func (j *JobsConsumer) List() []*pipeline.Pipeline { panic("implement me") } -func (j JobsConsumer) Pause(pipeline string) { +func (j *JobsConsumer) Pause(pipeline string) { panic("implement me") } -func (j JobsConsumer) Resume(pipeline string) { +func (j *JobsConsumer) Resume(pipeline string) { panic("implement me") } diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index ddb4e291..7f1bf204 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -4,23 +4,17 @@ import ( "time" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/utils" + "github.com/streadway/amqp" ) -func From(job *structs.Job) *Item { +func From(d amqp.Delivery) *Item { return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: conv(*job.Options), + AckFunc: d.Ack, + NackFunc: d.Nack, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -37,9 +31,14 @@ type Item struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options Options `json:"options,omitempty"` - AckFunc func() + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + AckFunc func(multiply bool) error - NackFunc func() + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + NackFunc func(multiply bool, requeue bool) error } // Options carry information about how to handle given job. @@ -121,10 +120,10 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() { - // noop for the in-memory +func (j *Item) Ack() error { + return j.AckFunc(false) } -func (j *Item) Nack() { - // noop for the in-memory +func (j *Item) Nack() error { + return j.NackFunc(false, false) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go new file mode 100644 index 00000000..41374878 --- /dev/null +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -0,0 +1,97 @@ +package amqp + +import ( + "fmt" + + "github.com/google/uuid" + "github.com/streadway/amqp" +) + +func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. + channel, err := j.conn.Channel() + if err != nil { + return nil, err + } + + err = channel.Qos(j.prefetchCount, 0, false) + if err != nil { + return nil, err + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + j.exchangeName, + j.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return nil, err + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()), + false, + false, + true, + false, + nil, + ) + if err != nil { + return nil, err + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + j.routingKey, + j.exchangeName, + false, + nil, + ) + if err != nil { + return nil, err + } + + // start reading messages from the channel + deliv, err := channel.Consume( + q.Name, + "", + false, + false, + false, + false, + nil, + ) + if err != nil { + return nil, err + } + + return deliv, nil +} + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { + select { + case msg, ok := <-deliv: + if !ok { + j.logger.Info("delivery channel closed, leaving the rabbit listener") + return + } + + // add task to the queue + j.pq.Insert(From(msg)) + case <-j.stop: + return + } + } + }() +} diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go new file mode 100644 index 00000000..bfb1fbff --- /dev/null +++ b/plugins/jobs/brokers/amqp/redial.go @@ -0,0 +1,58 @@ +package amqp + +import ( + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" +) + +// redialer used to redial to the rabbitmq in case of the connection interrupts +func (j *JobsConsumer) redialer() { + go func() { + for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { + if err != nil { + j.logger.Error("connection closed, reconnecting", "error", err) + + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * j.retryTimeout + op := func() error { + j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + + j.Lock() + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + j.Unlock() + return fmt.Errorf("fail to dial server endpoint: %v", dialErr) + } + j.Unlock() + + j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + + // re-init connection + deliv, errInit := j.initRabbitMQ() + if errInit != nil { + j.Unlock() + j.logger.Error("error while redialing", "error", errInit) + return errInit + } + + // restart listener + j.listener(deliv) + + j.logger.Info("queues and subscribers redeclare succeed") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.logger.Error("backoff failed", "error", retryErr) + return + } + } + } + }() +} diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index 40c6b3e4..76e83d00 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -118,10 +118,12 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() { +func (j *Item) Ack() error { // noop for the in-memory + return nil } -func (j *Item) Nack() { +func (j *Item) Nack() error { // noop for the in-memory + return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 51da9421..9d68a95a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -171,7 +171,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit ctx, err := job.Context() if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } p.log.Error("job marshal context", "error", err) } @@ -182,7 +185,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit _, err = p.workersPool.Exec(exec) if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + p.log.Error("job execute", "error", err) continue } @@ -190,7 +197,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) - job.Ack() + errAck := job.Ack() + if errAck != nil { + p.log.Error("acknowledge failed", "error", errAck) + } } }() } |