diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
commit | 4566f88004e81d3229222d82614c15346ac2e47d (patch) | |
tree | 05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a | |
parent | 5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff) |
AMQP update...
Add redialer, consumer, rabbit queues initializer.
Update configuration (.rr.yaml).
Add ack/nack for the jobs main plugin with error handling.
Add Qos, queues bining and declaration.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 1 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 6 | ||||
-rw-r--r-- | pkg/priorityqueue/interface.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 79 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 31 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 97 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 58 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 6 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 16 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 6 | ||||
-rw-r--r-- | tests/plugins/kv/rr.db | bin | 0 -> 32768 bytes |
12 files changed, 270 insertions, 36 deletions
@@ -6,6 +6,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect github.com/alicebob/miniredis/v2 v2.14.5 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b + github.com/cenkalti/backoff/v4 v4.1.1 github.com/fasthttp/websocket v1.4.3 github.com/fatih/color v1.12.0 github.com/go-ole/go-ole v1.2.5 // indirect @@ -22,6 +23,7 @@ require ( github.com/spiral/endure v1.0.2 github.com/spiral/errors v1.0.11 github.com/spiral/goridge/v3 v3.1.4 + github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.6 // indirect @@ -391,6 +391,7 @@ github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdU github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40= github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 6be6a99d..06d0735c 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -12,10 +12,12 @@ import ( type Test int -func (t Test) Ack() { +func (t Test) Ack() error { + return nil } -func (t Test) Nack() { +func (t Test) Nack() error { + return nil } func (t Test) Body() []byte { diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go index 7c053e6d..8278dc8d 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -21,8 +21,8 @@ type Item interface { Context() ([]byte, error) // Ack - acknowledge the Item after processing - Ack() + Ack() error // Nack - discard the Item - Nack() + Nack() error } diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 0b8a5a5b..f91b71e7 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1,12 +1,17 @@ package amqp import ( + "sync" + "time" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/streadway/amqp" ) type Config struct { @@ -15,35 +20,89 @@ type Config struct { } type JobsConsumer struct { - log logger.Logger - pq priorityqueue.Queue + sync.RWMutex + logger logger.Logger + pq priorityqueue.Queue + + pipelines sync.Map + + // amqp connection + conn *amqp.Connection + retryTimeout time.Duration + prefetchCount int + exchangeName string + connStr string + exchangeType string + routingKey string + + stop chan struct{} } func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp name + // second part - queues and other pipeline information jb := &JobsConsumer{ - log: log, - pq: pq, + logger: log, + pq: pq, } + d, err := jb.initRabbitMQ() + if err != nil { + return nil, err + } + + // run listener + jb.listener(d) + + // run redialer + jb.redialer() + return jb, nil } -func (j JobsConsumer) Push(job *structs.Job) error { - panic("implement me") +func (j *JobsConsumer) Push(job *structs.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.pipelines.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + } + + // handle timeouts + if job.Options.Timeout > 0 { + go func(jj *structs.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // TODO push + + // send the item after timeout expired + }(job) + + return nil + } + + // insert to the local, limited pipeline + + return nil + } + + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } -func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { panic("implement me") } -func (j JobsConsumer) List() []*pipeline.Pipeline { +func (j *JobsConsumer) List() []*pipeline.Pipeline { panic("implement me") } -func (j JobsConsumer) Pause(pipeline string) { +func (j *JobsConsumer) Pause(pipeline string) { panic("implement me") } -func (j JobsConsumer) Resume(pipeline string) { +func (j *JobsConsumer) Resume(pipeline string) { panic("implement me") } diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index ddb4e291..7f1bf204 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -4,23 +4,17 @@ import ( "time" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/utils" + "github.com/streadway/amqp" ) -func From(job *structs.Job) *Item { +func From(d amqp.Delivery) *Item { return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: conv(*job.Options), + AckFunc: d.Ack, + NackFunc: d.Nack, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -37,9 +31,14 @@ type Item struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options Options `json:"options,omitempty"` - AckFunc func() + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + AckFunc func(multiply bool) error - NackFunc func() + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + NackFunc func(multiply bool, requeue bool) error } // Options carry information about how to handle given job. @@ -121,10 +120,10 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() { - // noop for the in-memory +func (j *Item) Ack() error { + return j.AckFunc(false) } -func (j *Item) Nack() { - // noop for the in-memory +func (j *Item) Nack() error { + return j.NackFunc(false, false) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go new file mode 100644 index 00000000..41374878 --- /dev/null +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -0,0 +1,97 @@ +package amqp + +import ( + "fmt" + + "github.com/google/uuid" + "github.com/streadway/amqp" +) + +func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. + channel, err := j.conn.Channel() + if err != nil { + return nil, err + } + + err = channel.Qos(j.prefetchCount, 0, false) + if err != nil { + return nil, err + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + j.exchangeName, + j.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return nil, err + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()), + false, + false, + true, + false, + nil, + ) + if err != nil { + return nil, err + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + j.routingKey, + j.exchangeName, + false, + nil, + ) + if err != nil { + return nil, err + } + + // start reading messages from the channel + deliv, err := channel.Consume( + q.Name, + "", + false, + false, + false, + false, + nil, + ) + if err != nil { + return nil, err + } + + return deliv, nil +} + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { + select { + case msg, ok := <-deliv: + if !ok { + j.logger.Info("delivery channel closed, leaving the rabbit listener") + return + } + + // add task to the queue + j.pq.Insert(From(msg)) + case <-j.stop: + return + } + } + }() +} diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go new file mode 100644 index 00000000..bfb1fbff --- /dev/null +++ b/plugins/jobs/brokers/amqp/redial.go @@ -0,0 +1,58 @@ +package amqp + +import ( + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" +) + +// redialer used to redial to the rabbitmq in case of the connection interrupts +func (j *JobsConsumer) redialer() { + go func() { + for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { + if err != nil { + j.logger.Error("connection closed, reconnecting", "error", err) + + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * j.retryTimeout + op := func() error { + j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + + j.Lock() + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + j.Unlock() + return fmt.Errorf("fail to dial server endpoint: %v", dialErr) + } + j.Unlock() + + j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + + // re-init connection + deliv, errInit := j.initRabbitMQ() + if errInit != nil { + j.Unlock() + j.logger.Error("error while redialing", "error", errInit) + return errInit + } + + // restart listener + j.listener(deliv) + + j.logger.Info("queues and subscribers redeclare succeed") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.logger.Error("backoff failed", "error", retryErr) + return + } + } + } + }() +} diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index 40c6b3e4..76e83d00 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -118,10 +118,12 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() { +func (j *Item) Ack() error { // noop for the in-memory + return nil } -func (j *Item) Nack() { +func (j *Item) Nack() error { // noop for the in-memory + return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 51da9421..9d68a95a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -171,7 +171,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit ctx, err := job.Context() if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } p.log.Error("job marshal context", "error", err) } @@ -182,7 +185,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit _, err = p.workersPool.Exec(exec) if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + p.log.Error("job execute", "error", err) continue } @@ -190,7 +197,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) - job.Ack() + errAck := job.Ack() + if errAck != nil { + p.log.Error("acknowledge failed", "error", errAck) + } } }() } diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index 90590ccb..1648fa6c 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -9,6 +9,7 @@ server: amqp: addr: amqp://guest:guest@localhost:5672/ + # beanstalk configuration beanstalk: addr: tcp://localhost:11300 @@ -45,7 +46,10 @@ jobs: test-1: driver: amqp priority: 1 - queue: default + queue: test-1-queue + exchange: default + exchange_type: direct + routing_key: test pipeline_size: 1000000 test-2: diff --git a/tests/plugins/kv/rr.db b/tests/plugins/kv/rr.db Binary files differnew file mode 100644 index 00000000..4267eb2c --- /dev/null +++ b/tests/plugins/kv/rr.db |