diff options
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 6 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 17 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 48 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 130 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 30 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 48 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 10 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 | ||||
-rw-r--r-- | plugins/jobs/config.go | 8 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 8 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 12 |
12 files changed, 299 insertions, 28 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index 82424331..e47dd2c8 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -18,12 +18,8 @@ type BinHeap struct { } func NewBinHeap(maxLen uint64) *BinHeap { - if maxLen == 0 { - maxLen = 100_000 - } - return &BinHeap{ - items: make([]Item, 0, maxLen), + items: make([]Item, 0, 1000), len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 125884b1..53505f52 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -37,7 +37,7 @@ func (t Test) Priority() uint64 { func TestBinHeap_Init(t *testing.T) { a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - bh := NewBinHeap(100) + bh := NewBinHeap(0) for i := 0; i < len(a); i++ { bh.Insert(a[i]) @@ -59,7 +59,19 @@ func TestNewPriorityQueue(t *testing.T) { insertsPerSec := uint64(0) getPerSec := uint64(0) stopCh := make(chan struct{}, 1) - pq := NewBinHeap(10000000) + pq := NewBinHeap(1000) + + go func() { + tt3 := time.NewTicker(time.Millisecond * 10) + for { + select { + case <-tt3.C: + require.Less(t, pq.Len(), uint64(1002)) + case <-stopCh: + return + } + } + }() go func() { tt := time.NewTicker(time.Second) @@ -106,4 +118,5 @@ func TestNewPriorityQueue(t *testing.T) { stopCh <- struct{}{} stopCh <- struct{}{} stopCh <- struct{}{} + stopCh <- struct{}{} } diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go deleted file mode 100644 index 0e8d02ac..00000000 --- a/plugins/jobs/brokers/amqp/config.go +++ /dev/null @@ -1 +0,0 @@ -package amqp diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 0e8d02ac..0b8a5a5b 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1 +1,49 @@ package amqp + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Config struct { + Addr string + Queue string +} + +type JobsConsumer struct { + log logger.Logger + pq priorityqueue.Queue +} + +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + jb := &JobsConsumer{ + log: log, + pq: pq, + } + + return jb, nil +} + +func (j JobsConsumer) Push(job *structs.Job) error { + panic("implement me") +} + +func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error { + panic("implement me") +} + +func (j JobsConsumer) List() []*pipeline.Pipeline { + panic("implement me") +} + +func (j JobsConsumer) Pause(pipeline string) { + panic("implement me") +} + +func (j JobsConsumer) Resume(pipeline string) { + panic("implement me") +} diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go new file mode 100644 index 00000000..ddb4e291 --- /dev/null +++ b/plugins/jobs/brokers/amqp/item.go @@ -0,0 +1,130 @@ +package amqp + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" +) + +func From(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: conv(*job.Options), + } +} + +func conv(jo structs.Options) Options { + return Options(jo) +} + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options Options `json:"options,omitempty"` + + AckFunc func() + + NackFunc func() +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority uint64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay uint64 `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts uint64 `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay uint64 `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout uint64 `json:"timeout,omitempty"` +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt uint64) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() uint64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout uint64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + ) + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (j *Item) Ack() { + // noop for the in-memory +} + +func (j *Item) Nack() { + // noop for the in-memory +} diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 0e8d02ac..174cb006 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1 +1,31 @@ package amqp + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + name string = "amqp" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return name +} + +func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +} diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index e31e3b25..5cf4c633 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -6,21 +6,47 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" ) +type Config struct { + PipelineSize uint64 `mapstructure:"pipeline_size"` +} + type JobBroker struct { - queues sync.Map - pq priorityqueue.Queue + cfg *Config + log logger.Logger + queues sync.Map + pq priorityqueue.Queue + localQueue chan *Item } -func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { + const op = errors.Op("new_ephemeral_pipeline") + jb := &JobBroker{ - queues: sync.Map{}, - pq: q, + log: log, + pq: q, + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.PipelineSize == 0 { + jb.cfg.PipelineSize = 100_000 } + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + return jb, nil } @@ -39,13 +65,14 @@ func (j *JobBroker) Push(job *structs.Job) error { time.Sleep(jj.Options.TimeoutDuration()) // send the item after timeout expired - j.pq.Insert(From(job)) + j.localQueue <- From(job) }(job) return nil } - j.pq.Insert(From(job)) + // insert to the local, limited pipeline + j.localQueue <- From(job) return nil } @@ -53,6 +80,13 @@ func (j *JobBroker) Push(job *structs.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } +func (j *JobBroker) consume() { + // redirect + for item := range j.localQueue { + j.pq.Insert(item) + } +} + func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.queues.Load(pipeline.Name()); ok { diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index e2caa53a..40c6b3e4 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -103,10 +103,12 @@ func (j *Item) Body() []byte { func (j *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout uint64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, ) if err != nil { diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 3d6a95b7..60c6e245 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -3,6 +3,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,10 +13,12 @@ const ( type Plugin struct { log logger.Logger + cfg config.Configurer } -func (p *Plugin) Init(log logger.Logger) error { +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log + p.cfg = cfg return nil } @@ -23,6 +26,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(q) +func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, q) } diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index aa2da2dc..1b613231 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -19,6 +19,10 @@ type Config struct { // Default - num logical cores NumPollers uint8 `mapstructure:"num_pollers"` + // PipelineSize is the limit of a main jobs queue which consume Items from the drivers pipeline + // Driver pipeline might be much larger than a main jobs queue + PipelineSize uint64 `mapstructure:"pipeline_size"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` @@ -34,6 +38,10 @@ func (c *Config) InitDefaults() { c.Pool = &poolImpl.Config{} } + if c.PipelineSize == 0 { + c.PipelineSize = 1_000_000 + } + if c.NumPollers == 0 { c.NumPollers = uint8(runtime.NumCPU()) } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 0f645b12..9a551d71 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -85,7 +85,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap(100_000_000) + p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log return nil @@ -102,7 +102,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate)) + fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) atomic.StoreUint64(&rate, 0) } } @@ -113,7 +113,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> goroutines: %d", runtime.NumGoroutine()) + fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) } } }() @@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> curr len: %d", p.queue.Len()) + fmt.Printf("---> curr len: %d\n", p.queue.Len()) } } }() diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bb6c477a..90590ccb 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -19,10 +19,15 @@ sqs: secret: api-secret region: us-west-1 endpoint: http://localhost:9324 + declare: + MessageRetentionPeriod: 86400 jobs: + # num logical cores by default num_pollers: 64 + # 1mi by default + pipeline_size: 100000 # worker pool configuration pool: num_workers: 10 @@ -35,23 +40,26 @@ jobs: test-local: driver: ephemeral priority: 10 + pipeline_size: 10 test-1: driver: amqp priority: 1 queue: default + pipeline_size: 1000000 test-2: driver: beanstalk priority: 11 tube: default + pipeline_size: 1000000 test-3: # priority: 11 - not defined, 10 by default + # driver locality not specified, local by default driver: sqs + pipeline_size: 1000000 queue: default - declare: - MessageRetentionPeriod: 86400 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually consume: [ "test-local" ] |