diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 282 |
1 files changed, 258 insertions, 24 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index f91b71e7..9ac47269 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1,9 +1,11 @@ package amqp import ( + "fmt" "sync" "time" + "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" @@ -14,9 +16,32 @@ import ( "github.com/streadway/amqp" ) +// pipeline rabbitmq info +const ( + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + + dlx string = "x-dead-letter-exchange" + dlxRoutingKey string = "x-dead-letter-routing-key" + dlxTTL string = "x-message-ttl" + dlxExpires string = "x-expires" + + contentType string = "application/octet-stream" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` +} + +// Config is used to parse pipeline configuration type Config struct { - Addr string - Queue string + PrefetchCount int `mapstructure:"pipeline_size"` + Queue string `mapstructure:"queue"` + Exchange string `mapstructure:"exchange"` + ExchangeType string `mapstructure:"exchange_type"` + RoutingKey string `mapstructure:"routing_key"` } type JobsConsumer struct { @@ -27,35 +52,91 @@ type JobsConsumer struct { pipelines sync.Map // amqp connection - conn *amqp.Connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan *amqp.Channel + retryTimeout time.Duration prefetchCount int exchangeName string + queue string + consumeID string connStr string exchangeType string routingKey string + // TODO send data to channel stop chan struct{} } func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. - // firs part - address to connect, it is located in the global section under the amqp name + // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - logger: log, - pq: pq, + logger: log, + pq: pq, + consumeID: uuid.NewString(), + stop: make(chan struct{}), + retryTimeout: time.Minute * 5, + } + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - d, err := jb.initRabbitMQ() + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(configKey, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeCfg.RoutingKey + jb.queue = pipeCfg.Queue + jb.exchangeType = pipeCfg.ExchangeType + jb.exchangeName = pipeCfg.Exchange + jb.prefetchCount = pipeCfg.PrefetchCount + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // assign address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() if err != nil { return nil, err } - // run listener - jb.listener(d) + jb.publishChan, err = jb.conn.Channel() + if err != nil { + panic(err) + } - // run redialer + // run redialer for the connection jb.redialer() return jb, nil @@ -63,27 +144,57 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") + j.RLock() + defer j.RUnlock() // check if the pipeline registered - if b, ok := j.pipelines.Load(job.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) - } - + if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { // handle timeouts - if job.Options.Timeout > 0 { - go func(jj *structs.Job) { - time.Sleep(jj.Options.TimeoutDuration()) + if job.Options.DelayDuration() > 0 { + // pub + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - // TODO push + _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: 100, + dlxExpires: 200, + }) - // send the item after timeout expired - }(job) + if err != nil { + panic(err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + panic(err) + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } // insert to the local, limited pipeline + err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + //Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } @@ -92,7 +203,52 @@ func (j *JobsConsumer) Push(job *structs.Job) error { } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { - panic("implement me") + const op = errors.Op("rabbitmq_register") + if _, ok := j.pipelines.Load(pipeline.Name()); ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) + } + + j.pipelines.Store(pipeline.Name(), true) + + return nil +} + +func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + if _, ok := j.pipelines.Load(pipeline.Name()); !ok { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) + } + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // run listener + j.listener(deliv) + + return nil } func (j *JobsConsumer) List() []*pipeline.Pipeline { @@ -100,9 +256,87 @@ func (j *JobsConsumer) List() []*pipeline.Pipeline { } func (j *JobsConsumer) Pause(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.pipelines.Store(pipeline, false) + } + } + + err := j.publishChan.Cancel(j.consumeID, true) + if err != nil { + j.logger.Error("cancel publish channel, forcing close", "error", err) + errCl := j.publishChan.Close() + if errCl != nil { + j.logger.Error("force close failed", "error", err) + } + } } func (j *JobsConsumer) Resume(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.pipelines.Store(pipeline, true) + } + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.logger.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.logger.Error("qos set failed", "error", err) + return + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.logger.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) + } +} + +// Declare used to dynamically declare a pipeline +func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error { + pipeline.String(exchangeKey, "") + pipeline.String(queue, "") + pipeline.String(routingKey, "") + pipeline.String(exchangeType, "direct") + return nil +} + +func (c *Config) InitDefault() { + if c.ExchangeType == "" { + c.ExchangeType = "direct" + } + + if c.Exchange == "" { + c.Exchange = "default" + } + + if c.PrefetchCount == 0 { + c.PrefetchCount = 100 + } +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "amqp://guest:guest@localhost:5672/" + } } |