summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
committerValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
commit4566f88004e81d3229222d82614c15346ac2e47d (patch)
tree05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a
parent5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff)
AMQP update...
Add redialer, consumer, rabbit queues initializer. Update configuration (.rr.yaml). Add ack/nack for the jobs main plugin with error handling. Add Qos, queues bining and declaration. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod2
-rw-r--r--go.sum1
-rw-r--r--pkg/priorityqueue/binary_heap_test.go6
-rw-r--r--pkg/priorityqueue/interface.go4
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go79
-rw-r--r--plugins/jobs/brokers/amqp/item.go31
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go97
-rw-r--r--plugins/jobs/brokers/amqp/redial.go58
-rw-r--r--plugins/jobs/brokers/ephemeral/item.go6
-rw-r--r--plugins/jobs/plugin.go16
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml6
-rw-r--r--tests/plugins/kv/rr.dbbin0 -> 32768 bytes
12 files changed, 270 insertions, 36 deletions
diff --git a/go.mod b/go.mod
index 2ac9684c..7a12c9b9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/alicebob/miniredis/v2 v2.14.5
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
+ github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
@@ -22,6 +23,7 @@ require (
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.4
+ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index f218097f..c64a1a33 100644
--- a/go.sum
+++ b/go.sum
@@ -391,6 +391,7 @@ github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdU
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
+github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index 6be6a99d..06d0735c 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -12,10 +12,12 @@ import (
type Test int
-func (t Test) Ack() {
+func (t Test) Ack() error {
+ return nil
}
-func (t Test) Nack() {
+func (t Test) Nack() error {
+ return nil
}
func (t Test) Body() []byte {
diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go
index 7c053e6d..8278dc8d 100644
--- a/pkg/priorityqueue/interface.go
+++ b/pkg/priorityqueue/interface.go
@@ -21,8 +21,8 @@ type Item interface {
Context() ([]byte, error)
// Ack - acknowledge the Item after processing
- Ack()
+ Ack() error
// Nack - discard the Item
- Nack()
+ Nack() error
}
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 0b8a5a5b..f91b71e7 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -1,12 +1,17 @@
package amqp
import (
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/streadway/amqp"
)
type Config struct {
@@ -15,35 +20,89 @@ type Config struct {
}
type JobsConsumer struct {
- log logger.Logger
- pq priorityqueue.Queue
+ sync.RWMutex
+ logger logger.Logger
+ pq priorityqueue.Queue
+
+ pipelines sync.Map
+
+ // amqp connection
+ conn *amqp.Connection
+ retryTimeout time.Duration
+ prefetchCount int
+ exchangeName string
+ connStr string
+ exchangeType string
+ routingKey string
+
+ stop chan struct{}
}
func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ // we need to obtain two parts of the amqp information here.
+ // firs part - address to connect, it is located in the global section under the amqp name
+ // second part - queues and other pipeline information
jb := &JobsConsumer{
- log: log,
- pq: pq,
+ logger: log,
+ pq: pq,
}
+ d, err := jb.initRabbitMQ()
+ if err != nil {
+ return nil, err
+ }
+
+ // run listener
+ jb.listener(d)
+
+ // run redialer
+ jb.redialer()
+
return jb, nil
}
-func (j JobsConsumer) Push(job *structs.Job) error {
- panic("implement me")
+func (j *JobsConsumer) Push(job *structs.Job) error {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ if b, ok := j.pipelines.Load(job.Options.Pipeline); ok {
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ }
+
+ // handle timeouts
+ if job.Options.Timeout > 0 {
+ go func(jj *structs.Job) {
+ time.Sleep(jj.Options.TimeoutDuration())
+
+ // TODO push
+
+ // send the item after timeout expired
+ }(job)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
-func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
panic("implement me")
}
-func (j JobsConsumer) List() []*pipeline.Pipeline {
+func (j *JobsConsumer) List() []*pipeline.Pipeline {
panic("implement me")
}
-func (j JobsConsumer) Pause(pipeline string) {
+func (j *JobsConsumer) Pause(pipeline string) {
panic("implement me")
}
-func (j JobsConsumer) Resume(pipeline string) {
+func (j *JobsConsumer) Resume(pipeline string) {
panic("implement me")
}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index ddb4e291..7f1bf204 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -4,23 +4,17 @@ import (
"time"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/utils"
+ "github.com/streadway/amqp"
)
-func From(job *structs.Job) *Item {
+func From(d amqp.Delivery) *Item {
return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: conv(*job.Options),
+ AckFunc: d.Ack,
+ NackFunc: d.Nack,
}
}
-func conv(jo structs.Options) Options {
- return Options(jo)
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -37,9 +31,14 @@ type Item struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options Options `json:"options,omitempty"`
- AckFunc func()
+ // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ AckFunc func(multiply bool) error
- NackFunc func()
+ // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
+ // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
+ // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
+ NackFunc func(multiply bool, requeue bool) error
}
// Options carry information about how to handle given job.
@@ -121,10 +120,10 @@ func (j *Item) Context() ([]byte, error) {
return ctx, nil
}
-func (j *Item) Ack() {
- // noop for the in-memory
+func (j *Item) Ack() error {
+ return j.AckFunc(false)
}
-func (j *Item) Nack() {
- // noop for the in-memory
+func (j *Item) Nack() error {
+ return j.NackFunc(false, false)
}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
new file mode 100644
index 00000000..41374878
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -0,0 +1,97 @@
+package amqp
+
+import (
+ "fmt"
+
+ "github.com/google/uuid"
+ "github.com/streadway/amqp"
+)
+
+func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := j.conn.Channel()
+ if err != nil {
+ return nil, err
+ }
+
+ err = channel.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ return nil, err
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ j.exchangeName,
+ j.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()),
+ false,
+ false,
+ true,
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ j.routingKey,
+ j.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // start reading messages from the channel
+ deliv, err := channel.Consume(
+ q.Name,
+ "",
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return deliv, nil
+}
+
+func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
+ go func() {
+ for {
+ select {
+ case msg, ok := <-deliv:
+ if !ok {
+ j.logger.Info("delivery channel closed, leaving the rabbit listener")
+ return
+ }
+
+ // add task to the queue
+ j.pq.Insert(From(msg))
+ case <-j.stop:
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
new file mode 100644
index 00000000..bfb1fbff
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -0,0 +1,58 @@
+package amqp
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/streadway/amqp"
+)
+
+// redialer used to redial to the rabbitmq in case of the connection interrupts
+func (j *JobsConsumer) redialer() {
+ go func() {
+ for err := range j.conn.NotifyClose(make(chan *amqp.Error)) {
+ if err != nil {
+ j.logger.Error("connection closed, reconnecting", "error", err)
+
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * j.retryTimeout
+ op := func() error {
+ j.logger.Warn("rabbitmq reconnecting, caused by", "error", err)
+
+ j.Lock()
+ var dialErr error
+ j.conn, dialErr = amqp.Dial(j.connStr)
+ if dialErr != nil {
+ j.Unlock()
+ return fmt.Errorf("fail to dial server endpoint: %v", dialErr)
+ }
+ j.Unlock()
+
+ j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+
+ // re-init connection
+ deliv, errInit := j.initRabbitMQ()
+ if errInit != nil {
+ j.Unlock()
+ j.logger.Error("error while redialing", "error", errInit)
+ return errInit
+ }
+
+ // restart listener
+ j.listener(deliv)
+
+ j.logger.Info("queues and subscribers redeclare succeed")
+ return nil
+ }
+
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.logger.Error("backoff failed", "error", retryErr)
+ return
+ }
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go
index 40c6b3e4..76e83d00 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/brokers/ephemeral/item.go
@@ -118,10 +118,12 @@ func (j *Item) Context() ([]byte, error) {
return ctx, nil
}
-func (j *Item) Ack() {
+func (j *Item) Ack() error {
// noop for the in-memory
+ return nil
}
-func (j *Item) Nack() {
+func (j *Item) Nack() error {
// noop for the in-memory
+ return nil
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 51da9421..9d68a95a 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -171,7 +171,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
ctx, err := job.Context()
if err != nil {
- job.Nack()
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
p.log.Error("job marshal context", "error", err)
}
@@ -182,7 +185,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
_, err = p.workersPool.Exec(exec)
if err != nil {
- job.Nack()
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+
p.log.Error("job execute", "error", err)
continue
}
@@ -190,7 +197,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
atomic.AddUint64(&rate, 1)
- job.Ack()
+ errAck := job.Ack()
+ if errAck != nil {
+ p.log.Error("acknowledge failed", "error", errAck)
+ }
}
}()
}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 90590ccb..1648fa6c 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -9,6 +9,7 @@ server:
amqp:
addr: amqp://guest:guest@localhost:5672/
+
# beanstalk configuration
beanstalk:
addr: tcp://localhost:11300
@@ -45,7 +46,10 @@ jobs:
test-1:
driver: amqp
priority: 1
- queue: default
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
pipeline_size: 1000000
test-2:
diff --git a/tests/plugins/kv/rr.db b/tests/plugins/kv/rr.db
new file mode 100644
index 00000000..4267eb2c
--- /dev/null
+++ b/tests/plugins/kv/rr.db
Binary files differ