diff options
Diffstat (limited to 'plugins/jobs/brokers')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 282 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/headers.go | 68 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 35 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 47 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 5 |
7 files changed, 382 insertions, 61 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index f91b71e7..9ac47269 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1,9 +1,11 @@ package amqp import ( + "fmt" "sync" "time" + "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" @@ -14,9 +16,32 @@ import ( "github.com/streadway/amqp" ) +// pipeline rabbitmq info +const ( + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + + dlx string = "x-dead-letter-exchange" + dlxRoutingKey string = "x-dead-letter-routing-key" + dlxTTL string = "x-message-ttl" + dlxExpires string = "x-expires" + + contentType string = "application/octet-stream" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` +} + +// Config is used to parse pipeline configuration type Config struct { - Addr string - Queue string + PrefetchCount int `mapstructure:"pipeline_size"` + Queue string `mapstructure:"queue"` + Exchange string `mapstructure:"exchange"` + ExchangeType string `mapstructure:"exchange_type"` + RoutingKey string `mapstructure:"routing_key"` } type JobsConsumer struct { @@ -27,35 +52,91 @@ type JobsConsumer struct { pipelines sync.Map // amqp connection - conn *amqp.Connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan *amqp.Channel + retryTimeout time.Duration prefetchCount int exchangeName string + queue string + consumeID string connStr string exchangeType string routingKey string + // TODO send data to channel stop chan struct{} } func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. - // firs part - address to connect, it is located in the global section under the amqp name + // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - logger: log, - pq: pq, + logger: log, + pq: pq, + consumeID: uuid.NewString(), + stop: make(chan struct{}), + retryTimeout: time.Minute * 5, + } + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - d, err := jb.initRabbitMQ() + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(configKey, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeCfg.RoutingKey + jb.queue = pipeCfg.Queue + jb.exchangeType = pipeCfg.ExchangeType + jb.exchangeName = pipeCfg.Exchange + jb.prefetchCount = pipeCfg.PrefetchCount + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // assign address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() if err != nil { return nil, err } - // run listener - jb.listener(d) + jb.publishChan, err = jb.conn.Channel() + if err != nil { + panic(err) + } - // run redialer + // run redialer for the connection jb.redialer() return jb, nil @@ -63,27 +144,57 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") + j.RLock() + defer j.RUnlock() // check if the pipeline registered - if b, ok := j.pipelines.Load(job.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) - } - + if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { // handle timeouts - if job.Options.Timeout > 0 { - go func(jj *structs.Job) { - time.Sleep(jj.Options.TimeoutDuration()) + if job.Options.DelayDuration() > 0 { + // pub + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - // TODO push + _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: 100, + dlxExpires: 200, + }) - // send the item after timeout expired - }(job) + if err != nil { + panic(err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + panic(err) + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } // insert to the local, limited pipeline + err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + //Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } @@ -92,7 +203,52 @@ func (j *JobsConsumer) Push(job *structs.Job) error { } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { - panic("implement me") + const op = errors.Op("rabbitmq_register") + if _, ok := j.pipelines.Load(pipeline.Name()); ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) + } + + j.pipelines.Store(pipeline.Name(), true) + + return nil +} + +func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + if _, ok := j.pipelines.Load(pipeline.Name()); !ok { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) + } + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // run listener + j.listener(deliv) + + return nil } func (j *JobsConsumer) List() []*pipeline.Pipeline { @@ -100,9 +256,87 @@ func (j *JobsConsumer) List() []*pipeline.Pipeline { } func (j *JobsConsumer) Pause(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.pipelines.Store(pipeline, false) + } + } + + err := j.publishChan.Cancel(j.consumeID, true) + if err != nil { + j.logger.Error("cancel publish channel, forcing close", "error", err) + errCl := j.publishChan.Close() + if errCl != nil { + j.logger.Error("force close failed", "error", err) + } + } } func (j *JobsConsumer) Resume(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.pipelines.Store(pipeline, true) + } + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.logger.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.logger.Error("qos set failed", "error", err) + return + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.logger.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) + } +} + +// Declare used to dynamically declare a pipeline +func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error { + pipeline.String(exchangeKey, "") + pipeline.String(queue, "") + pipeline.String(routingKey, "") + pipeline.String(exchangeType, "direct") + return nil +} + +func (c *Config) InitDefault() { + if c.ExchangeType == "" { + c.ExchangeType = "direct" + } + + if c.Exchange == "" { + c.Exchange = "default" + } + + if c.PrefetchCount == 0 { + c.PrefetchCount = 100 + } +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "amqp://guest:guest@localhost:5672/" + } } diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go new file mode 100644 index 00000000..b1f9c89d --- /dev/null +++ b/plugins/jobs/brokers/amqp/headers.go @@ -0,0 +1,68 @@ +package amqp + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/streadway/amqp" +) + +const ( + rrID string = "rr-id" + rrJob string = "rr-job" + rrAttempt string = "rr-attempt" + rrMaxAttempts string = "rr-max_attempts" + rrTimeout string = "rr-timeout" + rrDelay string = "rr-delay" + rrRetryDelay string = "rr-retry_delay" +) + +// pack job metadata into headers +func pack(id string, attempt uint64, j *structs.Job) amqp.Table { + return amqp.Table{ + rrID: id, + rrJob: j.Job, + rrAttempt: attempt, + rrMaxAttempts: j.Options.Attempts, + rrTimeout: j.Options.Timeout, + rrDelay: j.Options.Delay, + rrRetryDelay: j.Options.RetryDelay, + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused + j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}} + + if _, ok := d.Headers[rrID].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrID) + } + + if _, ok := d.Headers[rrAttempt].(uint64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) + } + + if _, ok := d.Headers[rrJob].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob) + } + + j.Job = d.Headers[rrJob].(string) + + if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { + j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) + } + + if _, ok := d.Headers[rrTimeout].(uint64); ok { + j.Options.Timeout = d.Headers[rrTimeout].(uint64) + } + + if _, ok := d.Headers[rrDelay].(uint64); ok { + j.Options.Delay = d.Headers[rrDelay].(uint64) + } + + if _, ok := d.Headers[rrRetryDelay].(uint64); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64) + } + + return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil +} diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 7f1bf204..4751df58 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -16,7 +16,7 @@ func From(d amqp.Delivery) *Item { } type Item struct { - // Job contains name of job broker (usually PHP class). + // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` // Ident is unique identifier of the job, should be provided from outside diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 174cb006..74f9a174 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -8,7 +8,7 @@ import ( ) const ( - name string = "amqp" + pluginName string = "amqp" ) type Plugin struct { @@ -23,7 +23,7 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { } func (p *Plugin) Name() string { - return name + return pluginName } func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index 41374878..7e722889 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -1,24 +1,23 @@ package amqp import ( - "fmt" - - "github.com/google/uuid" + "github.com/spiral/errors" "github.com/streadway/amqp" ) -func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("rabbit_initmq") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. channel, err := j.conn.Channel() if err != nil { - return nil, err + return errors.E(op, err) } err = channel.Qos(j.prefetchCount, 0, false) if err != nil { - return nil, err + return errors.E(op, err) } // declare an exchange (idempotent operation) @@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // verify or declare a queue q, err := channel.QueueDeclare( - fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()), + j.queue, false, false, true, @@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // bind queue to the exchange @@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err - } - - // start reading messages from the channel - deliv, err := channel.Consume( - q.Name, - "", - false, - false, - false, - false, - nil, - ) - if err != nil { - return nil, err + return errors.E(op, err) } - return deliv, nil + return nil } func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index bfb1fbff..874e68c4 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -2,44 +2,70 @@ package amqp import ( "fmt" - "time" "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" "github.com/streadway/amqp" ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobsConsumer) redialer() { +func (j *JobsConsumer) redialer() { //nolint:gocognit go func() { + const op = errors.Op("rabbitmq_redial") for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { if err != nil { + j.Lock() + j.logger.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = time.Minute * j.retryTimeout + expb.MaxElapsedTime = j.retryTimeout op := func() error { j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) - - j.Lock() var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) if dialErr != nil { - j.Unlock() return fmt.Errorf("fail to dial server endpoint: %v", dialErr) } - j.Unlock() j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - deliv, errInit := j.initRabbitMQ() + errInit := j.initRabbitMQ() if errInit != nil { - j.Unlock() j.logger.Error("error while redialing", "error", errInit) return errInit } + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + var errPubCh error + j.publishChan, errPubCh = j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + // restart listener j.listener(deliv) @@ -49,9 +75,12 @@ func (j *JobsConsumer) redialer() { retryErr := backoff.Retry(op, expb) if retryErr != nil { + j.Unlock() j.logger.Error("backoff failed", "error", retryErr) return } + + j.Unlock() } } }() diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 5cf4c633..030dcae8 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -98,6 +98,11 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } +// Consume is no-op for the ephemeral +func (j *JobBroker) Consume(_ *pipeline.Pipeline) error { + return nil +} + func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { |