summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/drivers/amqp/config.go67
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go508
-rw-r--r--plugins/jobs/drivers/amqp/item.go239
-rw-r--r--plugins/jobs/drivers/amqp/listener.go25
-rw-r--r--plugins/jobs/drivers/amqp/plugin.go40
-rw-r--r--plugins/jobs/drivers/amqp/rabbit_init.go57
-rw-r--r--plugins/jobs/drivers/amqp/redial.go141
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go53
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go223
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go360
-rw-r--r--plugins/jobs/drivers/beanstalk/encode_test.go75
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go147
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go39
-rw-r--r--plugins/jobs/drivers/beanstalk/plugin.go47
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go274
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go133
-rw-r--r--plugins/jobs/drivers/ephemeral/plugin.go41
-rw-r--r--plugins/jobs/drivers/sqs/config.go114
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go411
-rw-r--r--plugins/jobs/drivers/sqs/item.go247
-rw-r--r--plugins/jobs/drivers/sqs/listener.go87
-rw-r--r--plugins/jobs/drivers/sqs/plugin.go39
-rw-r--r--plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go)33
-rw-r--r--plugins/jobs/job/job_options.go32
-rw-r--r--plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go)0
-rw-r--r--plugins/jobs/plugin.go18
26 files changed, 50 insertions, 3400 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
deleted file mode 100644
index 1ec089f1..00000000
--- a/plugins/jobs/drivers/amqp/config.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package amqp
-
-// pipeline rabbitmq info
-const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange_type"
- queue string = "queue"
- routingKey string = "routing_key"
- prefetch string = "prefetch"
- exclusive string = "exclusive"
- priority string = "priority"
- multipleAsk string = "multiple_ask"
- requeueOnFail string = "requeue_on_fail"
-
- dlx string = "x-dead-letter-exchange"
- dlxRoutingKey string = "x-dead-letter-routing-key"
- dlxTTL string = "x-message-ttl"
- dlxExpires string = "x-expires"
-
- contentType string = "application/octet-stream"
-)
-
-type GlobalCfg struct {
- Addr string `mapstructure:"addr"`
-}
-
-// Config is used to parse pipeline configuration
-type Config struct {
- Prefetch int `mapstructure:"prefetch"`
- Queue string `mapstructure:"queue"`
- Priority int64 `mapstructure:"priority"`
- Exchange string `mapstructure:"exchange"`
- ExchangeType string `mapstructure:"exchange_type"`
- RoutingKey string `mapstructure:"routing_key"`
- Exclusive bool `mapstructure:"exclusive"`
- MultipleAck bool `mapstructure:"multiple_ask"`
- RequeueOnFail bool `mapstructure:"requeue_on_fail"`
-}
-
-func (c *Config) InitDefault() {
- // all options should be in sync with the pipeline defaults in the FromPipeline method
- if c.ExchangeType == "" {
- c.ExchangeType = "direct"
- }
-
- if c.Exchange == "" {
- c.Exchange = "amqp.default"
- }
-
- if c.Queue == "" {
- c.Queue = "default"
- }
-
- if c.Prefetch == 0 {
- c.Prefetch = 10
- }
-
- if c.Priority == 0 {
- c.Priority = 10
- }
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Addr == "" {
- c.Addr = "amqp://guest:[email protected]:5672/"
- }
-}
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
deleted file mode 100644
index 95df02ec..00000000
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ /dev/null
@@ -1,508 +0,0 @@
-package amqp
-
-import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/google/uuid"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type JobConsumer struct {
- sync.Mutex
- log logger.Logger
- pq priorityqueue.Queue
- eh events.Handler
-
- pipeline atomic.Value
-
- // amqp connection
- conn *amqp.Connection
- consumeChan *amqp.Channel
- publishChan chan *amqp.Channel
- consumeID string
- connStr string
-
- retryTimeout time.Duration
- //
- // prefetch QoS AMQP
- //
- prefetch int
- //
- // pipeline's priority
- //
- priority int64
- exchangeName string
- queue string
- exclusive bool
- exchangeType string
- routingKey string
- multipleAck bool
- requeueOnFail bool
-
- listeners uint32
- delayed *int64
- stopCh chan struct{}
-}
-
-// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_amqp_consumer")
- // we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp pluginName
- // second part - queues and other pipeline information
- // if no such key - error
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
- }
-
- // PARSE CONFIGURATION START -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
- // PARSE CONFIGURATION END -------
-
- jb := &JobConsumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- priority: pipeCfg.Priority,
- delayed: utils.Int64(0),
-
- publishChan: make(chan *amqp.Channel, 1),
- routingKey: pipeCfg.RoutingKey,
- queue: pipeCfg.Queue,
- exchangeType: pipeCfg.ExchangeType,
- exchangeName: pipeCfg.Exchange,
- prefetch: pipeCfg.Prefetch,
- exclusive: pipeCfg.Exclusive,
- multipleAck: pipeCfg.MultipleAck,
- requeueOnFail: pipeCfg.RequeueOnFail,
- }
-
- jb.conn, err = amqp.Dial(globalCfg.Addr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save address
- jb.connStr = globalCfg.Addr
-
- err = jb.initRabbitMQ()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pch, err := jb.conn.Channel()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- jb.publishChan <- pch
-
- // run redialer and requeue listener for the connection
- jb.redialer()
-
- return jb, nil
-}
-
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_amqp_consumer_from_pipeline")
- // we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp pluginName
- // second part - queues and other pipeline information
-
- // only global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
- }
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- jb := &JobConsumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayed: utils.Int64(0),
-
- publishChan: make(chan *amqp.Channel, 1),
- routingKey: pipeline.String(routingKey, ""),
- queue: pipeline.String(queue, "default"),
- exchangeType: pipeline.String(exchangeType, "direct"),
- exchangeName: pipeline.String(exchangeKey, "amqp.default"),
- prefetch: pipeline.Int(prefetch, 10),
- priority: int64(pipeline.Int(priority, 10)),
- exclusive: pipeline.Bool(exclusive, false),
- multipleAck: pipeline.Bool(multipleAsk, false),
- requeueOnFail: pipeline.Bool(requeueOnFail, false),
- }
-
- jb.conn, err = amqp.Dial(globalCfg.Addr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save address
- jb.connStr = globalCfg.Addr
-
- err = jb.initRabbitMQ()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pch, err := jb.conn.Channel()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- jb.publishChan <- pch
-
- // register the pipeline
- // error here is always nil
- _ = jb.Register(context.Background(), pipeline)
-
- // run redialer for the connection
- jb.redialer()
-
- return jb, nil
-}
-
-func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
- const op = errors.Op("rabbitmq_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != job.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
- }
-
- err := j.handleItem(ctx, fromJob(job))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
- return nil
-}
-
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
- }
-
- // protect connection (redial)
- j.Lock()
- defer j.Unlock()
-
- var err error
- j.consumeChan, err = j.conn.Channel()
- if err != nil {
- return errors.E(op, err)
- }
-
- err = j.consumeChan.Qos(j.prefetch, 0, false)
- if err != nil {
- return errors.E(op, err)
- }
-
- // start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // run listener
- j.listener(deliv)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-
- return nil
-}
-
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("amqp_driver_state")
- select {
- case pch := <-j.publishChan:
- defer func() {
- j.publishChan <- pch
- }()
-
- q, err := pch.QueueInspect(j.queue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- return &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: q.Name,
- Active: int64(q.Messages),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }, nil
-
- case <-ctx.Done():
- return nil, errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- // protect connection (redial)
- j.Lock()
- defer j.Unlock()
-
- err := j.consumeChan.Cancel(j.consumeID, true)
- if err != nil {
- j.log.Error("cancel publish channel, forcing close", "error", err)
- errCl := j.consumeChan.Close()
- if errCl != nil {
- j.log.Error("force close failed", "error", err)
- return
- }
- return
- }
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
- }
-
- // protect connection (redial)
- j.Lock()
- defer j.Unlock()
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 1 {
- j.log.Warn("amqp listener already in the active state")
- return
- }
-
- var err error
- j.consumeChan, err = j.conn.Channel()
- if err != nil {
- j.log.Error("create channel on rabbitmq connection", "error", err)
- return
- }
-
- err = j.consumeChan.Qos(j.prefetch, 0, false)
- if err != nil {
- j.log.Error("qos set failed", "error", err)
- return
- }
-
- // start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- j.log.Error("consume operation failed", "error", err)
- return
- }
-
- // run listener
- j.listener(deliv)
-
- // increase number of listeners
- atomic.AddUint32(&j.listeners, 1)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *JobConsumer) Stop(context.Context) error {
- j.stopCh <- struct{}{}
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("rabbitmq_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- atomic.AddInt64(j.delayed, 1)
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
deleted file mode 100644
index 623dcca7..00000000
--- a/plugins/jobs/drivers/amqp/item.go
+++ /dev/null
@@ -1,239 +0,0 @@
-package amqp
-
-import (
- "context"
- "fmt"
- "sync/atomic"
- "time"
-
- json "github.com/json-iterator/go"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type Item struct {
- // Job contains pluginName of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // private
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
- ack func(multiply bool) error
-
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
- // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
- // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
- // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
- nack func(multiply bool, requeue bool) error
-
- // requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
- delayed *int64
- multipleAsk bool
- requeue bool
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-// Not used in the amqp, amqp.Table used instead
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- if i.Options.Delay > 0 {
- atomic.AddInt64(i.Options.delayed, ^int64(0))
- }
- return i.Options.ack(i.Options.multipleAsk)
-}
-
-func (i *Item) Nack() error {
- if i.Options.Delay > 0 {
- atomic.AddInt64(i.Options.delayed, ^int64(0))
- }
- return i.Options.nack(false, i.Options.requeue)
-}
-
-// Requeue with the provided delay, handled by the Nack
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- if i.Options.Delay > 0 {
- atomic.AddInt64(i.Options.delayed, ^int64(0))
- }
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- errNack := i.Options.nack(false, true)
- if errNack != nil {
- return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
- }
-
- return err
- }
-
- // ack the job
- err = i.Options.ack(false)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
- const op = errors.Op("from_delivery_convert")
- item, err := j.unpack(d)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- i := &Item{
- Job: item.Job,
- Ident: item.Ident,
- Payload: item.Payload,
- Headers: item.Headers,
- Options: item.Options,
- }
-
- item.Options.ack = d.Ack
- item.Options.nack = d.Nack
- item.Options.delayed = j.delayed
-
- // requeue func
- item.Options.requeueFn = j.handleItem
- return i, nil
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Headers: job.Headers,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
-
-// pack job metadata into headers
-func pack(id string, j *Item) (amqp.Table, error) {
- headers, err := json.Marshal(j.Headers)
- if err != nil {
- return nil, err
- }
- return amqp.Table{
- job.RRID: id,
- job.RRJob: j.Job,
- job.RRPipeline: j.Options.Pipeline,
- job.RRHeaders: headers,
- job.RRDelay: j.Options.Delay,
- job.RRPriority: j.Options.Priority,
- }, nil
-}
-
-// unpack restores jobs.Options
-func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
- item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
- multipleAsk: j.multipleAck,
- requeue: j.requeueOnFail,
- requeueFn: j.handleItem,
- }}
-
- if _, ok := d.Headers[job.RRID].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
- }
-
- item.Ident = d.Headers[job.RRID].(string)
-
- if _, ok := d.Headers[job.RRJob].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
- }
-
- item.Job = d.Headers[job.RRJob].(string)
-
- if _, ok := d.Headers[job.RRPipeline].(string); ok {
- item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
- }
-
- if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
- err := json.Unmarshal(h, &item.Headers)
- if err != nil {
- return nil, err
- }
- }
-
- if _, ok := d.Headers[job.RRDelay].(int64); ok {
- item.Options.Delay = d.Headers[job.RRDelay].(int64)
- }
-
- if _, ok := d.Headers[job.RRPriority]; !ok {
- // set pipe's priority
- item.Options.Priority = j.priority
- } else {
- item.Options.Priority = d.Headers[job.RRPriority].(int64)
- }
-
- return item, nil
-}
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
deleted file mode 100644
index 0b1cd2dc..00000000
--- a/plugins/jobs/drivers/amqp/listener.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package amqp
-
-import amqp "github.com/rabbitmq/amqp091-go"
-
-func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
- go func() {
- for { //nolint:gosimple
- select {
- case msg, ok := <-deliv:
- if !ok {
- j.log.Info("delivery channel closed, leaving the rabbit listener")
- return
- }
-
- d, err := j.fromDelivery(msg)
- if err != nil {
- j.log.Error("amqp delivery convert", "error", err)
- continue
- }
- // insert job into the main priority queue
- j.pq.Insert(d)
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
deleted file mode 100644
index 624f4405..00000000
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- pluginName string = "amqp"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Name() string {
- return pluginName
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
-}
-
-// FromPipeline constructs AMQP driver from pipeline
-func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
-}
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
deleted file mode 100644
index 56ef10c8..00000000
--- a/plugins/jobs/drivers/amqp/rabbit_init.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/errors"
-)
-
-func (j *JobConsumer) initRabbitMQ() error {
- const op = errors.Op("jobs_plugin_rmq_init")
- // Channel opens a unique, concurrent server channel to process the bulk of AMQP
- // messages. Any error from methods on this receiver will render the receiver
- // invalid and a new Channel should be opened.
- channel, err := j.conn.Channel()
- if err != nil {
- return errors.E(op, err)
- }
-
- // declare an exchange (idempotent operation)
- err = channel.ExchangeDeclare(
- j.exchangeName,
- j.exchangeType,
- true,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // verify or declare a queue
- q, err := channel.QueueDeclare(
- j.queue,
- false,
- false,
- j.exclusive,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // bind queue to the exchange
- err = channel.QueueBind(
- q.Name,
- j.routingKey,
- j.exchangeName,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- return channel.Close()
-}
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
deleted file mode 100644
index 8dc18b8f..00000000
--- a/plugins/jobs/drivers/amqp/redial.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package amqp
-
-import (
- "time"
-
- "github.com/cenkalti/backoff/v4"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
-)
-
-// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
- go func() {
- const op = errors.Op("rabbitmq_redial")
-
- for {
- select {
- case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
- if err == nil {
- return
- }
-
- j.Lock()
-
- // trash the broken publishing channel
- <-j.publishChan
-
- t := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeError,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Error: err,
- Start: time.Now(),
- })
-
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = j.retryTimeout
- operation := func() error {
- j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
- var dialErr error
- j.conn, dialErr = amqp.Dial(j.connStr)
- if dialErr != nil {
- return errors.E(op, dialErr)
- }
-
- j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
-
- // re-init connection
- errInit := j.initRabbitMQ()
- if errInit != nil {
- j.log.Error("rabbitmq dial", "error", errInit)
- return errInit
- }
-
- // redeclare consume channel
- var errConnCh error
- j.consumeChan, errConnCh = j.conn.Channel()
- if errConnCh != nil {
- return errors.E(op, errConnCh)
- }
-
- // redeclare publish channel
- pch, errPubCh := j.conn.Channel()
- if errPubCh != nil {
- return errors.E(op, errPubCh)
- }
-
- // start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // put the fresh publishing channel
- j.publishChan <- pch
- // restart listener
- j.listener(deliv)
-
- j.log.Info("queues and subscribers redeclared successfully")
-
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- j.Unlock()
- j.log.Error("backoff failed", "error", retryErr)
- return
- }
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Start: t,
- Elapsed: time.Since(t),
- })
-
- j.Unlock()
-
- case <-j.stopCh:
- if j.publishChan != nil {
- pch := <-j.publishChan
- err := pch.Close()
- if err != nil {
- j.log.Error("publish channel close", "error", err)
- }
- }
-
- if j.consumeChan != nil {
- err := j.consumeChan.Close()
- if err != nil {
- j.log.Error("consume channel close", "error", err)
- }
- }
- if j.conn != nil {
- err := j.conn.Close()
- if err != nil {
- j.log.Error("amqp connection close", "error", err)
- }
- }
-
- return
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
deleted file mode 100644
index a8069f5d..00000000
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package beanstalk
-
-import (
- "time"
-
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- tubePriority string = "tube_priority"
- tube string = "tube"
- reserveTimeout string = "reserve_timeout"
-)
-
-type GlobalCfg struct {
- Addr string `mapstructure:"addr"`
- Timeout time.Duration `mapstructure:"timeout"`
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Addr == "" {
- c.Addr = "tcp://127.0.0.1:11300"
- }
-
- if c.Timeout == 0 {
- c.Timeout = time.Second * 30
- }
-}
-
-type Config struct {
- PipePriority int64 `mapstructure:"priority"`
- TubePriority *uint32 `mapstructure:"tube_priority"`
- Tube string `mapstructure:"tube"`
- ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
-}
-
-func (c *Config) InitDefault() {
- if c.Tube == "" {
- c.Tube = "default"
- }
-
- if c.ReserveTimeout == 0 {
- c.ReserveTimeout = time.Second * 1
- }
-
- if c.TubePriority == nil {
- c.TubePriority = utils.Uint32(0)
- }
-
- if c.PipePriority == 0 {
- c.PipePriority = 10
- }
-}
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
deleted file mode 100644
index d3241b37..00000000
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ /dev/null
@@ -1,223 +0,0 @@
-package beanstalk
-
-import (
- "context"
- "net"
- "sync"
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type ConnPool struct {
- sync.RWMutex
-
- log logger.Logger
-
- conn *beanstalk.Conn
- connT *beanstalk.Conn
- ts *beanstalk.TubeSet
- t *beanstalk.Tube
-
- network string
- address string
- tName string
- tout time.Duration
-}
-
-func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
- connT, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- connTS, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- tube := beanstalk.NewTube(connT, tName)
- ts := beanstalk.NewTubeSet(connTS, tName)
-
- return &ConnPool{
- log: log,
- network: network,
- address: address,
- tName: tName,
- tout: tout,
- conn: connTS,
- connT: connT,
- ts: ts,
- t: tube,
- }, nil
-}
-
-// Put the payload
-// TODO use the context ??
-func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- // TODO(rustatian): redial based on the token
- id, err := cp.t.Put(body, pri, delay, ttr)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry put only when we redialed
- return cp.t.Put(body, pri, delay, ttr)
- }
- }
-
- return id, nil
-}
-
-// Reserve reserves and returns a job from one of the tubes in t. If no
-// job is available before time timeout has passed, Reserve returns a
-// ConnError recording ErrTimeout.
-//
-// Typically, a client will reserve a job, perform some work, then delete
-// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- id, body, err := cp.ts.Reserve(reserveTimeout)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Reserve only when we redialed
- return cp.ts.Reserve(reserveTimeout)
- }
- }
-
- return id, body, nil
-}
-
-func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
- cp.RLock()
- defer cp.RUnlock()
-
- err := cp.conn.Delete(id)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Delete only when we redialed
- return cp.conn.Delete(id)
- }
- }
- return nil
-}
-
-func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- stat, err := cp.conn.Stats()
- if err != nil {
- errR := cp.checkAndRedial(err)
- if errR != nil {
- return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
- } else {
- return cp.conn.Stats()
- }
- }
-
- return stat, nil
-}
-
-func (cp *ConnPool) redial() error {
- const op = errors.Op("connection_pool_redial")
-
- cp.Lock()
- // backoff here
- expb := backoff.NewExponentialBackOff()
- // TODO(rustatian) set via config
- expb.MaxElapsedTime = time.Minute
-
- operation := func() error {
- connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
- if connT == nil {
- return errors.E(op, errors.Str("connectionT is nil"))
- }
-
- connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
-
- if connTS == nil {
- return errors.E(op, errors.Str("connectionTS is nil"))
- }
-
- cp.t = beanstalk.NewTube(connT, cp.tName)
- cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
- cp.conn = connTS
- cp.connT = connT
-
- cp.log.Info("beanstalk redial was successful")
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- cp.Unlock()
- return retryErr
- }
- cp.Unlock()
-
- return nil
-}
-
-var connErrors = map[string]struct{}{"EOF": {}}
-
-func (cp *ConnPool) checkAndRedial(err error) error {
- const op = errors.Op("connection_pool_check_redial")
- switch et := err.(type) { //nolint:gocritic
- // check if the error
- case beanstalk.ConnError:
- switch bErr := et.Err.(type) {
- case *net.OpError:
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
- }
-
- // if redial was successful -> continue listening
- return nil
- default:
- if _, ok := connErrors[et.Err.Error()]; ok {
- // if error is related to the broken connection - redial
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", err, errR))
- }
- // if redial was successful -> continue listening
- return nil
- }
- }
- }
-
- // return initial error
- return err
-}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
deleted file mode 100644
index 6323148b..00000000
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ /dev/null
@@ -1,360 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "context"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type JobConsumer struct {
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
-
- pipeline atomic.Value
- listeners uint32
-
- // beanstalk
- pool *ConnPool
- addr string
- network string
- reserveTimeout time.Duration
- reconnectCh chan struct{}
- tout time.Duration
- // tube name
- tName string
- tubePriority *uint32
- priority int64
-
- stopCh chan struct{}
- requeueCh chan *Item
-}
-
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_beanstalk_consumer")
-
- // PARSE CONFIGURATION -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
- }
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- dsn := strings.Split(globalCfg.Addr, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
- }
-
- cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- pool: cPool,
- network: dsn[0],
- addr: dsn[1],
- tout: globalCfg.Timeout,
- tName: pipeCfg.Tube,
- reserveTimeout: pipeCfg.ReserveTimeout,
- tubePriority: pipeCfg.TubePriority,
- priority: pipeCfg.PipePriority,
-
- // buffered with two because jobs root plugin can call Stop at the same time as Pause
- stopCh: make(chan struct{}, 2),
- requeueCh: make(chan *Item, 1000),
- reconnectCh: make(chan struct{}, 2),
- }
-
- return jc, nil
-}
-
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_beanstalk_consumer")
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
- }
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- dsn := strings.Split(globalCfg.Addr, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
- }
-
- cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- pool: cPool,
- network: dsn[0],
- addr: dsn[1],
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, "default"),
- reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
- priority: pipe.Priority(),
-
- // buffered with two because jobs root plugin can call Stop at the same time as Pause
- stopCh: make(chan struct{}, 2),
- requeueCh: make(chan *Item, 1000),
- reconnectCh: make(chan struct{}, 2),
- }
-
- return jc, nil
-}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
- const op = errors.Op("beanstalk_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != jb.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
- }
-
- err := j.handleItem(ctx, fromJob(jb))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
- const op = errors.Op("beanstalk_handle_item")
-
- bb := new(bytes.Buffer)
- bb.Grow(64)
- err := item.pack(bb)
- if err != nil {
- return errors.E(op, err)
- }
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
- // <pri> is an integer < 2**32. Jobs with smaller priority values will be
- // scheduled before jobs with larger priorities. The most urgent priority is 0;
- // the least urgent priority is 4,294,967,295.
- //
- // <delay> is an integer number of seconds to wait before putting the job in
- // the ready queue. The job will be in the "delayed" state during this time.
- // Maximum delay is 2**32-1.
- //
- // <ttr> -- time to run -- is an integer number of seconds to allow a worker
- // to run this job. This time is counted from the moment a worker reserves
- // this job. If the worker does not delete, release, or bury the job within
- // <ttr> seconds, the job will time out and the server will release the job.
- // The minimum ttr is 1. If the client sends 0, the server will silently
- // increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
- if err != nil {
- errD := j.pool.Delete(ctx, id)
- if errD != nil {
- return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
- }
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- // register the pipeline
- j.pipeline.Store(p)
- return nil
-}
-
-// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("beanstalk_state")
- stat, err := j.pool.Stats(ctx)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- out := &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: j.tName,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }
-
- // set stat, skip errors (replace with 0)
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
- if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
- out.Active = int64(v)
- }
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
- if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
- // this is not an error, reserved in beanstalk behaves like an active jobs
- out.Reserved = int64(v)
- }
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
- if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
- out.Delayed = int64(v)
- }
-
- return out, nil
-}
-
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
- }
-
- atomic.AddUint32(&j.listeners, 1)
-
- go j.listen()
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-
- return nil
-}
-
-func (j *JobConsumer) Stop(context.Context) error {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- if atomic.LoadUint32(&j.listeners) == 1 {
- j.stopCh <- struct{}{}
- }
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-
- return nil
-}
-
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 1 {
- j.log.Warn("sqs listener already in the active state")
- return
- }
-
- // start listener
- go j.listen()
-
- // increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go
deleted file mode 100644
index e43207eb..00000000
--- a/plugins/jobs/drivers/beanstalk/encode_test.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "crypto/rand"
- "encoding/gob"
- "testing"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-func BenchmarkEncodeGob(b *testing.B) {
- tb := make([]byte, 1024*10)
- _, err := rand.Read(tb)
- if err != nil {
- b.Fatal(err)
- }
-
- item := &Item{
- Job: "/super/test/php/class/loooooong",
- Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
- Payload: utils.AsString(tb),
- Headers: map[string][]string{"Test": {"test1", "test2"}},
- Options: &Options{
- Priority: 10,
- Pipeline: "test-local-pipe",
- Delay: 10,
- },
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- bb := new(bytes.Buffer)
- err := gob.NewEncoder(bb).Encode(item)
- if err != nil {
- b.Fatal(err)
- }
- _ = bb.Bytes()
- bb.Reset()
- }
-}
-
-func BenchmarkEncodeJsonIter(b *testing.B) {
- tb := make([]byte, 1024*10)
- _, err := rand.Read(tb)
- if err != nil {
- b.Fatal(err)
- }
-
- item := &Item{
- Job: "/super/test/php/class/loooooong",
- Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
- Payload: utils.AsString(tb),
- Headers: map[string][]string{"Test": {"test1", "test2"}},
- Options: &Options{
- Priority: 10,
- Pipeline: "test-local-pipe",
- Delay: 10,
- },
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- bb, err := json.Marshal(item)
- if err != nil {
- b.Fatal(err)
- }
- _ = bb
- }
-}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
deleted file mode 100644
index f1d7ac76..00000000
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ /dev/null
@@ -1,147 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "context"
- "encoding/gob"
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type Item struct {
- // Job contains pluginName of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // Private ================
- id uint64
- conn *beanstalk.Conn
- requeueFn func(context.Context, *Item) error
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-// Not used in the sqs, MessageAttributes used instead
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- return i.Options.conn.Delete(i.Options.id)
-}
-
-func (i *Item) Nack() error {
- return i.Options.conn.Delete(i.Options.id)
-}
-
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- // delete old job
- err = i.Options.conn.Delete(i.Options.id)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Headers: job.Headers,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
-
-func (i *Item) pack(b *bytes.Buffer) error {
- err := gob.NewEncoder(b).Encode(i)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
- err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
- if err != nil {
- return err
- }
- out.Options.conn = j.pool.conn
- out.Options.id = id
- out.Options.requeueFn = j.handleItem
-
- return nil
-}
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
deleted file mode 100644
index f1385e70..00000000
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package beanstalk
-
-import (
- "github.com/beanstalkd/go-beanstalk"
-)
-
-func (j *JobConsumer) listen() {
- for {
- select {
- case <-j.stopCh:
- j.log.Warn("beanstalk listener stopped")
- return
- default:
- id, body, err := j.pool.Reserve(j.reserveTimeout)
- if err != nil {
- if errB, ok := err.(beanstalk.ConnError); ok {
- switch errB.Err { //nolint:gocritic
- case beanstalk.ErrTimeout:
- j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
- continue
- }
- }
- // in case of other error - continue
- j.log.Error("beanstalk reserve", "error", err)
- continue
- }
-
- item := &Item{}
- err = j.unpack(id, body, item)
- if err != nil {
- j.log.Error("beanstalk unpack item", "error", err)
- continue
- }
-
- // insert job into the priority queue
- j.pq.Insert(item)
- }
- }
-}
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go
deleted file mode 100644
index 529d1474..00000000
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- pluginName string = "beanstalk"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- return make(chan error)
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
-func (p *Plugin) Name() string {
- return pluginName
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
-}
-
-func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, eh, pq)
-}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
deleted file mode 100644
index f0992cd6..00000000
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ /dev/null
@@ -1,274 +0,0 @@
-package ephemeral
-
-import (
- "context"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- prefetch string = "prefetch"
- goroutinesMax uint64 = 1000
-)
-
-type Config struct {
- Prefetch uint64 `mapstructure:"prefetch"`
-}
-
-type JobConsumer struct {
- cfg *Config
- log logger.Logger
- eh events.Handler
- pipeline atomic.Value
- pq priorityqueue.Queue
- localPrefetch chan *Item
-
- // time.sleep goroutines max number
- goroutines uint64
-
- delayed *int64
- active *int64
-
- listeners uint32
- stopCh chan struct{}
-}
-
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_ephemeral_pipeline")
-
- jb := &JobConsumer{
- log: log,
- pq: pq,
- eh: eh,
- goroutines: 0,
- active: utils.Int64(0),
- delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
- }
-
- err := cfg.UnmarshalKey(configKey, &jb.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if jb.cfg.Prefetch == 0 {
- jb.cfg.Prefetch = 100_000
- }
-
- // initialize a local queue
- jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
-
- return jb, nil
-}
-
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- jb := &JobConsumer{
- log: log,
- pq: pq,
- eh: eh,
- goroutines: 0,
- active: utils.Int64(0),
- delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
- }
-
- // initialize a local queue
- jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
-
- return jb, nil
-}
-
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
- const op = errors.Op("ephemeral_push")
-
- // check if the pipeline registered
- _, ok := j.pipeline.Load().(*pipeline.Pipeline)
- if !ok {
- return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
- }
-
- err := j.handleItem(ctx, fromJob(jb))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- return &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: pipe.Name(),
- Active: atomic.LoadInt64(j.active),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }, nil
-}
-
-func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
- return nil
-}
-
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- // stop the consumer
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
-}
-
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // listener already active
- if l == 1 {
- j.log.Warn("listener already in the active state")
- return
- }
-
- // resume the consumer on the same channel
- j.consume()
-
- atomic.StoreUint32(&j.listeners, 1)
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
-}
-
-// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
-func (j *JobConsumer) Stop(ctx context.Context) error {
- const op = errors.Op("ephemeral_plugin_stop")
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- select {
- // return from the consumer
- case j.stopCh <- struct{}{}:
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
-
- return nil
-
- case <-ctx.Done():
- return errors.E(op, ctx.Err())
- }
-}
-
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("ephemeral_handle_request")
- // handle timeouts
- // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
-
- go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- atomic.AddInt64(j.delayed, 1)
-
- time.Sleep(jj.Options.DelayDuration())
-
- // send the item after timeout expired
- j.localPrefetch <- jj
-
- atomic.AddUint64(&j.goroutines, ^uint64(0))
- }(msg)
-
- return nil
- }
-
- // increase number of the active jobs
- atomic.AddInt64(j.active, 1)
-
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
- }
-}
-
-func (j *JobConsumer) consume() {
- go func() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
- return
- }
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
- item.Options.active = j.active
- item.Options.delayed = j.delayed
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
- }
- }
- }()
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
deleted file mode 100644
index 3298424d..00000000
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ /dev/null
@@ -1,133 +0,0 @@
-package ephemeral
-
-import (
- "context"
- "sync/atomic"
- "time"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type Item struct {
- // Job contains name of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // private
- requeueFn func(context.Context, *Item) error
- active *int64
- delayed *int64
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- i.atomicallyReduceCount()
- return nil
-}
-
-func (i *Item) Nack() error {
- i.atomicallyReduceCount()
- return nil
-}
-
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- i.atomicallyReduceCount()
-
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// atomicallyReduceCount reduces counter of active or delayed jobs
-func (i *Item) atomicallyReduceCount() {
- // if job was delayed, reduce number of the delayed jobs
- if i.Options.Delay > 0 {
- atomic.AddInt64(i.Options.delayed, ^int64(0))
- return
- }
-
- // otherwise, reduce number of the active jobs
- atomic.AddInt64(i.Options.active, ^int64(0))
- // noop for the in-memory
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go
deleted file mode 100644
index 28495abb..00000000
--- a/plugins/jobs/drivers/ephemeral/plugin.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "ephemeral"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Available() {}
-
-// JobsConstruct creates new ephemeral consumer from the configuration
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(configKey, p.log, p.cfg, e, pq)
-}
-
-// FromPipeline creates new ephemeral consumer from the provided pipeline
-func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipeline, p.log, e, pq)
-}
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
deleted file mode 100644
index 9b2a1ca8..00000000
--- a/plugins/jobs/drivers/sqs/config.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package sqs
-
-import "github.com/aws/aws-sdk-go-v2/aws"
-
-const (
- attributes string = "attributes"
- tags string = "tags"
- queue string = "queue"
- pref string = "prefetch"
- visibility string = "visibility_timeout"
- waitTime string = "wait_time"
-)
-
-type GlobalCfg struct {
- Key string `mapstructure:"key"`
- Secret string `mapstructure:"secret"`
- Region string `mapstructure:"region"`
- SessionToken string `mapstructure:"session_token"`
- Endpoint string `mapstructure:"endpoint"`
-}
-
-// Config is used to parse pipeline configuration
-type Config struct {
- // The duration (in seconds) that the received messages are hidden from subsequent
- // retrieve requests after being retrieved by a ReceiveMessage request.
- VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
- // The duration (in seconds) for which the call waits for a message to arrive
- // in the queue before returning. If a message is available, the call returns
- // sooner than WaitTimeSeconds. If no messages are available and the wait time
- // expires, the call returns successfully with an empty list of messages.
- WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
- // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
- // than this value (however, fewer messages might be returned). Valid values: 1 to
- // 10. Default: 1.
- Prefetch int32 `mapstructure:"prefetch"`
- // The name of the new queue. The following limits apply to this name:
- //
- // * A queue
- // name can have up to 80 characters.
- //
- // * Valid values: alphanumeric characters,
- // hyphens (-), and underscores (_).
- //
- // * A FIFO queue name must end with the .fifo
- // suffix.
- //
- // Queue URLs and names are case-sensitive.
- //
- // This member is required.
- Queue *string `mapstructure:"queue"`
-
- // A map of attributes with their corresponding values. The following lists the
- // names, descriptions, and values of the special request parameters that the
- // CreateQueue action uses.
- // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
- Attributes map[string]string `mapstructure:"attributes"`
-
- // From amazon docs:
- // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
- // Tagging Your Amazon SQS Queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
- // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
- // guidelines in mind:
- //
- // * Adding more than 50 tags to a queue isn't recommended.
- //
- // *
- // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
- // strings.
- //
- // * Tags are case-sensitive.
- //
- // * A new tag with a key identical to that
- // of an existing tag overwrites the existing tag.
- //
- // For a full list of tag
- // restrictions, see Quotas related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
- // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
- // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
- // permissions don't apply to this action. For more information, see Grant
- // cross-account permissions to a role and a user name
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
- // in the Amazon SQS Developer Guide.
- Tags map[string]string `mapstructure:"tags"`
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Endpoint == "" {
- c.Endpoint = "http://127.0.0.1:9324"
- }
-}
-
-func (c *Config) InitDefault() {
- if c.Queue == nil {
- c.Queue = aws.String("default")
- }
-
- if c.Prefetch == 0 || c.Prefetch > 10 {
- c.Prefetch = 10
- }
-
- if c.WaitTimeSeconds == 0 {
- c.WaitTimeSeconds = 5
- }
-
- if c.Attributes == nil {
- c.Attributes = make(map[string]string)
- }
-
- if c.Tags == nil {
- c.Tags = make(map[string]string)
- }
-}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
deleted file mode 100644
index 17af1caa..00000000
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ /dev/null
@@ -1,411 +0,0 @@
-package sqs
-
-import (
- "context"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/aws/retry"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/credentials"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/google/uuid"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type JobConsumer struct {
- sync.Mutex
- pq priorityqueue.Queue
- log logger.Logger
- eh events.Handler
- pipeline atomic.Value
-
- // connection info
- key string
- secret string
- sessionToken string
- region string
- endpoint string
- queue *string
- messageGroupID string
- waitTime int32
- prefetch int32
- visibilityTimeout int32
-
- // if user invoke several resume operations
- listeners uint32
-
- // queue optional parameters
- attributes map[string]string
- tags map[string]string
-
- client *sqs.Client
- queueURL *string
-
- pauseCh chan struct{}
-}
-
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_sqs_consumer")
-
- // if no such key - error
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
- }
-
- // PARSE CONFIGURATION -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // initialize job consumer
- jb := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- messageGroupID: uuid.NewString(),
- attributes: pipeCfg.Attributes,
- tags: pipeCfg.Tags,
- queue: pipeCfg.Queue,
- prefetch: pipeCfg.Prefetch,
- visibilityTimeout: pipeCfg.VisibilityTimeout,
- waitTime: pipeCfg.WaitTimeSeconds,
- region: globalCfg.Region,
- key: globalCfg.Key,
- sessionToken: globalCfg.SessionToken,
- secret: globalCfg.Secret,
- endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}, 1),
- }
-
- // PARSE CONFIGURATION -------
-
- awsConf, err := config.LoadDefaultConfig(context.Background(),
- config.WithRegion(globalCfg.Region),
- config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // config with retries
- jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
- o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
- opts.MaxAttempts = 60
- })
- })
-
- out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // assign a queue URL
- jb.queueURL = out.QueueUrl
-
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
-
- return jb, nil
-}
-
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- const op = errors.Op("new_sqs_consumer")
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
- }
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- attr := make(map[string]string)
- err = pipe.Map(attributes, attr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- tg := make(map[string]string)
- err = pipe.Map(tags, tg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jb := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- messageGroupID: uuid.NewString(),
- attributes: attr,
- tags: tg,
- queue: aws.String(pipe.String(queue, "default")),
- prefetch: int32(pipe.Int(pref, 10)),
- visibilityTimeout: int32(pipe.Int(visibility, 0)),
- waitTime: int32(pipe.Int(waitTime, 0)),
- region: globalCfg.Region,
- key: globalCfg.Key,
- sessionToken: globalCfg.SessionToken,
- secret: globalCfg.Secret,
- endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}, 1),
- }
-
- // PARSE CONFIGURATION -------
-
- awsConf, err := config.LoadDefaultConfig(context.Background(),
- config.WithRegion(globalCfg.Region),
- config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // config with retries
- jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
- o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
- opts.MaxAttempts = 60
- })
- })
-
- out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // assign a queue URL
- jb.queueURL = out.QueueUrl
-
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
-
- return jb, nil
-}
-
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
- const op = errors.Op("sqs_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != jb.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
- }
-
- // The length of time, in seconds, for which to delay a specific message. Valid
- // values: 0 to 900. Maximum: 15 minutes.
- if jb.Options.Delay > 900 {
- return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
- }
-
- err := j.handleItem(ctx, fromJob(jb))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
- AttributeNames: []types.QueueAttributeName{
- types.QueueAttributeNameApproximateNumberOfMessages,
- types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
- types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
- },
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- out := &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }
-
- nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
- if err == nil {
- out.Active = int64(nom)
- }
-
- delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
- if err == nil {
- out.Delayed = int64(delayed)
- }
-
- nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
- if err == nil {
- out.Reserved = int64(nv)
- }
-
- return out, nil
-}
-
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
- return nil
-}
-
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("sqs_run")
-
- j.Lock()
- defer j.Unlock()
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
- }
-
- atomic.AddUint32(&j.listeners, 1)
-
- // start listener
- go j.listen(context.Background())
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-
- return nil
-}
-
-func (j *JobConsumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
-func (j *JobConsumer) Pause(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- // stop consume
- j.pauseCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *JobConsumer) Resume(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 1 {
- j.log.Warn("sqs listener already in the active state")
- return
- }
-
- // start listener
- go j.listen(context.Background())
-
- // increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
- if err != nil {
- return err
- }
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
deleted file mode 100644
index df72b2e5..00000000
--- a/plugins/jobs/drivers/sqs/item.go
+++ /dev/null
@@ -1,247 +0,0 @@
-package sqs
-
-import (
- "context"
- "strconv"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- StringType string = "String"
- NumberType string = "Number"
- BinaryType string = "Binary"
- ApproximateReceiveCount string = "ApproximateReceiveCount"
-)
-
-var itemAttributes = []string{
- job.RRJob,
- job.RRDelay,
- job.RRPriority,
- job.RRHeaders,
-}
-
-type Item struct {
- // Job contains pluginName of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // Private ================
- approxReceiveCount int64
- queue *string
- receiptHandler *string
- client *sqs.Client
- requeueFn func(context.Context, *Item) error
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-// Not used in the sqs, MessageAttributes used instead
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (i *Item) Nack() error {
- // requeue message
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- // requeue message
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- // Delete job from the queue only after successful requeue
- _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Headers: job.Headers,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
-
-func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
- // pack headers map
- data, err := json.Marshal(i.Headers)
- if err != nil {
- return nil, err
- }
-
- return &sqs.SendMessageInput{
- MessageBody: aws.String(i.Payload),
- QueueUrl: queue,
- DelaySeconds: int32(i.Options.Delay),
- MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
- },
- }, nil
-}
-
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
- const op = errors.Op("sqs_unpack")
- // reserved
- if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
- return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
- }
-
- for i := 0; i < len(itemAttributes); i++ {
- if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok {
- return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i]))
- }
- }
-
- var h map[string][]string
- err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h)
- if err != nil {
- return nil, err
- }
-
- delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- item := &Item{
- Job: *msg.MessageAttributes[job.RRJob].StringValue,
- Payload: *msg.Body,
- Headers: h,
- Options: &Options{
- Delay: int64(delay),
- Priority: int64(priority),
-
- // private
- approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
- receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
- },
- }
-
- return item, nil
-}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
deleted file mode 100644
index 9efef90d..00000000
--- a/plugins/jobs/drivers/sqs/listener.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package sqs
-
-import (
- "context"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws/transport/http"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/aws/smithy-go"
-)
-
-const (
- // All - get all message attribute names
- All string = "All"
-
- // NonExistentQueue AWS error code
- NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
-)
-
-func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
- for {
- select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
- return
- default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
- MessageAttributeNames: []string{All},
- // The new value for the message's visibility timeout (in seconds). Values range: 0
- // to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
- })
-
- if err != nil {
- if oErr, ok := (err).(*smithy.OperationError); ok {
- if rErr, ok := oErr.Err.(*http.ResponseError); ok {
- if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
- // in case of NonExistentQueue - recreate the queue
- if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
- if err != nil {
- j.log.Error("create queue", "error", err)
- }
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to the queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
- continue
- }
- }
- }
- }
-
- j.log.Error("receive message", "error", err)
- continue
- }
-
- for i := 0; i < len(message.Messages); i++ {
- m := message.Messages[i]
- item, err := j.unpack(&m)
- if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
- ReceiptHandle: m.ReceiptHandle,
- })
- if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
- }
-
- j.log.Error("message unpack", "error", err)
- continue
- }
-
- j.pq.Insert(item)
- }
- }
- }
-}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
deleted file mode 100644
index 54f61ff5..00000000
--- a/plugins/jobs/drivers/sqs/plugin.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- pluginName string = "sqs"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) Name() string {
- return pluginName
-}
-
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
-}
-
-func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
-}
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go
index 390f44b5..06c3254e 100644
--- a/plugins/jobs/job/general.go
+++ b/plugins/jobs/job/job.go
@@ -1,5 +1,9 @@
package job
+import (
+ "time"
+)
+
// constant keys to pack/unpack messages from different drivers
const (
RRID string = "rr_id"
@@ -27,3 +31,32 @@ type Job struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
deleted file mode 100644
index b7e4ed36..00000000
--- a/plugins/jobs/job/job_options.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package job
-
-import "time"
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-}
-
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go
index a47151a3..a47151a3 100644
--- a/plugins/jobs/job/job_options_test.go
+++ b/plugins/jobs/job/job_test.go
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5e62c5c5..3f3fa196 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -177,8 +177,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return true
})
+ // do not continue processing, immediately stop if channel contains an error
+ if len(errCh) > 0 {
+ return errCh
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
errCh <- err
return errCh
@@ -219,6 +224,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventJobError,
+ Error: err,
ID: jb.ID(),
Start: start,
Elapsed: time.Since(start),
@@ -243,6 +249,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -266,6 +273,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -279,6 +287,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
+ continue
}
// handle the response protocol
@@ -288,6 +298,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Event: events.EventJobError,
ID: jb.ID(),
Start: start,
+ Error: err,
Elapsed: time.Since(start),
})
p.putPayload(exec)
@@ -307,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
// return payload
p.putPayload(exec)
}
@@ -343,6 +355,10 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
return nil
}