summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 14:28:05 +0300
committerValery Piashchynski <[email protected]>2021-07-08 14:28:05 +0300
commit05956485a121ee47a2f8281a8a0dffd7eecc68aa (patch)
tree1fd0ffd60c1731ea6934300506c3061f6e65d1d7
parentc7becb2fc51fc09523f6640eb72f360a6b4681f5 (diff)
Add pipeline and job plugin options...
Skeleton for the amqp plugin. Add Timeout and Pipeline to the job.Context() method. Implement queue limits for the ephemeral driver with main priority queue limits. Update configuration, add pipeline_size for every pipeline and jobs priority queue size. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/priorityqueue/binary_heap.go6
-rw-r--r--pkg/priorityqueue/binary_heap_test.go17
-rw-r--r--plugins/jobs/brokers/amqp/config.go1
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go48
-rw-r--r--plugins/jobs/brokers/amqp/item.go130
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go30
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go48
-rw-r--r--plugins/jobs/brokers/ephemeral/item.go10
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go9
-rw-r--r--plugins/jobs/config.go8
-rw-r--r--plugins/jobs/plugin.go8
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml12
12 files changed, 299 insertions, 28 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index 82424331..e47dd2c8 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -18,12 +18,8 @@ type BinHeap struct {
}
func NewBinHeap(maxLen uint64) *BinHeap {
- if maxLen == 0 {
- maxLen = 100_000
- }
-
return &BinHeap{
- items: make([]Item, 0, maxLen),
+ items: make([]Item, 0, 1000),
len: 0,
maxLen: maxLen,
cond: sync.Cond{L: &sync.Mutex{}},
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index 125884b1..53505f52 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -37,7 +37,7 @@ func (t Test) Priority() uint64 {
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
- bh := NewBinHeap(100)
+ bh := NewBinHeap(0)
for i := 0; i < len(a); i++ {
bh.Insert(a[i])
@@ -59,7 +59,19 @@ func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewBinHeap(10000000)
+ pq := NewBinHeap(1000)
+
+ go func() {
+ tt3 := time.NewTicker(time.Millisecond * 10)
+ for {
+ select {
+ case <-tt3.C:
+ require.Less(t, pq.Len(), uint64(1002))
+ case <-stopCh:
+ return
+ }
+ }
+ }()
go func() {
tt := time.NewTicker(time.Second)
@@ -106,4 +118,5 @@ func TestNewPriorityQueue(t *testing.T) {
stopCh <- struct{}{}
stopCh <- struct{}{}
stopCh <- struct{}{}
+ stopCh <- struct{}{}
}
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
deleted file mode 100644
index 0e8d02ac..00000000
--- a/plugins/jobs/brokers/amqp/config.go
+++ /dev/null
@@ -1 +0,0 @@
-package amqp
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 0e8d02ac..0b8a5a5b 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -1 +1,49 @@
package amqp
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Config struct {
+ Addr string
+ Queue string
+}
+
+type JobsConsumer struct {
+ log logger.Logger
+ pq priorityqueue.Queue
+}
+
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ }
+
+ return jb, nil
+}
+
+func (j JobsConsumer) Push(job *structs.Job) error {
+ panic("implement me")
+}
+
+func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (j JobsConsumer) List() []*pipeline.Pipeline {
+ panic("implement me")
+}
+
+func (j JobsConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j JobsConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
new file mode 100644
index 00000000..ddb4e291
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -0,0 +1,130 @@
+package amqp
+
+import (
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+func From(job *structs.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: conv(*job.Options),
+ }
+}
+
+func conv(jo structs.Options) Options {
+ return Options(jo)
+}
+
+type Item struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options Options `json:"options,omitempty"`
+
+ AckFunc func()
+
+ NackFunc func()
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority uint64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay uint64 `json:"delay,omitempty"`
+
+ // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
+ // Minimum valuable value is 2.
+ Attempts uint64 `json:"maxAttempts,omitempty"`
+
+ // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
+ RetryDelay uint64 `json:"retryDelay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout uint64 `json:"timeout,omitempty"`
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt uint64) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() uint64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout uint64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (j *Item) Ack() {
+ // noop for the in-memory
+}
+
+func (j *Item) Nack() {
+ // noop for the in-memory
+}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 0e8d02ac..174cb006 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -1 +1,31 @@
package amqp
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ name string = "amqp"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return name
+}
+
+func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
+}
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index e31e3b25..5cf4c633 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -6,21 +6,47 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
+type Config struct {
+ PipelineSize uint64 `mapstructure:"pipeline_size"`
+}
+
type JobBroker struct {
- queues sync.Map
- pq priorityqueue.Queue
+ cfg *Config
+ log logger.Logger
+ queues sync.Map
+ pq priorityqueue.Queue
+ localQueue chan *Item
}
-func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) {
+ const op = errors.Op("new_ephemeral_pipeline")
+
jb := &JobBroker{
- queues: sync.Map{},
- pq: q,
+ log: log,
+ pq: q,
+ }
+
+ err := cfg.UnmarshalKey(configKey, &jb.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if jb.cfg.PipelineSize == 0 {
+ jb.cfg.PipelineSize = 100_000
}
+ // initialize a local queue
+ jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+
+ // consume from the queue
+ go jb.consume()
+
return jb, nil
}
@@ -39,13 +65,14 @@ func (j *JobBroker) Push(job *structs.Job) error {
time.Sleep(jj.Options.TimeoutDuration())
// send the item after timeout expired
- j.pq.Insert(From(job))
+ j.localQueue <- From(job)
}(job)
return nil
}
- j.pq.Insert(From(job))
+ // insert to the local, limited pipeline
+ j.localQueue <- From(job)
return nil
}
@@ -53,6 +80,13 @@ func (j *JobBroker) Push(job *structs.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
+func (j *JobBroker) consume() {
+ // redirect
+ for item := range j.localQueue {
+ j.pq.Insert(item)
+ }
+}
+
func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
const op = errors.Op("ephemeral_register")
if _, ok := j.queues.Load(pipeline.Name()); ok {
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go
index e2caa53a..40c6b3e4 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/brokers/ephemeral/item.go
@@ -103,10 +103,12 @@ func (j *Item) Body() []byte {
func (j *Item) Context() ([]byte, error) {
ctx, err := json.Marshal(
struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers},
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout uint64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
)
if err != nil {
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 3d6a95b7..60c6e245 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -3,6 +3,7 @@ package ephemeral
import (
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -12,10 +13,12 @@ const (
type Plugin struct {
log logger.Logger
+ cfg config.Configurer
}
-func (p *Plugin) Init(log logger.Logger) error {
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
+ p.cfg = cfg
return nil
}
@@ -23,6 +26,6 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(q)
+func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(configKey, p.log, p.cfg, q)
}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index aa2da2dc..1b613231 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -19,6 +19,10 @@ type Config struct {
// Default - num logical cores
NumPollers uint8 `mapstructure:"num_pollers"`
+ // PipelineSize is the limit of a main jobs queue which consume Items from the drivers pipeline
+ // Driver pipeline might be much larger than a main jobs queue
+ PipelineSize uint64 `mapstructure:"pipeline_size"`
+
// Pool configures roadrunner workers pool.
Pool *poolImpl.Config `mapstructure:"Pool"`
@@ -34,6 +38,10 @@ func (c *Config) InitDefaults() {
c.Pool = &poolImpl.Config{}
}
+ if c.PipelineSize == 0 {
+ c.PipelineSize = 1_000_000
+ }
+
if c.NumPollers == 0 {
c.NumPollers = uint8(runtime.NumCPU())
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 0f645b12..9a551d71 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -85,7 +85,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap(100_000_000)
+ p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
p.log = log
return nil
@@ -102,7 +102,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for { //nolint:gosimple
select {
case <-tt.C:
- fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate))
+ fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
atomic.StoreUint64(&rate, 0)
}
}
@@ -113,7 +113,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for { //nolint:gosimple
select {
case <-tt.C:
- fmt.Printf("---> goroutines: %d", runtime.NumGoroutine())
+ fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
}
}
}()
@@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for { //nolint:gosimple
select {
case <-tt.C:
- fmt.Printf("---> curr len: %d", p.queue.Len())
+ fmt.Printf("---> curr len: %d\n", p.queue.Len())
}
}
}()
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index bb6c477a..90590ccb 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -19,10 +19,15 @@ sqs:
secret: api-secret
region: us-west-1
endpoint: http://localhost:9324
+ declare:
+ MessageRetentionPeriod: 86400
jobs:
+ # num logical cores by default
num_pollers: 64
+ # 1mi by default
+ pipeline_size: 100000
# worker pool configuration
pool:
num_workers: 10
@@ -35,23 +40,26 @@ jobs:
test-local:
driver: ephemeral
priority: 10
+ pipeline_size: 10
test-1:
driver: amqp
priority: 1
queue: default
+ pipeline_size: 1000000
test-2:
driver: beanstalk
priority: 11
tube: default
+ pipeline_size: 1000000
test-3:
# priority: 11 - not defined, 10 by default
+ # driver locality not specified, local by default
driver: sqs
+ pipeline_size: 1000000
queue: default
- declare:
- MessageRetentionPeriod: 86400
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
consume: [ "test-local" ]