summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go282
-rw-r--r--plugins/jobs/brokers/amqp/headers.go68
-rw-r--r--plugins/jobs/brokers/amqp/item.go2
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go4
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go35
-rw-r--r--plugins/jobs/brokers/amqp/redial.go47
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go5
7 files changed, 382 insertions, 61 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index f91b71e7..9ac47269 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -1,9 +1,11 @@
package amqp
import (
+ "fmt"
"sync"
"time"
+ "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
@@ -14,9 +16,32 @@ import (
"github.com/streadway/amqp"
)
+// pipeline rabbitmq info
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
type Config struct {
- Addr string
- Queue string
+ PrefetchCount int `mapstructure:"pipeline_size"`
+ Queue string `mapstructure:"queue"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
}
type JobsConsumer struct {
@@ -27,35 +52,91 @@ type JobsConsumer struct {
pipelines sync.Map
// amqp connection
- conn *amqp.Connection
+ conn *amqp.Connection
+ consumeChan *amqp.Channel
+ publishChan *amqp.Channel
+
retryTimeout time.Duration
prefetchCount int
exchangeName string
+ queue string
+ consumeID string
connStr string
exchangeType string
routingKey string
+ // TODO send data to channel
stop chan struct{}
}
func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp name
+ // firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
jb := &JobsConsumer{
- logger: log,
- pq: pq,
+ logger: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stop: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ }
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- d, err := jb.initRabbitMQ()
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(configKey, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeCfg.RoutingKey
+ jb.queue = pipeCfg.Queue
+ jb.exchangeType = pipeCfg.ExchangeType
+ jb.exchangeName = pipeCfg.Exchange
+ jb.prefetchCount = pipeCfg.PrefetchCount
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // assign address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
if err != nil {
return nil, err
}
- // run listener
- jb.listener(d)
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ panic(err)
+ }
- // run redialer
+ // run redialer for the connection
jb.redialer()
return jb, nil
@@ -63,27 +144,57 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
func (j *JobsConsumer) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
+ j.RLock()
+ defer j.RUnlock()
// check if the pipeline registered
- if b, ok := j.pipelines.Load(job.Options.Pipeline); ok {
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
- }
-
+ if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
// handle timeouts
- if job.Options.Timeout > 0 {
- go func(jj *structs.Job) {
- time.Sleep(jj.Options.TimeoutDuration())
+ if job.Options.DelayDuration() > 0 {
+ // pub
+ delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- // TODO push
+ _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: 100,
+ dlxExpires: 200,
+ })
- // send the item after timeout expired
- }(job)
+ if err != nil {
+ panic(err)
+ }
+
+ err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ panic(err)
+ }
+
+ // insert to the local, limited pipeline
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, 0, job),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: nil,
+ })
+ if err != nil {
+ panic(err)
+ }
return nil
}
// insert to the local, limited pipeline
+ err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ //Headers: pack(job.Ident, 0, job),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: nil,
+ })
+ if err != nil {
+ panic(err)
+ }
return nil
}
@@ -92,7 +203,52 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
}
func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
- panic("implement me")
+ const op = errors.Op("rabbitmq_register")
+ if _, ok := j.pipelines.Load(pipeline.Name()); ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
+ }
+
+ j.pipelines.Store(pipeline.Name(), true)
+
+ return nil
+}
+
+func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ if _, ok := j.pipelines.Load(pipeline.Name()); !ok {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name()))
+ }
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // run listener
+ j.listener(deliv)
+
+ return nil
}
func (j *JobsConsumer) List() []*pipeline.Pipeline {
@@ -100,9 +256,87 @@ func (j *JobsConsumer) List() []*pipeline.Pipeline {
}
func (j *JobsConsumer) Pause(pipeline string) {
- panic("implement me")
+ if q, ok := j.pipelines.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.pipelines.Store(pipeline, false)
+ }
+ }
+
+ err := j.publishChan.Cancel(j.consumeID, true)
+ if err != nil {
+ j.logger.Error("cancel publish channel, forcing close", "error", err)
+ errCl := j.publishChan.Close()
+ if errCl != nil {
+ j.logger.Error("force close failed", "error", err)
+ }
+ }
}
func (j *JobsConsumer) Resume(pipeline string) {
- panic("implement me")
+ if q, ok := j.pipelines.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned off
+ j.pipelines.Store(pipeline, true)
+ }
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ j.logger.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ j.logger.Error("qos set failed", "error", err)
+ return
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ j.logger.Error("consume operation failed", "error", err)
+ return
+ }
+
+ // run listener
+ j.listener(deliv)
+ }
+}
+
+// Declare used to dynamically declare a pipeline
+func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error {
+ pipeline.String(exchangeKey, "")
+ pipeline.String(queue, "")
+ pipeline.String(routingKey, "")
+ pipeline.String(exchangeType, "direct")
+ return nil
+}
+
+func (c *Config) InitDefault() {
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "default"
+ }
+
+ if c.PrefetchCount == 0 {
+ c.PrefetchCount = 100
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:guest@localhost:5672/"
+ }
}
diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go
new file mode 100644
index 00000000..b1f9c89d
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/headers.go
@@ -0,0 +1,68 @@
+package amqp
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/streadway/amqp"
+)
+
+const (
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ rrAttempt string = "rr-attempt"
+ rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
+)
+
+// pack job metadata into headers
+func pack(id string, attempt uint64, j *structs.Job) amqp.Table {
+ return amqp.Table{
+ rrID: id,
+ rrJob: j.Job,
+ rrAttempt: attempt,
+ rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused
+ j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}}
+
+ if _, ok := d.Headers[rrID].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ }
+
+ if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ }
+
+ if _, ok := d.Headers[rrJob].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ }
+
+ j.Job = d.Headers[rrJob].(string)
+
+ if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ }
+
+ if _, ok := d.Headers[rrTimeout].(uint64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ }
+
+ if _, ok := d.Headers[rrDelay].(uint64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(uint64)
+ }
+
+ if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ }
+
+ return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 7f1bf204..4751df58 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -16,7 +16,7 @@ func From(d amqp.Delivery) *Item {
}
type Item struct {
- // Job contains name of job broker (usually PHP class).
+ // Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 174cb006..74f9a174 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -8,7 +8,7 @@ import (
)
const (
- name string = "amqp"
+ pluginName string = "amqp"
)
type Plugin struct {
@@ -23,7 +23,7 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
}
func (p *Plugin) Name() string {
- return name
+ return pluginName
}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index 41374878..7e722889 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -1,24 +1,23 @@
package amqp
import (
- "fmt"
-
- "github.com/google/uuid"
+ "github.com/spiral/errors"
"github.com/streadway/amqp"
)
-func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
+func (j *JobsConsumer) initRabbitMQ() error {
+ const op = errors.Op("rabbit_initmq")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
channel, err := j.conn.Channel()
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
err = channel.Qos(j.prefetchCount, 0, false)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// declare an exchange (idempotent operation)
@@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// verify or declare a queue
q, err := channel.QueueDeclare(
- fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()),
+ j.queue,
false,
false,
true,
@@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// bind queue to the exchange
@@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
- }
-
- // start reading messages from the channel
- deliv, err := channel.Consume(
- q.Name,
- "",
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return nil, err
+ return errors.E(op, err)
}
- return deliv, nil
+ return nil
}
func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
index bfb1fbff..874e68c4 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -2,44 +2,70 @@ package amqp
import (
"fmt"
- "time"
"github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
"github.com/streadway/amqp"
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobsConsumer) redialer() {
+func (j *JobsConsumer) redialer() { //nolint:gocognit
go func() {
+ const op = errors.Op("rabbitmq_redial")
for err := range j.conn.NotifyClose(make(chan *amqp.Error)) {
if err != nil {
+ j.Lock()
+
j.logger.Error("connection closed, reconnecting", "error", err)
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
- expb.MaxElapsedTime = time.Minute * j.retryTimeout
+ expb.MaxElapsedTime = j.retryTimeout
op := func() error {
j.logger.Warn("rabbitmq reconnecting, caused by", "error", err)
-
- j.Lock()
var dialErr error
j.conn, dialErr = amqp.Dial(j.connStr)
if dialErr != nil {
- j.Unlock()
return fmt.Errorf("fail to dial server endpoint: %v", dialErr)
}
- j.Unlock()
j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
- deliv, errInit := j.initRabbitMQ()
+ errInit := j.initRabbitMQ()
if errInit != nil {
- j.Unlock()
j.logger.Error("error while redialing", "error", errInit)
return errInit
}
+ // redeclare consume channel
+ var errConnCh error
+ j.consumeChan, errConnCh = j.conn.Channel()
+ if errConnCh != nil {
+ return errors.E(op, errConnCh)
+ }
+
+ // redeclare publish channel
+ var errPubCh error
+ j.publishChan, errPubCh = j.conn.Channel()
+ if errPubCh != nil {
+ return errors.E(op, errPubCh)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// restart listener
j.listener(deliv)
@@ -49,9 +75,12 @@ func (j *JobsConsumer) redialer() {
retryErr := backoff.Retry(op, expb)
if retryErr != nil {
+ j.Unlock()
j.logger.Error("backoff failed", "error", retryErr)
return
}
+
+ j.Unlock()
}
}
}()
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 5cf4c633..030dcae8 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -98,6 +98,11 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
return nil
}
+// Consume is no-op for the ephemeral
+func (j *JobBroker) Consume(_ *pipeline.Pipeline) error {
+ return nil
+}
+
func (j *JobBroker) Pause(pipeline string) {
if q, ok := j.queues.Load(pipeline); ok {
if q == true {