 common/jobs/interface.go | 4
 go.mod | 4
 go.sum | 22
 pkg/priority_queue/binary_heap.go | 1
 pkg/priority_queue/binary_heap_test.go | 4
 pkg/priority_queue/interface.go | 2
 plugins/jobs/brokers/sqs/consumer.go | 1
 plugins/jobs/brokers/sqs/item.go | 1
 plugins/jobs/brokers/sqs/plugin.go | 1
 plugins/jobs/drivers/amqp/config.go | 58
 plugins/jobs/drivers/amqp/consumer.go (renamed from plugins/jobs/brokers/amqp/consumer.go) | 69
 plugins/jobs/drivers/amqp/item.go (renamed from plugins/jobs/brokers/amqp/item.go) | 115
 plugins/jobs/drivers/amqp/listener.go (renamed from plugins/jobs/brokers/amqp/listener.go) | 2
 plugins/jobs/drivers/amqp/plugin.go (renamed from plugins/jobs/brokers/amqp/plugin.go) | 0
 plugins/jobs/drivers/amqp/rabbit_init.go (renamed from plugins/jobs/brokers/amqp/rabbit_init.go) | 2
 plugins/jobs/drivers/amqp/redial.go (renamed from plugins/jobs/brokers/amqp/redial.go) | 1
 plugins/jobs/drivers/ephemeral/consumer.go (renamed from plugins/jobs/brokers/ephemeral/consumer.go) | 21
 plugins/jobs/drivers/ephemeral/item.go (renamed from plugins/jobs/brokers/ephemeral/item.go) | 20
 plugins/jobs/drivers/ephemeral/plugin.go (renamed from plugins/jobs/brokers/ephemeral/plugin.go) | 0
 plugins/jobs/drivers/sqs/config.go | 103
 plugins/jobs/drivers/sqs/consumer.go | 190
 plugins/jobs/drivers/sqs/item.go | 192
 plugins/jobs/drivers/sqs/listener.go | 66
 plugins/jobs/drivers/sqs/plugin.go | 39
 plugins/jobs/job/general.go (renamed from plugins/jobs/structs/general.go) | 18
 plugins/jobs/job/job_options.go (renamed from plugins/jobs/structs/job_options.go) | 14
 plugins/jobs/job/job_options_test.go (renamed from plugins/jobs/structs/job_options_test.go) | 2
 plugins/jobs/pipeline/pipeline.go | 15
 plugins/jobs/plugin.go | 27
 plugins/jobs/rpc.go | 10
 proto/jobs/v1beta/jobs.pb.go | 30
 proto/jobs/v1beta/jobs.proto | 10
 tests/env/docker-compose-jobs.yml | 22
 tests/plugins/jobs/configs/.rr-jobs-init.yaml | 11
 tests/plugins/jobs/configs/.rr-jobs-test.yaml | 105
 tests/plugins/jobs/jobs_plugin_test.go | 78
 36 files changed, 1033 insertions(+), 227 deletions(-)
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index e94e97b1..f90c9c21 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -3,13 +3,13 @@ package jobs
import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
// Consumer todo naming
type Consumer interface {
- Push(job *structs.Job) error
+ Push(job *job.Job) error
Register(pipeline *pipeline.Pipeline) error
Run(pipeline *pipeline.Pipeline) error
Stop() error
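
For context, every driver now accepts the relocated *job.Job type in Push. A minimal sketch of what a driver has to provide — only the methods visible in this hunk are shown, and the noopConsumer type is purely illustrative (the real drivers below also implement Pause and Resume):

package noop

import (
	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
)

// noopConsumer is a hypothetical driver used only to show the new signatures.
type noopConsumer struct{}

func (c *noopConsumer) Push(j *job.Job) error               { return nil }
func (c *noopConsumer) Register(p *pipeline.Pipeline) error { return nil }
func (c *noopConsumer) Run(p *pipeline.Pipeline) error      { return nil }
func (c *noopConsumer) Stop() error                         { return nil }
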
diff --git a/go.mod b/go.mod
index 7a12c9b9..1cedb379 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,10 @@ go 1.16
require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/alicebob/miniredis/v2 v2.14.5
+ github.com/aws/aws-sdk-go-v2 v1.7.0
+ github.com/aws/aws-sdk-go-v2/config v1.4.1
+ github.com/aws/aws-sdk-go-v2/credentials v1.3.0
+ github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
diff --git a/go.sum b/go.sum
index c64a1a33..fbaf6411 100644
--- a/go.sum
+++ b/go.sum
@@ -43,6 +43,26 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
+github.com/aws/aws-sdk-go-v2 v1.7.0 h1:UYGnoIPIzed+ycmgw8Snb/0HK+KlMD+SndLTneG8ncE=
+github.com/aws/aws-sdk-go-v2 v1.7.0/go.mod h1:tb9wi5s61kTDA5qCkcDbt3KRVV74GGslQkl/DRdX/P4=
+github.com/aws/aws-sdk-go-v2/config v1.4.1 h1:PcGp9Kf+1dHJmP3EIDZJmAmWfGABFTU0obuvYQNzWH8=
+github.com/aws/aws-sdk-go-v2/config v1.4.1/go.mod h1:HCDWZ/oeY59TPtXslxlbkCqLQBsVu6b09kiG43tdP+I=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0 h1:vXxTINCsHn6LKhR043jwSLd6CsL7KOEU7b1woMr1K1A=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0/go.mod h1:tOcv+qDZ0O+6Jk2beMl5JnZX6N0H7O8fw9UsD3bP7GI=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0 h1:ucExzYCoAiL9GpKOsKkQLsa43wTT23tcdP4cDTSbZqY=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0/go.mod h1:XvzoGzuS0kKPzCQtJCC22Xh/mMgVAzfGo/0V+mk/Cu0=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0 h1:DJq/vXXF+LAFaa/kQX9C6arlf4xX4uaaqGWIyAKOCpM=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0/go.mod h1:qGQ/9IfkZonRNSNLE99/yBJ7EPA/h8jlWEqtJCcaj+Q=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0 h1:g2npzssI/6XsoQaPYCxliMFeC5iNKKvO0aC+/wWOE0A=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0/go.mod h1:a7XLWNKuVgOxjssEF019IiHPv35k8KHBaWv/wJAfi2A=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0 h1:45YlPhQ/U5v8QnzJFD1bWlTT4IA2NQ9tQ2D/AfyIX3Q=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0/go.mod h1:8iLn005F6ASRIXmp6U4hfRAk8EHAtRPrx1oHyxxz2xg=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0 h1:DMi9w+TpUam7eJ8ksL7svfzpqpqem2MkDAJKW8+I2/k=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0/go.mod h1:qWR+TUuvfji9udM79e4CPe87C5+SjMEb2TFXkZaI0Vc=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43iziThTvqs5FqOg=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg=
+github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM=
+github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -208,6 +228,8 @@ github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmK
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index 514ca460..fc043927 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -56,7 +56,6 @@ func (bh *BinHeap) fixDown(curr, end int) {
}
idxToSwap := cOneIdx
- // oh my, so unsafe
if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
idxToSwap = cTwoIdx
}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index f30cf8d8..663741ad 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -32,8 +32,8 @@ func (t Test) ID() string {
return "none"
}
-func (t Test) Priority() uint64 {
- return uint64(t)
+func (t Test) Priority() int64 {
+ return int64(t)
}
func TestBinHeap_Init(t *testing.T) {
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 1efebf1c..d64aaf3d 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -12,7 +12,7 @@ type Item interface {
ID() string
// Priority returns the Item's priority to sort
- Priority() uint64
+ Priority() int64
// Body is the Item payload
Body() []byte
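
The signature change ripples into every Item implementation. A sketch of a custom item under the new contract — the msg type is hypothetical, and the Context/Ack/Nack methods mirror what the driver items below implement:

// msg is a hypothetical priority-queue item; Priority is now int64 everywhere.
type msg struct {
	id       string
	priority int64
	payload  []byte
}

func (m *msg) ID() string               { return m.id }
func (m *msg) Priority() int64          { return m.priority }
func (m *msg) Body() []byte             { return m.payload }
func (m *msg) Context() ([]byte, error) { return nil, nil }
func (m *msg) Ack() error               { return nil }
func (m *msg) Nack() error              { return nil }
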
diff --git a/plugins/jobs/brokers/sqs/consumer.go b/plugins/jobs/brokers/sqs/consumer.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/consumer.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/brokers/sqs/item.go b/plugins/jobs/brokers/sqs/item.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/item.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/brokers/sqs/plugin.go b/plugins/jobs/brokers/sqs/plugin.go
deleted file mode 100644
index e63cf0e5..00000000
--- a/plugins/jobs/brokers/sqs/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package sqs
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
new file mode 100644
index 00000000..7befb3c8
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -0,0 +1,58 @@
+package amqp
+
+// pipeline rabbitmq info
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ PrefetchCount int `mapstructure:"pipeline_size"`
+ Queue string `mapstructure:"queue"`
+ Priority int64 `mapstructure:"priority"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
+ Exclusive bool `mapstructure:"exclusive"`
+}
+
+func (c *Config) InitDefault() {
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "default"
+ }
+
+ if c.PrefetchCount == 0 {
+ c.PrefetchCount = 100
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:guest@localhost:5672/"
+ }
+}
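
A hedged sketch of how these two structs are resolved by the consumer: unmarshal the pipeline key, then the global amqp section, then fill the gaps with InitDefault. The helper name and config key are illustrative:

// loadAMQPConfig is a hypothetical helper mirroring the consumer's config flow.
func loadAMQPConfig(cfg config.Configurer, configKey string) (*Config, *GlobalCfg, error) {
	pipeCfg := &Config{}
	globalCfg := &GlobalCfg{}

	// e.g. configKey = "jobs.pipelines.test-1"
	if err := cfg.UnmarshalKey(configKey, pipeCfg); err != nil {
		return nil, nil, err
	}
	pipeCfg.InitDefault() // exchange "default", exchange_type "direct", prefetch 100, priority 10

	// "amqp" is the global section name used in the test configs below
	if err := cfg.UnmarshalKey("amqp", globalCfg); err != nil {
		return nil, nil, err
	}
	globalCfg.InitDefault() // addr falls back to amqp://guest:guest@localhost:5672/

	return pipeCfg, globalCfg, nil
}
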
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index c2807b54..31999e23 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -11,43 +11,14 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/streadway/amqp"
)
-// pipeline rabbitmq info
-const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange-type"
- queue string = "queue"
- routingKey string = "routing-key"
- prefetch string = "prefetch"
-
- dlx string = "x-dead-letter-exchange"
- dlxRoutingKey string = "x-dead-letter-routing-key"
- dlxTTL string = "x-message-ttl"
- dlxExpires string = "x-expires"
-
- contentType string = "application/octet-stream"
-)
-
-type GlobalCfg struct {
- Addr string `mapstructure:"addr"`
-}
-
-// Config is used to parse pipeline configuration
-type Config struct {
- PrefetchCount int `mapstructure:"pipeline_size"`
- Queue string `mapstructure:"queue"`
- Exchange string `mapstructure:"exchange"`
- ExchangeType string `mapstructure:"exchange_type"`
- RoutingKey string `mapstructure:"routing_key"`
-}
-
type JobsConsumer struct {
- sync.RWMutex
+ sync.Mutex
log logger.Logger
pq priorityqueue.Queue
eh events.Handler
@@ -61,8 +32,10 @@ type JobsConsumer struct {
retryTimeout time.Duration
prefetchCount int
+ priority int64
exchangeName string
queue string
+ exclusive bool
consumeID string
connStr string
exchangeType string
@@ -123,6 +96,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
jb.exchangeType = pipeCfg.ExchangeType
jb.exchangeName = pipeCfg.Exchange
jb.prefetchCount = pipeCfg.PrefetchCount
+ jb.exclusive = pipeCfg.Exclusive
+ jb.priority = pipeCfg.Priority
// PARSE CONFIGURATION -------
@@ -185,6 +160,8 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
jb.exchangeType = pipeline.String(exchangeType, "direct")
jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
jb.prefetchCount = pipeline.Int(prefetch, 10)
+ jb.priority = int64(pipeline.Int(priority, 10))
+ jb.exclusive = pipeline.Bool(exclusive, true)
// PARSE CONFIGURATION -------
@@ -206,13 +183,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return nil, errors.E(op, err)
}
+ // register the pipeline
+ // error here is always nil
+ _ = jb.Register(pipeline)
+
// run redialer for the connection
jb.redialer()
return jb, nil
}
-func (j *JobsConsumer) Push(job *structs.Job) error {
+func (j *JobsConsumer) Push(job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -235,9 +216,9 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
}
// handle timeouts
- if job.Options.DelayDuration() > 0 {
+ if msg.Options.DelayDuration() > 0 {
// TODO declare separate method for this if condition
- delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
// delay cache optimization.
@@ -433,23 +414,3 @@ func (j *JobsConsumer) Stop() error {
})
return nil
}
-
-func (c *Config) InitDefault() {
- if c.ExchangeType == "" {
- c.ExchangeType = "direct"
- }
-
- if c.Exchange == "" {
- c.Exchange = "default"
- }
-
- if c.PrefetchCount == 0 {
- c.PrefetchCount = 100
- }
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Addr == "" {
- c.Addr = "amqp://guest:guest@localhost:5672/"
- }
-}
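
To illustrate what Push now receives: a job with a non-zero delay takes the temporary delayed-<ms>.<exchange>.<queue> route set up above, everything else is published directly. The literal values here are only an example:

j := &job.Job{
	Job:     "App\\Jobs\\Test",
	Ident:   "job-id",
	Payload: `{"hello":"world"}`,
	Options: &job.Options{
		Pipeline: "test-1",
		Delay:    5,  // seconds; converted to milliseconds for the temporary queue name
		Priority: 10, // travels in the rr_priority header via pack()
	},
}
// msg := fromJob(j); msg.Options.DelayDuration() > 0, so the delayed branch is taken.
_ = j
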
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index a46f1ca2..7c300c88 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -5,33 +5,23 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
-const (
- rrID string = "rr_id"
- rrJob string = "rr_job"
- rrHeaders string = "rr_headers"
- rrPipeline string = "rr_pipeline"
- rrTimeout string = "rr_timeout"
- rrDelay string = "rr_delay"
- rrRetryDelay string = "rr_retry_delay"
-)
-
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
@@ -50,34 +40,16 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint32 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int32 `json:"delay,omitempty"`
-
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts int32 `json:"maxAttempts,omitempty"`
-
- // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int32 `json:"retryDelay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Reserve defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
- Timeout int32 `json:"timeout,omitempty"`
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int32) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
-// RetryDuration returns retry delay duration in a form of time.Duration.
-func (o *Options) RetryDuration() time.Duration {
- return time.Second * time.Duration(o.RetryDelay)
+ Timeout int64 `json:"timeout,omitempty"`
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -98,8 +70,8 @@ func (j *Item) ID() string {
return j.Ident
}
-func (j *Item) Priority() uint64 {
- return uint64(j.Options.Priority)
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
}
// Body packs job payload into binary payload.
@@ -121,9 +93,9 @@ func (j *Item) Nack() error {
return j.NackFunc(false, false)
}
-func fromDelivery(d amqp.Delivery) (*Item, error) {
+func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- item, err := unpack(d)
+ item, err := j.unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
@@ -138,18 +110,16 @@ func fromDelivery(d amqp.Delivery) (*Item, error) {
}, nil
}
-func fromJob(job *structs.Job) *Item {
+func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
Options: &Options{
- Priority: uint32(job.Options.Priority),
- Pipeline: job.Options.Pipeline,
- Delay: int32(job.Options.Delay),
- Attempts: int32(job.Options.Attempts),
- RetryDelay: int32(job.Options.RetryDelay),
- Timeout: int32(job.Options.Timeout),
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
},
}
}
@@ -161,54 +131,57 @@ func pack(id string, j *Item) (amqp.Table, error) {
return nil, err
}
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- rrPipeline: j.Options.Pipeline,
- rrHeaders: headers,
- rrTimeout: j.Options.Timeout,
- rrDelay: j.Options.Delay,
- rrRetryDelay: j.Options.RetryDelay,
+ job.RRID: id,
+ job.RRJob: j.Job,
+ job.RRPipeline: j.Options.Pipeline,
+ job.RRHeaders: headers,
+ job.RRTimeout: j.Options.Timeout,
+ job.RRDelay: j.Options.Delay,
+ job.RRPriority: j.Options.Priority,
}, nil
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (*Item, error) {
- j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
- if _, ok := d.Headers[rrID].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", rrID))
+ if _, ok := d.Headers[job.RRID].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
}
- j.Ident = d.Headers[rrID].(string)
+ item.Ident = d.Headers[job.RRID].(string)
- if _, ok := d.Headers[rrJob].(string); !ok {
- return nil, errors.E(errors.Errorf("missing header `%s`", rrJob))
+ if _, ok := d.Headers[job.RRJob].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
}
- j.Job = d.Headers[rrJob].(string)
+ item.Job = d.Headers[job.RRJob].(string)
- if _, ok := d.Headers[rrPipeline].(string); ok {
- j.Options.Pipeline = d.Headers[rrPipeline].(string)
+ if _, ok := d.Headers[job.RRPipeline].(string); ok {
+ item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
}
- if h, ok := d.Headers[rrHeaders].([]byte); ok {
- err := json.Unmarshal(h, &j.Headers)
+ if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &item.Headers)
if err != nil {
return nil, err
}
}
- if _, ok := d.Headers[rrTimeout].(int32); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(int32)
+ if _, ok := d.Headers[job.RRTimeout].(int64); ok {
+ item.Options.Timeout = d.Headers[job.RRTimeout].(int64)
}
- if _, ok := d.Headers[rrDelay].(int32); ok {
- j.Options.Delay = d.Headers[rrDelay].(int32)
+ if _, ok := d.Headers[job.RRDelay].(int64); ok {
+ item.Options.Delay = d.Headers[job.RRDelay].(int64)
}
- if _, ok := d.Headers[rrRetryDelay].(int32); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
+ if _, ok := d.Headers[job.RRPriority]; !ok {
+ // set pipe's priority
+ item.Options.Priority = j.priority
+ } else {
+ item.Options.Priority = d.Headers[job.RRPriority].(int64)
}
- return j, nil
+ return item, nil
}
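
Taken together, pack and the new unpack method round-trip job metadata through AMQP headers keyed by the shared job.RR* constants. A small sketch of the pack side, with illustrative values:

item := &Item{
	Job:     "App\\Jobs\\Test",
	Ident:   "job-id",
	Payload: `{"hello":"world"}`,
	Options: &Options{Pipeline: "test-1", Delay: 5, Timeout: 60, Priority: 10},
}

table, err := pack(item.Ident, item)
if err != nil {
	panic(err) // headers failed to marshal
}
// table now carries rr_id, rr_job, rr_pipeline, rr_headers, rr_timeout,
// rr_delay and rr_priority; unpack on the consumer side restores them and
// falls back to the pipeline priority when rr_priority is absent.
_ = table
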
diff --git a/plugins/jobs/brokers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
index 2b994fc5..7241c717 100644
--- a/plugins/jobs/brokers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -12,7 +12,7 @@ func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
return
}
- d, err := fromDelivery(msg)
+ d, err := j.fromDelivery(msg)
if err != nil {
j.log.Error("amqp delivery convert", "error", err)
continue
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
index 624f4405..624f4405 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/drivers/amqp/plugin.go
diff --git a/plugins/jobs/brokers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
index e3e5f8da..d6b8a708 100644
--- a/plugins/jobs/brokers/amqp/rabbit_init.go
+++ b/plugins/jobs/drivers/amqp/rabbit_init.go
@@ -36,7 +36,7 @@ func (j *JobsConsumer) initRabbitMQ() error {
j.queue,
false,
false,
- true,
+ j.exclusive,
false,
nil,
)
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index 6ce69ed9..0b52a4d1 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -32,7 +32,6 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
Driver: pipe.Driver(),
Error: err,
Start: time.Now(),
- Elapsed: 0,
})
j.log.Error("connection closed, reconnecting", "error", err)
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 559cb2e9..45ee8083 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -8,8 +8,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -79,34 +79,35 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobBroker) Push(job *structs.Job) error {
+func (j *JobBroker) Push(jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.pipeline.Load(job.Options.Pipeline); ok {
+ if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
}
+ msg := fromJob(jb)
// handle timeouts
- if job.Options.Timeout > 0 {
- go func(jj *structs.Job) {
+ if msg.Options.Timeout > 0 {
+ go func(jj *job.Job) {
time.Sleep(jj.Options.TimeoutDuration())
// send the item after timeout expired
- j.localQueue <- From(job)
- }(job)
+ j.localQueue <- msg
+ }(jb)
return nil
}
// insert to the local, limited pipeline
- j.localQueue <- From(job)
+ j.localQueue <- msg
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
func (j *JobBroker) consume() {
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 211ac56a..442533c5 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -4,11 +4,11 @@ import (
"time"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
-func From(job *structs.Job) *Item {
+func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
Ident: job.Ident,
@@ -27,13 +27,13 @@ type Item struct {
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
@@ -43,16 +43,16 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ // Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -73,7 +73,7 @@ func (j *Item) ID() string {
return j.Ident
}
-func (j *Item) Priority() uint64 {
+func (j *Item) Priority() int64 {
return j.Options.Priority
}
@@ -89,7 +89,7 @@ func (j *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout uint64 `json:"timeout"`
+ Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
}{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
)
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go
index 28495abb..28495abb 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/drivers/ephemeral/plugin.go
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
new file mode 100644
index 00000000..0b4e8157
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -0,0 +1,103 @@
+package sqs
+
+type GlobalCfg struct {
+ Key string `mapstructure:"key"`
+ Secret string `mapstructure:"secret"`
+ Region string `mapstructure:"region"`
+ SessionToken string `mapstructure:"session_token"`
+ Endpoint string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ // The duration (in seconds) that the received messages are hidden from subsequent
+ // retrieve requests after being retrieved by a ReceiveMessage request.
+ VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+ // The duration (in seconds) for which the call waits for a message to arrive
+ // in the queue before returning. If a message is available, the call returns
+ // sooner than WaitTimeSeconds. If no messages are available and the wait time
+ // expires, the call returns successfully with an empty list of messages.
+ WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+ // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages
+ // than this value (however, fewer messages might be returned). Valid values: 1 to
+ // 10. Default: 1.
+ PrefetchCount int32 `mapstructure:"pipeline_size"`
+ // The name of the new queue. The following limits apply to this name:
+ //
+ // * A queue
+ // name can have up to 80 characters.
+ //
+ // * Valid values: alphanumeric characters,
+ // hyphens (-), and underscores (_).
+ //
+ // * A FIFO queue name must end with the .fifo
+ // suffix.
+ //
+ // Queue URLs and names are case-sensitive.
+ //
+ // This member is required.
+ Queue string `mapstructure:"queue"`
+
+ // A map of attributes with their corresponding values. The following lists the
+ // names, descriptions, and values of the special request parameters that the
+ // CreateQueue action uses.
+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+ Attributes map[string]string `mapstructure:"attributes"`
+
+ // From amazon docs:
+ // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
+ // Tagging Your Amazon SQS Queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
+ // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
+ // guidelines in mind:
+ //
+ // * Adding more than 50 tags to a queue isn't recommended.
+ //
+ // *
+ // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
+ // strings.
+ //
+ // * Tags are case-sensitive.
+ //
+ // * A new tag with a key identical to that
+ // of an existing tag overwrites the existing tag.
+ //
+ // For a full list of tag
+ // restrictions, see Quotas related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
+ // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
+ // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
+ // permissions don't apply to this action. For more information, see Grant
+ // cross-account permissions to a role and a user name
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
+ // in the Amazon SQS Developer Guide.
+ Tags map[string]string `mapstructure:"tags"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Endpoint == "" {
+ c.Endpoint = "http://localhost:9324"
+ }
+}
+
+func (c *Config) InitDefault() {
+ if c.Queue == "" {
+ c.Queue = "default"
+ }
+
+ if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
+ c.PrefetchCount = 10
+ }
+
+ if c.WaitTimeSeconds == 0 {
+ c.WaitTimeSeconds = 5
+ }
+
+ if c.Attributes == nil {
+ c.Attributes = make(map[string]string)
+ }
+
+ if c.Tags == nil {
+ c.Tags = make(map[string]string)
+ }
+}
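
The defaults are conservative: the prefetch is clamped to the SQS per-receive maximum of 10 and the endpoint points at a local emulator, matching the test setup. A quick illustration with a hypothetical struct literal:

c := &Config{PrefetchCount: 50}
c.InitDefault()
// c.Queue == "default", c.PrefetchCount == 10 (clamped), c.WaitTimeSeconds == 5,
// and c.Attributes / c.Tags are initialized to empty maps.

g := &GlobalCfg{}
g.InitDefault()
// g.Endpoint == "http://localhost:9324"
_, _ = c, g
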
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
new file mode 100644
index 00000000..c0f66589
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -0,0 +1,190 @@
+package sqs
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ pq priorityqueue.Queue
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+
+ // connection info
+ key string
+ secret string
+ sessionToken string
+ region string
+ endpoint string
+ queue string
+ messageGroupID string
+ waitTime int32
+ prefetch int32
+ visibilityTimeout int32
+
+ // queue optional parameters
+ attributes map[string]string
+ tags map[string]string
+
+ client *sqs.Client
+ outputQ *sqs.CreateQueueOutput
+
+ pauseCh chan struct{}
+}
+
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: pipeCfg.Attributes,
+ tags: pipeCfg.Tags,
+ queue: pipeCfg.Queue,
+ prefetch: pipeCfg.PrefetchCount,
+ visibilityTimeout: pipeCfg.VisibilityTimeout,
+ waitTime: pipeCfg.WaitTimeSeconds,
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ return &JobConsumer{}, nil
+}
+
+func (j *JobConsumer) Push(jb *job.Job) error {
+ const op = errors.Op("sqs_push")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // The length of time, in seconds, for which to delay a specific message. Valid
+ // values: 0 to 900. Maximum: 15 minutes.
+ if jb.Options.Delay > 900 {
+ return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
+ }
+
+ msg := fromJob(jb)
+
+ // The new value for the message's visibility timeout (in seconds). Values range: 0
+ // to 43200. Maximum: 12 hours.
+ _, err := j.client.SendMessage(context.Background(), j.pack(msg))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ j.pipeline.Store(pipeline)
+ return nil
+}
+
+func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ j.Lock()
+ defer j.Unlock()
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // start listener
+ go j.listen()
+
+ return nil
+}
+
+func (j *JobConsumer) Stop() error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
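
A hedged usage sketch for the new consumer: Push validates the pipeline name and the 900-second delay cap before handing the packed message to SendMessage. The helper and job values are illustrative and assume the consumer was built by NewSQSConsumer and registered with the pipeline:

// pushExample is a hypothetical helper showing the happy path of Push.
func pushExample(consumer *JobConsumer) error {
	return consumer.Push(&job.Job{
		Job:     "App\\Jobs\\Test",
		Ident:   "job-id",
		Payload: `{"hello":"world"}`,
		Options: &job.Options{
			Pipeline: "test-3", // must match the registered pipeline name
			Delay:    10,       // seconds; anything above 900 is rejected before SendMessage
		},
	})
}
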
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
new file mode 100644
index 00000000..ef736be9
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -0,0 +1,192 @@
+package sqs
+
+import (
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ StringType string = "String"
+ NumberType string = "Number"
+ ApproximateReceiveCount string = "ApproximateReceiveCount"
+)
+
+var attributes = []string{
+ job.RRJob,
+ job.RRDelay,
+ job.RRTimeout,
+ job.RRPriority,
+ job.RRMaxAttempts,
+}
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+
+ // Maximum number of attempts to receive and process the message
+ MaxAttempts int64 `json:"max_attempts,omitempty"`
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int64) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.MaxAttempts > (attempt + 1)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the sqs, MessageAttributes used instead
+func (j *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (j *Item) Ack() error {
+ return nil
+}
+
+func (j *Item) Nack() error {
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ MaxAttempts: job.Options.Attempts,
+ },
+ }
+}
+
+func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ MessageBody: aws.String(item.Payload),
+ QueueUrl: j.outputQ.QueueUrl,
+ DelaySeconds: int32(item.Options.Delay),
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))},
+ },
+ }
+}
+
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
+ const op = errors.Op("sqs_unpack")
+ // reserved
+ if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
+ return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ }
+
+ for i := 0; i < len(attributes); i++ {
+ if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
+ return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ }
+ }
+
+ attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ item := &Item{
+ Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Payload: *msg.Body,
+ Options: &Options{
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
+ MaxAttempts: int64(attempt),
+ },
+ }
+
+ return item, recCount, nil
+}
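
The listener below uses CanRetry against the ApproximateReceiveCount returned by unpack to decide between re-inserting the item and deleting the message; a short illustration of the cut-off:

o := &Options{MaxAttempts: 3}
_ = o.CanRetry(1) // 3 > 1+1 -> true: the item goes back into the priority queue
_ = o.CanRetry(2) // 3 > 2+1 -> false: the message is deleted from the queue
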
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
new file mode 100644
index 00000000..a10ce5a6
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -0,0 +1,66 @@
+package sqs
+
+import (
+ "context"
+
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+)
+
+const (
+ All string = "All"
+)
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.pauseCh:
+ return
+ default:
+ message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ MaxNumberOfMessages: j.prefetch,
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
+ MessageAttributeNames: []string{All},
+ VisibilityTimeout: j.visibilityTimeout,
+ WaitTimeSeconds: j.waitTime,
+ })
+ if err != nil {
+ j.log.Error("receive message", "error", err)
+ continue
+ }
+
+ for i := 0; i < len(message.Messages); i++ {
+ m := message.Messages[i]
+ item, attempt, err := j.unpack(&m)
+ if err != nil {
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+
+ j.log.Error("message unpack", "error", err)
+ continue
+ }
+
+ if item.Options.CanRetry(int64(attempt)) {
+ j.pq.Insert(item)
+ continue
+ }
+
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "sqs"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
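
Wiring the driver into a server follows the same pattern as amqp and ephemeral: register the plugin in the endure container next to the jobs plugin, as the test below does. A trimmed sketch (imports as in jobs_plugin_test.go):

cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
if err != nil {
	panic(err)
}

err = cont.RegisterAll(
	&config.Viper{Path: "configs/.rr-jobs-test.yaml", Prefix: "rr"},
	&logger.ZapLogger{},
	&server.Plugin{},
	&rpcPlugin.Plugin{},
	&jobs.Plugin{},
	&sqs.Plugin{},
)
if err != nil {
	panic(err)
}
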
diff --git a/plugins/jobs/structs/general.go b/plugins/jobs/job/general.go
index ae754286..2c7d04f0 100644
--- a/plugins/jobs/structs/general.go
+++ b/plugins/jobs/job/general.go
@@ -1,4 +1,16 @@
-package structs
+package job
+
+// constant keys to pack/unpack messages from different drivers
+const (
+ RRID string = "rr_id"
+ RRJob string = "rr_job"
+ RRHeaders string = "rr_headers"
+ RRPipeline string = "rr_pipeline"
+ RRTimeout string = "rr_timeout"
+ RRDelay string = "rr_delay"
+ RRPriority string = "rr_priority"
+ RRMaxAttempts string = "rr_max_attempts"
+)
// Job carries information about single job.
type Job struct {
@@ -6,13 +18,13 @@ type Job struct {
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
- Ident string
+ Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-value pairs
- Headers map[string][]string
+ Headers map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/job/job_options.go
index d48e2c56..af971d15 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/job/job_options.go
@@ -1,4 +1,4 @@
-package structs
+package job
import "time"
@@ -6,23 +6,23 @@ import "time"
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
+ Attempts int64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
+ RetryDelay int64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ Timeout int64 `json:"timeout,omitempty"`
}
// Merge merges job options.
@@ -49,7 +49,7 @@ func (o *Options) Merge(from *Options) {
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
+func (o *Options) CanRetry(attempt int64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/job/job_options_test.go
index a16f7dd0..500d8006 100644
--- a/plugins/jobs/structs/job_options_test.go
+++ b/plugins/jobs/job/job_options_test.go
@@ -1,4 +1,4 @@
-package structs
+package job
import (
"testing"
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index 91898178..90eeb189 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -55,10 +55,21 @@ func (p Pipeline) Int(name string, d int) int {
return d
}
+// Bool must return option value as bool or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(bool); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
// Priority returns default pipeline priority
-func (p Pipeline) Priority() uint64 {
+func (p Pipeline) Priority() int64 {
if value, ok := p[priority]; ok {
- if v, ok := value.(uint64); ok {
+ if v, ok := value.(int64); ok {
return v
}
}
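
Bool follows the same shape as the existing String and Int accessors, and Priority now expects an int64 value in the map. A quick illustration, assuming Pipeline is the string-keyed map used above and that the unexported priority key is the literal "priority":

p := Pipeline{
	"exclusive": true,
	"priority":  int64(11),
}

_ = p.Bool("exclusive", false) // true
_ = p.Priority()               // 11; a plain int value would fall back to the default
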
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index c83078c3..ce51df21 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,7 +3,9 @@ package jobs
import (
"context"
"fmt"
+ "runtime"
"sync"
+ "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -14,8 +16,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -172,6 +174,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return errCh
}
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
+ fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
+ fmt.Printf("---> curr len: %d\n", p.queue.Len())
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
// start listening
go func() {
for i := uint8(0); i < p.cfg.NumPollers; i++ {
@@ -219,6 +238,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
}
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
}
}
}()
@@ -289,7 +310,7 @@ func (p *Plugin) Reset() error {
return nil
}
-func (p *Plugin) Push(j *structs.Job) error {
+func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
// get the pipeline for the job
@@ -320,7 +341,7 @@ func (p *Plugin) Push(j *structs.Job) error {
return nil
}
-func (p *Plugin) PushBatch(j []*structs.Job) error {
+func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 0bb94fa4..a2bd9c6d 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -2,7 +2,7 @@ package jobs
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/logger"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
@@ -49,7 +49,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
l := len(j.GetJobs())
- batch := make([]*structs.Job, l)
+ batch := make([]*job.Job, l)
for i := 0; i < l; i++ {
// convert transport entity into domain
@@ -93,19 +93,19 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *structs.Job {
+func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()
}
- jb := &structs.Job{
+ jb := &job.Job{
Job: j.GetJob(),
Headers: headers,
Ident: j.GetId(),
Payload: j.GetPayload(),
- Options: &structs.Options{
+ Options: &job.Options{
Priority: j.GetOptions().GetPriority(),
Pipeline: j.GetOptions().GetPipeline(),
Delay: j.GetOptions().GetDelay(),
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index 38711806..b445ca3f 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -382,12 +382,12 @@ type Options struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"`
+ Priority int64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"`
Pipeline string `protobuf:"bytes,2,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
- Delay uint64 `protobuf:"varint,3,opt,name=delay,proto3" json:"delay,omitempty"`
- Attempts uint64 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"`
- RetryDelay uint64 `protobuf:"varint,5,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"`
- Timeout uint64 `protobuf:"varint,6,opt,name=timeout,proto3" json:"timeout,omitempty"`
+ Delay int64 `protobuf:"varint,3,opt,name=delay,proto3" json:"delay,omitempty"`
+ Attempts int64 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"`
+ RetryDelay int64 `protobuf:"varint,5,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"`
+ Timeout int64 `protobuf:"varint,6,opt,name=timeout,proto3" json:"timeout,omitempty"`
}
func (x *Options) Reset() {
@@ -422,7 +422,7 @@ func (*Options) Descriptor() ([]byte, []int) {
return file_jobs_proto_rawDescGZIP(), []int{7}
}
-func (x *Options) GetPriority() uint64 {
+func (x *Options) GetPriority() int64 {
if x != nil {
return x.Priority
}
@@ -436,28 +436,28 @@ func (x *Options) GetPipeline() string {
return ""
}
-func (x *Options) GetDelay() uint64 {
+func (x *Options) GetDelay() int64 {
if x != nil {
return x.Delay
}
return 0
}
-func (x *Options) GetAttempts() uint64 {
+func (x *Options) GetAttempts() int64 {
if x != nil {
return x.Attempts
}
return 0
}
-func (x *Options) GetRetryDelay() uint64 {
+func (x *Options) GetRetryDelay() int64 {
if x != nil {
return x.RetryDelay
}
return 0
}
-func (x *Options) GetTimeout() uint64 {
+func (x *Options) GetTimeout() int64 {
if x != nil {
return x.Timeout
}
@@ -501,16 +501,16 @@ var file_jobs_proto_rawDesc = []byte{
0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a,
- 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52,
+ 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70,
0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70,
0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x03,
- 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61,
- 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61,
+ 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61,
+ 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x61,
0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79,
- 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65,
+ 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
- 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
+ 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62,
0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 9ff967d4..1bcddf4f 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -39,11 +39,11 @@ message HeaderValue {
}
message Options {
- uint64 priority = 1;
+ int64 priority = 1;
string pipeline = 2;
- uint64 delay = 3;
- uint64 attempts = 4;
- uint64 retry_delay = 5;
- uint64 timeout = 6;
+ int64 delay = 3;
+ int64 attempts = 4;
+ int64 retry_delay = 5;
+ int64 timeout = 6;
}
diff --git a/tests/env/docker-compose-jobs.yml b/tests/env/docker-compose-jobs.yml
deleted file mode 100644
index 7b88c9cf..00000000
--- a/tests/env/docker-compose-jobs.yml
+++ /dev/null
@@ -1,22 +0,0 @@
-version: "3"
-
-services:
- beanstalk:
- image: schickling/beanstalkd
- ports:
- - "11300:11300"
-
- sqs:
- image: vsouza/sqs-local
- ports:
- - "9324:9324"
-
- rabbitmq:
- image: rabbitmq:3-management
- environment:
- RABBITMQ_DEFAULT_USER: guest
- RABBITMQ_DEFAULT_PASS: guest
- RABBITMQ_DEFAULT_VHOST: /
- ports:
- - "15672:15672"
- - "5672:5672" \ No newline at end of file
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 80826acc..63ddc70d 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -19,8 +19,6 @@ sqs:
secret: api-secret
region: us-west-1
endpoint: http://localhost:9324
- declare:
- MessageRetentionPeriod: 86400
logs:
level: info
@@ -62,6 +60,7 @@ jobs:
pipeline_size: 1000000
queue: test-1-queue
exchange: default
+ exclusive: true
exchange_type: direct
routing_key: test
@@ -81,12 +80,14 @@ jobs:
pipeline_size: 1000000
test-3:
- # priority: 11 - not defined, 10 by default
- # driver locality not specified, local by default
driver: sqs
pipeline_size: 1000000
queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
new file mode 100644
index 00000000..ee72c2b7
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -0,0 +1,105 @@
+rpc:
+ listen: unix:///tmp/rr.sock
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+ session_token: ""
+ ping_period: 10
+ attributes:
+ MessageRetentionPeriod: 86400
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 64
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-4:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
+
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 034ffc45..c06b74e4 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -16,8 +16,9 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/amqp"
- "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -28,6 +29,79 @@ import (
"github.com/stretchr/testify/require"
)
+func TestTEMP_INTI(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-test.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ &sqs.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 30000)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func TestJobsInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)