summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
committerValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
commitd449d9d5aec1eec6d494064299feb1551f88ffe2 (patch)
treea905126b44bcfab29af9b5bc3eddaf5398375975 /plugins/jobs/drivers/ephemeral
parenta8a7f4194156440ef3157d8e5d75c43ed0327bcf (diff)
Add support for the jobs-worker protocol for the beanstalk,ephemeral and
sqs drivers Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go115
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go46
-rw-r--r--plugins/jobs/drivers/ephemeral/requeue.go25
3 files changed, 127 insertions, 59 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 043da118..050d74b9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -31,9 +31,11 @@ type JobConsumer struct {
pq priorityqueue.Queue
localPrefetch chan *Item
+ // time.sleep goroutines max number
goroutinesMaxNum uint64
- stopCh chan struct{}
+ requeueCh chan *Item
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
@@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// initialize a local queue
@@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
@@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
- msg := fromJob(jb)
- // handle timeouts
- // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
+ b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ }
- go func(jj *job.Job) {
- atomic.AddUint64(&j.goroutinesMaxNum, 1)
- time.Sleep(jj.Options.DelayDuration())
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
+ }
- // send the item after timeout expired
- j.localPrefetch <- msg
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
- atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
- }(jb)
+ return nil
+}
- return nil
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
+ return errors.E(op, errors.Str("max concurrency number reached"))
}
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- default:
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch))
- }
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutinesMaxNum, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
+ }(msg)
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
}
func (j *JobConsumer) consume() {
@@ -133,6 +148,9 @@ func (j *JobConsumer) consume() {
for {
select {
case item := <-j.localPrefetch:
+
+ // set requeue channel
+ item.Options.requeueCh = j.requeueCh
j.pq.Insert(item)
case <-j.stopCh:
return
@@ -140,7 +158,7 @@ func (j *JobConsumer) consume() {
}
}
-func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
const op = errors.Op("ephemeral_register")
if _, ok := j.pipeline.Load(pipeline.Name()); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
@@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline)
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == true {
// mark pipeline as turned off
j.pipeline.Store(pipeline, false)
}
+ // if not true - do not send the EventPipeStopped, because pipe already stopped
+ return
}
j.eh.Push(events.JobEvent{
@@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == false {
// mark pipeline as turned on
j.pipeline.Store(pipeline, true)
}
+
+ // if not true - do not send the EventPipeActive, because pipe already active
+ return
}
j.eh.Push(events.JobEvent{
@@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *JobConsumer) Stop(ctx context.Context) error {
+ const op = errors.Op("ephemeral_plugin_stop")
var pipe string
j.pipeline.Range(func(key, _ interface{}) bool {
pipe = key.(string)
@@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error {
return true
})
+ select {
// return from the consumer
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe,
- Start: time.Now(),
- Elapsed: 0,
- })
+ case j.stopCh <- struct{}{}:
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ return nil
- return nil
+ case <-ctx.Done():
+ return errors.E(op, ctx.Err())
+ }
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 8560f10a..d140c9ed 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -8,20 +8,6 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- },
- }
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -53,6 +39,9 @@ type Options struct {
// Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -111,6 +100,33 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
+func (i *Item) Requeue(delay int64) error {
+ go func() {
+ time.Sleep(time.Second * time.Duration(delay))
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return
+ default:
+ // TODO(rustatian): logs?
+ return
+ }
+ }()
+
return nil
}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}
diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go
new file mode 100644
index 00000000..afb97d54
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/requeue.go
@@ -0,0 +1,25 @@
+package ephemeral
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ // TODO(rustatian): what timeout to use?
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}