Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--  plugins/jobs/drivers/amqp/config.go         |  58
-rw-r--r--  plugins/jobs/drivers/amqp/consumer.go       | 416
-rw-r--r--  plugins/jobs/drivers/amqp/item.go           | 187
-rw-r--r--  plugins/jobs/drivers/amqp/listener.go       |  25
-rw-r--r--  plugins/jobs/drivers/amqp/plugin.go         |  40
-rw-r--r--  plugins/jobs/drivers/amqp/rabbit_init.go    |  65
-rw-r--r--  plugins/jobs/drivers/amqp/redial.go         | 126
-rw-r--r--  plugins/jobs/drivers/ephemeral/consumer.go  | 204
-rw-r--r--  plugins/jobs/drivers/ephemeral/item.go      | 112
-rw-r--r--  plugins/jobs/drivers/ephemeral/plugin.go    |  41
-rw-r--r--  plugins/jobs/drivers/sqs/config.go          | 103
-rw-r--r--  plugins/jobs/drivers/sqs/consumer.go        | 190
-rw-r--r--  plugins/jobs/drivers/sqs/item.go            | 192
-rw-r--r--  plugins/jobs/drivers/sqs/listener.go        |  66
-rw-r--r--  plugins/jobs/drivers/sqs/plugin.go          |  39
15 files changed, 1864 insertions(+), 0 deletions(-)
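Before the code: a minimal, self-contained sketch (not part of this commit) of how the pipeline section parsed by amqp/config.go below decodes and falls back to its defaults. The struct and InitDefault are copied from the diff; the raw map, the key values in it, and the use of github.com/mitchellh/mapstructure as a stand-in for the config plugin's UnmarshalKey are assumptions for illustration.

    package main

    import (
    	"fmt"

    	"github.com/mitchellh/mapstructure"
    )

    // Config and InitDefault are copied from amqp/config.go in this diff.
    type Config struct {
    	PrefetchCount int    `mapstructure:"pipeline_size"`
    	Queue         string `mapstructure:"queue"`
    	Priority      int64  `mapstructure:"priority"`
    	Exchange      string `mapstructure:"exchange"`
    	ExchangeType  string `mapstructure:"exchange_type"`
    	RoutingKey    string `mapstructure:"routing_key"`
    	Exclusive     bool   `mapstructure:"exclusive"`
    }

    func (c *Config) InitDefault() {
    	if c.ExchangeType == "" {
    		c.ExchangeType = "direct"
    	}
    	if c.Exchange == "" {
    		c.Exchange = "default"
    	}
    	if c.PrefetchCount == 0 {
    		c.PrefetchCount = 100
    	}
    	if c.Priority == 0 {
    		c.Priority = 10
    	}
    }

    func main() {
    	// raw stands in for the pipeline section that cfg.UnmarshalKey would
    	// pass in (hypothetical values, not taken from the commit).
    	raw := map[string]interface{}{
    		"queue":       "emails",
    		"routing_key": "emails",
    	}

    	var c Config
    	if err := mapstructure.Decode(raw, &c); err != nil {
    		panic(err)
    	}
    	c.InitDefault()

    	// Unset fields fall back to the defaults above:
    	// {PrefetchCount:100 Queue:emails Priority:10 Exchange:default ExchangeType:direct RoutingKey:emails Exclusive:false}
    	fmt.Printf("%+v\n", c)
    }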
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go new file mode 100644 index 00000000..7befb3c8 --- /dev/null +++ b/plugins/jobs/drivers/amqp/config.go @@ -0,0 +1,58 @@ +package amqp + +// pipeline rabbitmq info +const ( + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + + dlx string = "x-dead-letter-exchange" + dlxRoutingKey string = "x-dead-letter-routing-key" + dlxTTL string = "x-message-ttl" + dlxExpires string = "x-expires" + + contentType string = "application/octet-stream" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` +} + +// Config is used to parse pipeline configuration +type Config struct { + PrefetchCount int `mapstructure:"pipeline_size"` + Queue string `mapstructure:"queue"` + Priority int64 `mapstructure:"priority"` + Exchange string `mapstructure:"exchange"` + ExchangeType string `mapstructure:"exchange_type"` + RoutingKey string `mapstructure:"routing_key"` + Exclusive bool `mapstructure:"exclusive"` +} + +func (c *Config) InitDefault() { + if c.ExchangeType == "" { + c.ExchangeType = "direct" + } + + if c.Exchange == "" { + c.Exchange = "default" + } + + if c.PrefetchCount == 0 { + c.PrefetchCount = 100 + } + + if c.Priority == 0 { + c.Priority = 10 + } +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "amqp://guest:guest@localhost:5672/" + } +} diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go new file mode 100644 index 00000000..31999e23 --- /dev/null +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -0,0 +1,416 @@ +package amqp + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/streadway/amqp" +) + +type JobsConsumer struct { + sync.Mutex + log logger.Logger + pq priorityqueue.Queue + eh events.Handler + + pipeline atomic.Value + + // amqp connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan *amqp.Channel + + retryTimeout time.Duration + prefetchCount int + priority int64 + exchangeName string + queue string + exclusive bool + consumeID string + connStr string + exchangeType string + routingKey string + + delayCache map[string]struct{} + + stopCh chan struct{} +} + +// NewAMQPConsumer initializes rabbitmq pipeline +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer") + // we need to obtain two parts of the amqp information here. 
+ // first part - the address to connect to, located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeCfg.RoutingKey + jb.queue = pipeCfg.Queue + jb.exchangeType = pipeCfg.ExchangeType + jb.exchangeName = pipeCfg.Exchange + jb.prefetchCount = pipeCfg.PrefetchCount + jb.exclusive = pipeCfg.Exclusive + jb.priority = pipeCfg.Priority + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // run redialer for the connection + jb.redialer() + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer_from_pipeline") + // we need to obtain two parts of the amqp information here.
+ // first part - the address to connect to, located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeline.String(routingKey, "") + jb.queue = pipeline.String(queue, "default") + jb.exchangeType = pipeline.String(exchangeType, "direct") + jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") + jb.prefetchCount = pipeline.Int(prefetch, 10) + jb.priority = int64(pipeline.Int(priority, 10)) + jb.exclusive = pipeline.Bool(exclusive, true) + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // register the pipeline + // error here is always nil + _ = jb.Register(pipeline) + + // run redialer for the connection + jb.redialer() + + return jb, nil +} + +func (j *JobsConsumer) Push(job *job.Job) error { + const op = errors.Op("rabbitmq_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != job.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) + } + + // lock needed here to protect against a concurrent redial operation + // we may be in the redial state here + j.Lock() + defer j.Unlock() + + // convert + msg := fromJob(job) + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } + + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + } + + _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + + if err != nil { + return errors.E(op, err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + j.delayCache[tmpQ] = struct{}{} + + return nil + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { + j.pipeline.Store(pipeline) + return nil +} + +func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // run listener + j.listener(deliv) + + return nil +} + +func (j *JobsConsumer) Pause(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + err := j.consumeChan.Cancel(j.consumeID, true) + if err != nil { + j.log.Error("cancel publish channel, forcing close", "error", err) + errCl := j.consumeChan.Close() + if errCl != nil { + j.log.Error("force close failed", "error", err) + } + } +} + +func (j *JobsConsumer) Resume(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.log.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.log.Error("qos set failed", "error", err) + return + } + + // 
start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.log.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) +} + +func (j *JobsConsumer) Stop() error { + j.stopCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) + return nil +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go new file mode 100644 index 00000000..7c300c88 --- /dev/null +++ b/plugins/jobs/drivers/amqp/item.go @@ -0,0 +1,187 @@ +package amqp + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" + "github.com/streadway/amqp" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` + + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + AckFunc func(multiply bool) error + + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + NackFunc func(multiply bool, requeue bool) error +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int64 `json:"timeout,omitempty"` +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. 
+func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() int64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +// Not used in the amqp, amqp.Table used instead +func (j *Item) Context() ([]byte, error) { + return nil, nil +} + +func (j *Item) Ack() error { + return j.AckFunc(false) +} + +func (j *Item) Nack() error { + return j.NackFunc(false, false) +} + +func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + item, err := j.unpack(d) + if err != nil { + return nil, errors.E(op, err) + } + return &Item{ + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + AckFunc: d.Ack, + NackFunc: d.Nack, + }, nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} + +// pack job metadata into headers +func pack(id string, j *Item) (amqp.Table, error) { + headers, err := json.Marshal(j.Headers) + if err != nil { + return nil, err + } + return amqp.Table{ + job.RRID: id, + job.RRJob: j.Job, + job.RRPipeline: j.Options.Pipeline, + job.RRHeaders: headers, + job.RRTimeout: j.Options.Timeout, + job.RRDelay: j.Options.Delay, + job.RRPriority: j.Options.Priority, + }, nil +} + +// unpack restores jobs.Options +func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + + if _, ok := d.Headers[job.RRID].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) + } + + item.Ident = d.Headers[job.RRID].(string) + + if _, ok := d.Headers[job.RRJob].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob)) + } + + item.Job = d.Headers[job.RRJob].(string) + + if _, ok := d.Headers[job.RRPipeline].(string); ok { + item.Options.Pipeline = d.Headers[job.RRPipeline].(string) + } + + if h, ok := d.Headers[job.RRHeaders].([]byte); ok { + err := json.Unmarshal(h, &item.Headers) + if err != nil { + return nil, err + } + } + + if _, ok := d.Headers[job.RRTimeout].(int64); ok { + item.Options.Timeout = d.Headers[job.RRTimeout].(int64) + } + + if _, ok := d.Headers[job.RRDelay].(int64); ok { + item.Options.Delay = d.Headers[job.RRDelay].(int64) + } + + if _, ok := d.Headers[job.RRPriority]; !ok { + // set pipe's priority + item.Options.Priority = j.priority + } else { + item.Options.Priority = d.Headers[job.RRPriority].(int64) + } + + return item, nil +} diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go new file mode 100644 index 00000000..7241c717 --- /dev/null +++ b/plugins/jobs/drivers/amqp/listener.go @@ -0,0 +1,25 @@ +package amqp + +import "github.com/streadway/amqp" + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := j.fromDelivery(msg) + if err != nil { + 
j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go new file mode 100644 index 00000000..624f4405 --- /dev/null +++ b/plugins/jobs/drivers/amqp/plugin.go @@ -0,0 +1,40 @@ +package amqp + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pluginName string = "amqp" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, e, pq) +} diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go new file mode 100644 index 00000000..d6b8a708 --- /dev/null +++ b/plugins/jobs/drivers/amqp/rabbit_init.go @@ -0,0 +1,65 @@ +package amqp + +import ( + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" +) + +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("jobs_plugin_rmq_init") + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. 
+ channel, err := j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + j.exchangeName, + j.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + j.queue, + false, + false, + j.exclusive, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + j.routingKey, + j.exchangeName, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + j.eh.Push(events.JobEvent{ + Event: events.EventInitialized, + Driver: "amqp", + Start: time.Now(), + }) + return channel.Close() +} diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go new file mode 100644 index 00000000..0b52a4d1 --- /dev/null +++ b/plugins/jobs/drivers/amqp/redial.go @@ -0,0 +1,126 @@ +package amqp + +import ( + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/streadway/amqp" +) + +// redialer reconnects to RabbitMQ when the connection is interrupted +func (j *JobsConsumer) redialer() { //nolint:gocognit + go func() { + const op = errors.Op("rabbitmq_redial") + + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + + t := time.Now() + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeError, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Error: err, + Start: time.Now(), + }) + + j.log.Error("connection closed, reconnecting", "error", err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = j.retryTimeout + op := func() error { + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + return errors.E(op, dialErr) + } + + j.log.Info("rabbitmq dial succeeded.
trying to redeclare queues and subscribers") + + // re-init connection + errInit := j.initRabbitMQ() + if errInit != nil { + j.log.Error("rabbitmq dial", "error", errInit) + return errInit + } + + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + var errPubCh error + j.publishChan, errPubCh = j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // restart listener + j.listener(deliv) + + j.log.Info("queues and subscribers redeclared successfully") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.Unlock() + j.log.Error("backoff failed", "error", retryErr) + return + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + + j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } + } + } + }() +} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go new file mode 100644 index 00000000..45ee8083 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -0,0 +1,204 @@ +package ephemeral + +import ( + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pipelineSize string = "pipeline_size" +) + +type Config struct { + PipelineSize uint64 `mapstructure:"pipeline_size"` +} + +type JobBroker struct { + cfg *Config + log logger.Logger + eh events.Handler + pipeline sync.Map + pq priorityqueue.Queue + localQueue chan *Item + + stopCh chan struct{} +} + +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { + const op = errors.Op("new_ephemeral_pipeline") + + jb := &JobBroker{ + log: log, + pq: pq, + eh: eh, + stopCh: make(chan struct{}, 1), + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.PipelineSize == 0 { + jb.cfg.PipelineSize = 100_000 + } + + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { + jb := &JobBroker{ + log: log, + pq: pq, + eh: eh, + stopCh: make(chan struct{}, 1), + } + + jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000)) + + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + + return jb, nil +} + +func (j *JobBroker) 
Push(jb *job.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok { + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) + } + + msg := fromJob(jb) + // handle timeouts + if msg.Options.Timeout > 0 { + go func(jj *job.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // send the item after timeout expired + j.localQueue <- msg + }(jb) + + return nil + } + + // insert to the local, limited pipeline + j.localQueue <- msg + + return nil + } + + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) +} + +func (j *JobBroker) consume() { + // redirect + for { + select { + case item := <-j.localQueue: + j.pq.Insert(item) + case <-j.stopCh: + return + } + } +} + +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { + const op = errors.Op("ephemeral_register") + if _, ok := j.pipeline.Load(pipeline.Name()); ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) + } + + j.pipeline.Store(pipeline.Name(), true) + + return nil +} + +func (j *JobBroker) Pause(pipeline string) { + if q, ok := j.pipeline.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.pipeline.Store(pipeline, false) + } + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.pipeline.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.pipeline.Store(pipeline, true) + } + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) +} + +func (j *JobBroker) List() []string { + out := make([]string, 0, 2) + + j.pipeline.Range(func(key, value interface{}) bool { + pipe := key.(string) + out = append(out, pipe) + return true + }) + + return out +} + +// Run is no-op for the ephemeral +func (j *JobBroker) Run(_ *pipeline.Pipeline) error { + return nil +} + +func (j *JobBroker) Stop() error { + var pipe string + j.pipeline.Range(func(key, _ interface{}) bool { + pipe = key.(string) + j.pipeline.Delete(key) + return true + }) + + // return from the consumer + j.stopCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + + return nil +} diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go new file mode 100644 index 00000000..442533c5 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -0,0 +1,112 @@ +package ephemeral + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. 
+ Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int64 `json:"timeout,omitempty"` +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() int64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (j *Item) Ack() error { + // noop for the in-memory + return nil +} + +func (j *Item) Nack() error { + // noop for the in-memory + return nil +} diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go new file mode 100644 index 00000000..28495abb --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/plugin.go @@ -0,0 +1,41 @@ +package ephemeral + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "ephemeral" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipeline, p.log, e, pq) +} diff --git 
a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go new file mode 100644 index 00000000..0b4e8157 --- /dev/null +++ b/plugins/jobs/drivers/sqs/config.go @@ -0,0 +1,103 @@ +package sqs + +type GlobalCfg struct { + Key string `mapstructure:"key"` + Secret string `mapstructure:"secret"` + Region string `mapstructure:"region"` + SessionToken string `mapstructure:"session_token"` + Endpoint string `mapstructure:"endpoint"` +} + +// Config is used to parse pipeline configuration +type Config struct { + // The duration (in seconds) that the received messages are hidden from subsequent + // retrieve requests after being retrieved by a ReceiveMessage request. + VisibilityTimeout int32 `mapstructure:"visibility_timeout"` + // The duration (in seconds) for which the call waits for a message to arrive + // in the queue before returning. If a message is available, the call returns + // sooner than WaitTimeSeconds. If no messages are available and the wait time + // expires, the call returns successfully with an empty list of messages. + WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"` + // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages + // than this value (however, fewer messages might be returned). Valid values: 1 to + // 10. Default: 1. + PrefetchCount int32 `mapstructure:"pipeline_size"` + // The name of the new queue. The following limits apply to this name: + // + // * A queue + // name can have up to 80 characters. + // + // * Valid values: alphanumeric characters, + // hyphens (-), and underscores (_). + // + // * A FIFO queue name must end with the .fifo + // suffix. + // + // Queue URLs and names are case-sensitive. + // + // This member is required. + Queue string `mapstructure:"queue"` + + // A map of attributes with their corresponding values. The following lists the + // names, descriptions, and values of the special request parameters that the + // CreateQueue action uses. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html + Attributes map[string]string `mapstructure:"attributes"` + + // From amazon docs: + // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see + // Tagging Your Amazon SQS Queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html) + // in the Amazon SQS Developer Guide. When you use queue tags, keep the following + // guidelines in mind: + // + // * Adding more than 50 tags to a queue isn't recommended. + // + // * + // Tags don't have any semantic meaning. Amazon SQS interprets tags as character + // strings. + // + // * Tags are case-sensitive. + // + // * A new tag with a key identical to that + // of an existing tag overwrites the existing tag. + // + // For a full list of tag + // restrictions, see Quotas related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues) + // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you + // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account + // permissions don't apply to this action. For more information, see Grant + // cross-account permissions to a role and a user name + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name) + // in the Amazon SQS Developer Guide. 
+ Tags map[string]string `mapstructure:"tags"` +} + +func (c *GlobalCfg) InitDefault() { + if c.Endpoint == "" { + c.Endpoint = "http://localhost:9324" + } +} + +func (c *Config) InitDefault() { + if c.Queue == "" { + c.Queue = "default" + } + + if c.PrefetchCount == 0 || c.PrefetchCount > 10 { + c.PrefetchCount = 10 + } + + if c.WaitTimeSeconds == 0 { + c.WaitTimeSeconds = 5 + } + + if c.Attributes == nil { + c.Attributes = make(map[string]string) + } + + if c.Tags == nil { + c.Tags = make(map[string]string) + } +} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go new file mode 100644 index 00000000..c0f66589 --- /dev/null +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -0,0 +1,190 @@ +package sqs + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/google/uuid" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type JobConsumer struct { + sync.Mutex + pq priorityqueue.Queue + log logger.Logger + eh events.Handler + pipeline atomic.Value + + // connection info + key string + secret string + sessionToken string + region string + endpoint string + queue string + messageGroupID string + waitTime int32 + prefetch int32 + visibilityTimeout int32 + + // queue optional parameters + attributes map[string]string + tags map[string]string + + client *sqs.Client + outputQ *sqs.CreateQueueOutput + + pauseCh chan struct{} +} + +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + const op = errors.Op("new_sqs_consumer") + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) + } + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // initialize job consumer + jb := &JobConsumer{ + pq: pq, + log: log, + eh: e, + messageGroupID: uuid.NewString(), + attributes: pipeCfg.Attributes, + tags: pipeCfg.Tags, + queue: pipeCfg.Queue, + prefetch: pipeCfg.PrefetchCount, + visibilityTimeout: pipeCfg.VisibilityTimeout, + waitTime: pipeCfg.WaitTimeSeconds, + region: globalCfg.Region, + key: globalCfg.Key, + sessionToken: globalCfg.SessionToken, + secret: globalCfg.Secret, + endpoint: globalCfg.Endpoint, + } + + // PARSE CONFIGURATION ------- + + awsConf, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(globalCfg.Region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) + if err != nil { + return nil, 
errors.E(op, err) + } + + // config with retries + jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { + o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { + opts.MaxAttempts = 60 + }) + }) + + jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + if err != nil { + return nil, errors.E(op, err) + } + + return jb, nil +} + +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + return &JobConsumer{}, nil +} + +func (j *JobConsumer) Push(jb *job.Job) error { + const op = errors.Op("sqs_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != jb.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) + } + + // The length of time, in seconds, for which to delay a specific message. Valid + // values: 0 to 900. Maximum: 15 minutes. + if jb.Options.Delay > 900 { + return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) + } + + msg := fromJob(jb) + + // The new value for the message's visibility timeout (in seconds). Values range: 0 + // to 43200. Maximum: 12 hours. + _, err := j.client.SendMessage(context.Background(), j.pack(msg)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { + j.pipeline.Store(pipeline) + return nil +} + +func (j *JobConsumer) Run(p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + j.Lock() + defer j.Unlock() + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // start listener + go j.listen() + + return nil +} + +func (j *JobConsumer) Stop() error { + panic("implement me") +} + +func (j *JobConsumer) Pause(pipeline string) { + panic("implement me") +} + +func (j *JobConsumer) Resume(pipeline string) { + panic("implement me") +} diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go new file mode 100644 index 00000000..ef736be9 --- /dev/null +++ b/plugins/jobs/drivers/sqs/item.go @@ -0,0 +1,192 @@ +package sqs + +import ( + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + StringType string = "String" + NumberType string = "Number" + ApproximateReceiveCount string = "ApproximateReceiveCount" +) + +var attributes = []string{ + job.RRJob, + job.RRDelay, + job.RRTimeout, + job.RRPriority, + job.RRMaxAttempts, +} + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. 
+ Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int64 `json:"timeout,omitempty"` + + // Maximum number of attempts to receive and process the message + MaxAttempts int64 `json:"max_attempts,omitempty"` +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int64) bool { + // Attempts 1 and 0 has identical effect + return o.MaxAttempts > (attempt + 1) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() int64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. 
+// Not used in the sqs, MessageAttributes used instead +func (j *Item) Context() ([]byte, error) { + return nil, nil +} + +func (j *Item) Ack() error { + return nil +} + +func (j *Item) Nack() error { + return nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + MaxAttempts: job.Options.Attempts, + }, + } +} + +func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput { + return &sqs.SendMessageInput{ + MessageBody: aws.String(item.Payload), + QueueUrl: j.outputQ.QueueUrl, + DelaySeconds: int32(item.Options.Delay), + MessageAttributes: map[string]types.MessageAttributeValue{ + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))}, + job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))}, + }, + } +} + +func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) { + const op = errors.Op("sqs_unpack") + // reserved + if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { + return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) + } + + for i := 0; i < len(attributes); i++ { + if _, ok := msg.MessageAttributes[attributes[i]]; !ok { + return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i])) + } + } + + attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue) + if err != nil { + return nil, 0, errors.E(op, err) + } + + delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) + if err != nil { + return nil, 0, errors.E(op, err) + } + + to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue) + if err != nil { + return nil, 0, errors.E(op, err) + } + + priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) + if err != nil { + return nil, 0, errors.E(op, err) + } + + recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount]) + if err != nil { + return nil, 0, errors.E(op, err) + } + + item := &Item{ + Job: *msg.MessageAttributes[job.RRJob].StringValue, + Payload: *msg.Body, + Options: &Options{ + Delay: int64(delay), + Timeout: int64(to), + Priority: int64(priority), + MaxAttempts: int64(attempt), + }, + } + + return item, recCount, nil +} diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go new file mode 100644 index 00000000..a10ce5a6 --- /dev/null +++ b/plugins/jobs/drivers/sqs/listener.go @@ -0,0 +1,66 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +const ( + All string = "All" +) + +func (j 
*JobConsumer) listen() { + for { + select { + case <-j.pauseCh: + return + default: + message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ + QueueUrl: j.outputQ.QueueUrl, + MaxNumberOfMessages: j.prefetch, + AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, + MessageAttributeNames: []string{All}, + VisibilityTimeout: j.visibilityTimeout, + WaitTimeSeconds: j.waitTime, + }) + if err != nil { + j.log.Error("receive message", "error", err) + continue + } + + for i := 0; i < len(message.Messages); i++ { + m := message.Messages[i] + item, attempt, err := j.unpack(&m) + if err != nil { + _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: j.outputQ.QueueUrl, + ReceiptHandle: m.ReceiptHandle, + }) + if errD != nil { + j.log.Error("message unpack, failed to delete the message from the queue", "error", errD) + continue + } + + j.log.Error("message unpack", "error", err) + continue + } + + if item.Options.CanRetry(int64(attempt)) { + j.pq.Insert(item) + continue + } + + _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: j.outputQ.QueueUrl, + ReceiptHandle: m.ReceiptHandle, + }) + if errD != nil { + j.log.Error("retries exhausted, failed to delete the message from the queue", "error", errD) + continue + } + } + } + } +} diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go new file mode 100644 index 00000000..54f61ff5 --- /dev/null +++ b/plugins/jobs/drivers/sqs/plugin.go @@ -0,0 +1,39 @@ +package sqs + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pluginName string = "sqs" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewSQSConsumer(configKey, p.log, p.cfg, e, pq) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, e, pq) +}
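A note on the delayed-delivery path added in amqp/consumer.go above: Push derives a per-delay queue name, declares it with dead-letter arguments pointing back at the real exchange, and caches the name so repeat pushes with the same delay skip the declare/bind round-trip (the in-code comment credits this with raising throughput from roughly 2.5k to 30k RPS). The standalone sketch below reproduces just the naming and argument-table logic from the diff; the helper names delayedQueueName and delayArgs are hypothetical, and plain maps stand in for amqp.Table.

    package main

    import (
    	"fmt"
    	"time"
    )

    // Constants copied from amqp/config.go in this diff.
    const (
    	dlx           = "x-dead-letter-exchange"
    	dlxRoutingKey = "x-dead-letter-routing-key"
    	dlxTTL        = "x-message-ttl"
    	dlxExpires    = "x-expires"
    )

    // delayedQueueName mirrors the fmt.Sprintf in Push: the delay in
    // milliseconds plus the exchange and base queue uniquely identify the
    // temporary queue, which is what makes the delayCache lookup safe.
    func delayedQueueName(delay time.Duration, exchange, queue string) string {
    	delayMs := int64(delay.Seconds() * 1000)
    	return fmt.Sprintf("delayed-%d.%s.%s", delayMs, exchange, queue)
    }

    // delayArgs mirrors the amqp.Table passed to QueueDeclare: expired
    // messages dead-letter back to the real exchange and routing key, and
    // the temporary queue is garbage-collected after twice the TTL.
    func delayArgs(delay time.Duration, exchange, routing string) map[string]interface{} {
    	delayMs := int64(delay.Seconds() * 1000)
    	return map[string]interface{}{
    		dlx:           exchange,
    		dlxRoutingKey: routing,
    		dlxTTL:        delayMs,
    		dlxExpires:    delayMs * 2,
    	}
    }

    func main() {
    	fmt.Println(delayedQueueName(10*time.Second, "amqp.default", "default"))
    	// delayed-10000.amqp.default.default
    	fmt.Println(delayArgs(10*time.Second, "amqp.default", ""))
    }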
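Similarly for the retry cutoff in sqs/item.go and sqs/listener.go above: listen() re-inserts a message into the priority queue only while CanRetry holds for its ApproximateReceiveCount, otherwise it deletes the message from the queue. A small sketch of the cutoff (Options and CanRetry are copied from the diff; the loop and receive-count values are illustrative, assuming the count starts at 1 on the first receive):

    package main

    import "fmt"

    // Options and CanRetry are copied from sqs/item.go in this diff.
    type Options struct {
    	MaxAttempts int64 `json:"max_attempts,omitempty"`
    }

    func (o *Options) CanRetry(attempt int64) bool {
    	// MaxAttempts values of 1 and 0 have an identical effect: never retry.
    	return o.MaxAttempts > (attempt + 1)
    }

    func main() {
    	o := &Options{MaxAttempts: 3}
    	// attempt is the ApproximateReceiveCount, which starts at 1 on the
    	// first receive, so MaxAttempts: 3 re-queues the message only once.
    	for attempt := int64(1); attempt <= 4; attempt++ {
    		fmt.Printf("receive #%d -> requeue: %v\n", attempt, o.CanRetry(attempt))
    	}
    }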