summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
committerValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
commitd099e47ab28dd044d34e18347a4c714b8af3d612 (patch)
treee106e13bba48e435b87d218237b282d7f691b52c /plugins
parentec7c049036d31fe030d106db9f0d268ea0296c5f (diff)
SQS driver.
Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/sqs/consumer.go1
-rw-r--r--plugins/jobs/brokers/sqs/item.go1
-rw-r--r--plugins/jobs/brokers/sqs/plugin.go1
-rw-r--r--plugins/jobs/drivers/amqp/config.go58
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go (renamed from plugins/jobs/brokers/amqp/consumer.go)69
-rw-r--r--plugins/jobs/drivers/amqp/item.go (renamed from plugins/jobs/brokers/amqp/item.go)115
-rw-r--r--plugins/jobs/drivers/amqp/listener.go (renamed from plugins/jobs/brokers/amqp/listener.go)2
-rw-r--r--plugins/jobs/drivers/amqp/plugin.go (renamed from plugins/jobs/brokers/amqp/plugin.go)0
-rw-r--r--plugins/jobs/drivers/amqp/rabbit_init.go (renamed from plugins/jobs/brokers/amqp/rabbit_init.go)2
-rw-r--r--plugins/jobs/drivers/amqp/redial.go (renamed from plugins/jobs/brokers/amqp/redial.go)1
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go (renamed from plugins/jobs/brokers/ephemeral/consumer.go)21
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go (renamed from plugins/jobs/brokers/ephemeral/item.go)20
-rw-r--r--plugins/jobs/drivers/ephemeral/plugin.go (renamed from plugins/jobs/brokers/ephemeral/plugin.go)0
-rw-r--r--plugins/jobs/drivers/sqs/config.go103
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go190
-rw-r--r--plugins/jobs/drivers/sqs/item.go192
-rw-r--r--plugins/jobs/drivers/sqs/listener.go66
-rw-r--r--plugins/jobs/drivers/sqs/plugin.go39
-rw-r--r--plugins/jobs/job/general.go (renamed from plugins/jobs/structs/general.go)18
-rw-r--r--plugins/jobs/job/job_options.go (renamed from plugins/jobs/structs/job_options.go)14
-rw-r--r--plugins/jobs/job/job_options_test.go (renamed from plugins/jobs/structs/job_options_test.go)2
-rw-r--r--plugins/jobs/pipeline/pipeline.go15
-rw-r--r--plugins/jobs/plugin.go27
-rw-r--r--plugins/jobs/rpc.go10
24 files changed, 795 insertions, 172 deletions
diff --git a/plugins/jobs/brokers/sqs/consumer.go b/plugins/jobs/brokers/sqs/consumer.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/consumer.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/brokers/sqs/item.go b/plugins/jobs/brokers/sqs/item.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/item.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/brokers/sqs/plugin.go b/plugins/jobs/brokers/sqs/plugin.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
new file mode 100644
index 00000000..7befb3c8
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -0,0 +1,58 @@
+package amqp
+
+// pipeline rabbitmq info
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ PrefetchCount int `mapstructure:"pipeline_size"`
+ Queue string `mapstructure:"queue"`
+ Priority int64 `mapstructure:"priority"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
+ Exclusive bool `mapstructure:"exclusive"`
+}
+
+func (c *Config) InitDefault() {
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "default"
+ }
+
+ if c.PrefetchCount == 0 {
+ c.PrefetchCount = 100
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:guest@localhost:5672/"
+ }
+}
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index c2807b54..31999e23 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -11,43 +11,14 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/streadway/amqp"
)
-// pipeline rabbitmq info
-const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange-type"
- queue string = "queue"
- routingKey string = "routing-key"
- prefetch string = "prefetch"
-
- dlx string = "x-dead-letter-exchange"
- dlxRoutingKey string = "x-dead-letter-routing-key"
- dlxTTL string = "x-message-ttl"
- dlxExpires string = "x-expires"
-
- contentType string = "application/octet-stream"
-)
-
-type GlobalCfg struct {
- Addr string `mapstructure:"addr"`
-}
-
-// Config is used to parse pipeline configuration
-type Config struct {
- PrefetchCount int `mapstructure:"pipeline_size"`
- Queue string `mapstructure:"queue"`
- Exchange string `mapstructure:"exchange"`
- ExchangeType string `mapstructure:"exchange_type"`
- RoutingKey string `mapstructure:"routing_key"`
-}
-
type JobsConsumer struct {
- sync.RWMutex
+ sync.Mutex
log logger.Logger
pq priorityqueue.Queue
eh events.Handler
@@ -61,8 +32,10 @@ type JobsConsumer struct {
retryTimeout time.Duration
prefetchCount int
+ priority int64
exchangeName string
queue string
+ exclusive bool
consumeID string
connStr string
exchangeType string
@@ -123,6 +96,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
jb.exchangeType = pipeCfg.ExchangeType
jb.exchangeName = pipeCfg.Exchange
jb.prefetchCount = pipeCfg.PrefetchCount
+ jb.exclusive = pipeCfg.Exclusive
+ jb.priority = pipeCfg.Priority
// PARSE CONFIGURATION -------
@@ -185,6 +160,8 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
jb.exchangeType = pipeline.String(exchangeType, "direct")
jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
jb.prefetchCount = pipeline.Int(prefetch, 10)
+ jb.priority = int64(pipeline.Int(priority, 10))
+ jb.exclusive = pipeline.Bool(exclusive, true)
// PARSE CONFIGURATION -------
@@ -206,13 +183,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return nil, errors.E(op, err)
}
+ // register the pipeline
+ // error here is always nil
+ _ = jb.Register(pipeline)
+
// run redialer for the connection
jb.redialer()
return jb, nil
}
-func (j *JobsConsumer) Push(job *structs.Job) error {
+func (j *JobsConsumer) Push(job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -235,9 +216,9 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
}
// handle timeouts
- if job.Options.DelayDuration() > 0 {
+ if msg.Options.DelayDuration() > 0 {
// TODO declare separate method for this if condition
- delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
// delay cache optimization.
@@ -433,23 +414,3 @@ func (j *JobsConsumer) Stop() error {
})
return nil
}
-
-func (c *Config) InitDefault() {
- if c.ExchangeType == "" {
- c.ExchangeType = "direct"
- }
-
- if c.Exchange == "" {
- c.Exchange = "default"
- }
-
- if c.PrefetchCount == 0 {
- c.PrefetchCount = 100
- }
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Addr == "" {
- c.Addr = "amqp://guest:guest@localhost:5672/"
- }
-}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index a46f1ca2..7c300c88 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -5,33 +5,23 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
-const (
- rrID string = "rr_id"
- rrJob string = "rr_job"
- rrHeaders string = "rr_headers"
- rrPipeline string = "rr_pipeline"
- rrTimeout string = "rr_timeout"
- rrDelay string = "rr_delay"
- rrRetryDelay string = "rr_retry_delay"
-)
-
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
@@ -50,34 +40,16 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint32 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int32 `json:"delay,omitempty"`
-
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts int32 `json:"maxAttempts,omitempty"`
-
- // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int32 `json:"retryDelay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int32 `json:"timeout,omitempty"`
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int32) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
-// RetryDuration returns retry delay duration in a form of time.Duration.
-func (o *Options) RetryDuration() time.Duration {
- return time.Second * time.Duration(o.RetryDelay)
+ Timeout int64 `json:"timeout,omitempty"`
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -98,8 +70,8 @@ func (j *Item) ID() string {
return j.Ident
}
-func (j *Item) Priority() uint64 {
- return uint64(j.Options.Priority)
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
}
// Body packs job payload into binary payload.
@@ -121,9 +93,9 @@ func (j *Item) Nack() error {
return j.NackFunc(false, false)
}
-func fromDelivery(d amqp.Delivery) (*Item, error) {
+func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- item, err := unpack(d)
+ item, err := j.unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
@@ -138,18 +110,16 @@ func fromDelivery(d amqp.Delivery) (*Item, error) {
}, nil
}
-func fromJob(job *structs.Job) *Item {
+func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
Options: &Options{
- Priority: uint32(job.Options.Priority),
- Pipeline: job.Options.Pipeline,
- Delay: int32(job.Options.Delay),
- Attempts: int32(job.Options.Attempts),
- RetryDelay: int32(job.Options.RetryDelay),
- Timeout: int32(job.Options.Timeout),
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
},
}
}
@@ -161,54 +131,57 @@ func pack(id string, j *Item) (amqp.Table, error) {
return nil, err
}
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- rrPipeline: j.Options.Pipeline,
- rrHeaders: headers,
- rrTimeout: j.Options.Timeout,
- rrDelay: j.Options.Delay,
- rrRetryDelay: j.Options.RetryDelay,
+ job.RRID: id,
+ job.RRJob: j.Job,
+ job.RRPipeline: j.Options.Pipeline,
+ job.RRHeaders: headers,
+ job.RRTimeout: j.Options.Timeout,
+ job.RRDelay: j.Options.Delay,
+ job.RRPriority: j.Options.Priority,
}, nil
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (*Item, error) {
- j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
- if _, ok := d.Headers[rrID].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", rrID))
+ if _, ok := d.Headers[job.RRID].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
}
- j.Ident = d.Headers[rrID].(string)
+ item.Ident = d.Headers[job.RRID].(string)
- if _, ok := d.Headers[rrJob].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", rrJob))
+ if _, ok := d.Headers[job.RRJob].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
}
- j.Job = d.Headers[rrJob].(string)
+ item.Job = d.Headers[job.RRJob].(string)
- if _, ok := d.Headers[rrPipeline].(string); ok {
- j.Options.Pipeline = d.Headers[rrPipeline].(string)
+ if _, ok := d.Headers[job.RRPipeline].(string); ok {
+ item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
}
- if h, ok := d.Headers[rrHeaders].([]byte); ok {
- err := json.Unmarshal(h, &j.Headers)
+ if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &item.Headers)
if err != nil {
return nil, err
}
}
- if _, ok := d.Headers[rrTimeout].(int32); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(int32)
+ if _, ok := d.Headers[job.RRTimeout].(int64); ok {
+ item.Options.Timeout = d.Headers[job.RRTimeout].(int64)
}
- if _, ok := d.Headers[rrDelay].(int32); ok {
- j.Options.Delay = d.Headers[rrDelay].(int32)
+ if _, ok := d.Headers[job.RRDelay].(int64); ok {
+ item.Options.Delay = d.Headers[job.RRDelay].(int64)
}
- if _, ok := d.Headers[rrRetryDelay].(int32); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
+ if _, ok := d.Headers[job.RRPriority]; !ok {
+ // set pipe's priority
+ item.Options.Priority = j.priority
+ } else {
+ item.Options.Priority = d.Headers[job.RRPriority].(int64)
}
- return j, nil
+ return item, nil
}
diff --git a/plugins/jobs/brokers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
index 2b994fc5..7241c717 100644
--- a/plugins/jobs/brokers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -12,7 +12,7 @@ func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
return
}
- d, err := fromDelivery(msg)
+ d, err := j.fromDelivery(msg)
if err != nil {
j.log.Error("amqp delivery convert", "error", err)
continue
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
index 624f4405..624f4405 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/drivers/amqp/plugin.go
diff --git a/plugins/jobs/brokers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
index e3e5f8da..d6b8a708 100644
--- a/plugins/jobs/brokers/amqp/rabbit_init.go
+++ b/plugins/jobs/drivers/amqp/rabbit_init.go
@@ -36,7 +36,7 @@ func (j *JobsConsumer) initRabbitMQ() error {
j.queue,
false,
false,
- true,
+ j.exclusive,
false,
nil,
)
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index 6ce69ed9..0b52a4d1 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -32,7 +32,6 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
Driver: pipe.Driver(),
Error: err,
Start: time.Now(),
- Elapsed: 0,
})
j.log.Error("connection closed, reconnecting", "error", err)
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 559cb2e9..45ee8083 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -8,8 +8,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -79,34 +79,35 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobBroker) Push(job *structs.Job) error {
+func (j *JobBroker) Push(jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.pipeline.Load(job.Options.Pipeline); ok {
+ if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
}
+ msg := fromJob(jb)
// handle timeouts
- if job.Options.Timeout > 0 {
- go func(jj *structs.Job) {
+ if msg.Options.Timeout > 0 {
+ go func(jj *job.Job) {
time.Sleep(jj.Options.TimeoutDuration())
// send the item after timeout expired
- j.localQueue <- From(job)
- }(job)
+ j.localQueue <- msg
+ }(jb)
return nil
}
// insert to the local, limited pipeline
- j.localQueue <- From(job)
+ j.localQueue <- msg
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
func (j *JobBroker) consume() {
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 211ac56a..442533c5 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -4,11 +4,11 @@ import (
"time"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
-func From(job *structs.Job) *Item {
+func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
Ident: job.Ident,
@@ -27,13 +27,13 @@ type Item struct {
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
@@ -43,16 +43,16 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ // Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -73,7 +73,7 @@ func (j *Item) ID() string {
return j.Ident
}
-func (j *Item) Priority() uint64 {
+func (j *Item) Priority() int64 {
return j.Options.Priority
}
@@ -89,7 +89,7 @@ func (j *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout uint64 `json:"timeout"`
+ Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
}{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
)
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go
index 28495abb..28495abb 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/drivers/ephemeral/plugin.go
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
new file mode 100644
index 00000000..0b4e8157
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -0,0 +1,103 @@
+package sqs
+
+type GlobalCfg struct {
+ Key string `mapstructure:"key"`
+ Secret string `mapstructure:"secret"`
+ Region string `mapstructure:"region"`
+ SessionToken string `mapstructure:"session_token"`
+ Endpoint string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ // The duration (in seconds) that the received messages are hidden from subsequent
+ // retrieve requests after being retrieved by a ReceiveMessage request.
+ VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+ // The duration (in seconds) for which the call waits for a message to arrive
+ // in the queue before returning. If a message is available, the call returns
+ // sooner than WaitTimeSeconds. If no messages are available and the wait time
+ // expires, the call returns successfully with an empty list of messages.
+ WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+ // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages
+ // than this value (however, fewer messages might be returned). Valid values: 1 to
+ // 10. Default: 1.
+ PrefetchCount int32 `mapstructure:"pipeline_size"`
+ // The name of the new queue. The following limits apply to this name:
+ //
+ // * A queue
+ // name can have up to 80 characters.
+ //
+ // * Valid values: alphanumeric characters,
+ // hyphens (-), and underscores (_).
+ //
+ // * A FIFO queue name must end with the .fifo
+ // suffix.
+ //
+ // Queue URLs and names are case-sensitive.
+ //
+ // This member is required.
+ Queue string `mapstructure:"queue"`
+
+ // A map of attributes with their corresponding values. The following lists the
+ // names, descriptions, and values of the special request parameters that the
+ // CreateQueue action uses.
+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+ Attributes map[string]string `mapstructure:"attributes"`
+
+ // From amazon docs:
+ // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
+ // Tagging Your Amazon SQS Queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
+ // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
+ // guidelines in mind:
+ //
+ // * Adding more than 50 tags to a queue isn't recommended.
+ //
+ // *
+ // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
+ // strings.
+ //
+ // * Tags are case-sensitive.
+ //
+ // * A new tag with a key identical to that
+ // of an existing tag overwrites the existing tag.
+ //
+ // For a full list of tag
+ // restrictions, see Quotas related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
+ // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
+ // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
+ // permissions don't apply to this action. For more information, see Grant
+ // cross-account permissions to a role and a user name
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
+ // in the Amazon SQS Developer Guide.
+ Tags map[string]string `mapstructure:"tags"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Endpoint == "" {
+ c.Endpoint = "http://localhost:9324"
+ }
+}
+
+func (c *Config) InitDefault() {
+ if c.Queue == "" {
+ c.Queue = "default"
+ }
+
+ if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
+ c.PrefetchCount = 10
+ }
+
+ if c.WaitTimeSeconds == 0 {
+ c.WaitTimeSeconds = 5
+ }
+
+ if c.Attributes == nil {
+ c.Attributes = make(map[string]string)
+ }
+
+ if c.Tags == nil {
+ c.Tags = make(map[string]string)
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
new file mode 100644
index 00000000..c0f66589
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -0,0 +1,190 @@
+package sqs
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ pq priorityqueue.Queue
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+
+ // connection info
+ key string
+ secret string
+ sessionToken string
+ region string
+ endpoint string
+ queue string
+ messageGroupID string
+ waitTime int32
+ prefetch int32
+ visibilityTimeout int32
+
+ // queue optional parameters
+ attributes map[string]string
+ tags map[string]string
+
+ client *sqs.Client
+ outputQ *sqs.CreateQueueOutput
+
+ pauseCh chan struct{}
+}
+
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: pipeCfg.Attributes,
+ tags: pipeCfg.Tags,
+ queue: pipeCfg.Queue,
+ prefetch: pipeCfg.PrefetchCount,
+ visibilityTimeout: pipeCfg.VisibilityTimeout,
+ waitTime: pipeCfg.WaitTimeSeconds,
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ return &JobConsumer{}, nil
+}
+
+func (j *JobConsumer) Push(jb *job.Job) error {
+ const op = errors.Op("sqs_push")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // The length of time, in seconds, for which to delay a specific message. Valid
+ // values: 0 to 900. Maximum: 15 minutes.
+ if jb.Options.Delay > 900 {
+ return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
+ }
+
+ msg := fromJob(jb)
+
+ // The new value for the message's visibility timeout (in seconds). Values range: 0
+ // to 43200. Maximum: 12 hours.
+ _, err := j.client.SendMessage(context.Background(), j.pack(msg))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ j.pipeline.Store(pipeline)
+ return nil
+}
+
+func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ j.Lock()
+ defer j.Unlock()
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // start listener
+ go j.listen()
+
+ return nil
+}
+
+func (j *JobConsumer) Stop() error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
new file mode 100644
index 00000000..ef736be9
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -0,0 +1,192 @@
+package sqs
+
+import (
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ StringType string = "String"
+ NumberType string = "Number"
+ ApproximateReceiveCount string = "ApproximateReceiveCount"
+)
+
+var attributes = []string{
+ job.RRJob,
+ job.RRDelay,
+ job.RRTimeout,
+ job.RRPriority,
+ job.RRMaxAttempts,
+}
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+
+ // Maximum number of attempts to receive and process the message
+ MaxAttempts int64 `json:"max_attempts,omitempty"`
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int64) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.MaxAttempts > (attempt + 1)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the sqs, MessageAttributes used instead
+func (j *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (j *Item) Ack() error {
+ return nil
+}
+
+func (j *Item) Nack() error {
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ MaxAttempts: job.Options.Attempts,
+ },
+ }
+}
+
+func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ MessageBody: aws.String(item.Payload),
+ QueueUrl: j.outputQ.QueueUrl,
+ DelaySeconds: int32(item.Options.Delay),
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))},
+ },
+ }
+}
+
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
+ const op = errors.Op("sqs_unpack")
+ // reserved
+ if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
+ return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ }
+
+ for i := 0; i < len(attributes); i++ {
+ if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
+ return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ }
+ }
+
+ attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ item := &Item{
+ Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Payload: *msg.Body,
+ Options: &Options{
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
+ MaxAttempts: int64(attempt),
+ },
+ }
+
+ return item, recCount, nil
+}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
new file mode 100644
index 00000000..a10ce5a6
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -0,0 +1,66 @@
+package sqs
+
+import (
+ "context"
+
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+)
+
+const (
+ All string = "All"
+)
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.pauseCh:
+ return
+ default:
+ message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ MaxNumberOfMessages: j.prefetch,
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
+ MessageAttributeNames: []string{All},
+ VisibilityTimeout: j.visibilityTimeout,
+ WaitTimeSeconds: j.waitTime,
+ })
+ if err != nil {
+ j.log.Error("receive message", "error", err)
+ continue
+ }
+
+ for i := 0; i < len(message.Messages); i++ {
+ m := message.Messages[i]
+ item, attempt, err := j.unpack(&m)
+ if err != nil {
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+
+ j.log.Error("message unpack", "error", err)
+ continue
+ }
+
+ if item.Options.CanRetry(int64(attempt)) {
+ j.pq.Insert(item)
+ continue
+ }
+
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "sqs"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/plugins/jobs/structs/general.go b/plugins/jobs/job/general.go
index ae754286..2c7d04f0 100644
--- a/plugins/jobs/structs/general.go
+++ b/plugins/jobs/job/general.go
@@ -1,4 +1,16 @@
-package structs
+package job
+
+// constant keys to pack/unpack messages from different drivers
+const (
+ RRID string = "rr_id"
+ RRJob string = "rr_job"
+ RRHeaders string = "rr_headers"
+ RRPipeline string = "rr_pipeline"
+ RRTimeout string = "rr_timeout"
+ RRDelay string = "rr_delay"
+ RRPriority string = "rr_priority"
+ RRMaxAttempts string = "rr_max_attempts"
+)
// Job carries information about single job.
type Job struct {
@@ -6,13 +18,13 @@ type Job struct {
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-value pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/job/job_options.go
index d48e2c56..af971d15 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/job/job_options.go
@@ -1,4 +1,4 @@
-package structs
+package job
import "time"
@@ -6,23 +6,23 @@ import "time"
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
+ Attempts int64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
+ RetryDelay int64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ Timeout int64 `json:"timeout,omitempty"`
}
// Merge merges job options.
@@ -49,7 +49,7 @@ func (o *Options) Merge(from *Options) {
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
+func (o *Options) CanRetry(attempt int64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/job/job_options_test.go
index a16f7dd0..500d8006 100644
--- a/plugins/jobs/structs/job_options_test.go
+++ b/plugins/jobs/job/job_options_test.go
@@ -1,4 +1,4 @@
-package structs
+package job
import (
"testing"
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index 91898178..90eeb189 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -55,10 +55,21 @@ func (p Pipeline) Int(name string, d int) int {
return d
}
+// Bool must return option value as bool or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(bool); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
// Priority returns default pipeline priority
-func (p Pipeline) Priority() uint64 {
+func (p Pipeline) Priority() int64 {
if value, ok := p[priority]; ok {
- if v, ok := value.(uint64); ok {
+ if v, ok := value.(int64); ok {
return v
}
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index c83078c3..ce51df21 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,9 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -14,8 +16,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -172,6 +174,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return errCh
}
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
+ fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
+ fmt.Printf("---> curr len: %d\n", p.queue.Len())
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// start listening
go func() {
for i := uint8(0); i < p.cfg.NumPollers; i++ {
@@ -219,6 +238,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
}
}
}()
@@ -289,7 +310,7 @@ func (p *Plugin) Reset() error {
return nil
}
-func (p *Plugin) Push(j *structs.Job) error {
+func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
// get the pipeline for the job
@@ -320,7 +341,7 @@ func (p *Plugin) Push(j *structs.Job) error {
return nil
}
-func (p *Plugin) PushBatch(j []*structs.Job) error {
+func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 0bb94fa4..a2bd9c6d 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -2,7 +2,7 @@ package jobs
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/logger"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
@@ -49,7 +49,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
l := len(j.GetJobs())
- batch := make([]*structs.Job, l)
+ batch := make([]*job.Job, l)
for i := 0; i < l; i++ {
// convert transport entity into domain
@@ -93,19 +93,19 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *structs.Job {
+func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()
}
- jb := &structs.Job{
+ jb := &job.Job{
Job: j.GetJob(),
Headers: headers,
Ident: j.GetId(),
Payload: j.GetPayload(),
- Options: &structs.Options{
+ Options: &job.Options{
Priority: j.GetOptions().GetPriority(),
Pipeline: j.GetOptions().GetPipeline(),
Delay: j.GetOptions().GetDelay(),