summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go1
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go69
-rw-r--r--tests/plugins/jobs/jobs_ephemeral_test.go6
3 files changed, 47 insertions, 29 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index ab5aad14..32ce6ef7 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -180,7 +180,6 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// increase the ttr to 1. Maximum ttr is 2**32-1.
id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
- // TODO check for the connection error
errD := j.pool.Delete(id)
if errD != nil {
return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 2b0ff40b..71652066 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
@@ -14,20 +15,22 @@ import (
)
const (
- pipelineSize string = "pipeline_size"
+ prefetch string = "prefetch"
)
type Config struct {
- PipelineSize uint64 `mapstructure:"pipeline_size"`
+ Prefetch uint64 `mapstructure:"prefetch"`
}
type JobBroker struct {
- cfg *Config
- log logger.Logger
- eh events.Handler
- pipeline sync.Map
- pq priorityqueue.Queue
- localQueue chan *Item
+ cfg *Config
+ log logger.Logger
+ eh events.Handler
+ pipeline sync.Map
+ pq priorityqueue.Queue
+ localPrefetch chan *Item
+
+ goroutinesMaxNum uint64
stopCh chan struct{}
}
@@ -36,10 +39,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
const op = errors.Op("new_ephemeral_pipeline")
jb := &JobBroker{
- log: log,
- pq: pq,
- eh: eh,
- stopCh: make(chan struct{}, 1),
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutinesMaxNum: 1000,
+ stopCh: make(chan struct{}, 1),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -47,12 +51,12 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return nil, errors.E(op, err)
}
- if jb.cfg.PipelineSize == 0 {
- jb.cfg.PipelineSize = 100_000
+ if jb.cfg.Prefetch == 0 {
+ jb.cfg.Prefetch = 100_000
}
// initialize a local queue
- jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+ jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
// consume from the queue
go jb.consume()
@@ -62,14 +66,15 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- log: log,
- pq: pq,
- eh: eh,
- stopCh: make(chan struct{}, 1),
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutinesMaxNum: 1000,
+ stopCh: make(chan struct{}, 1),
}
// initialize a local queue
- jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000))
+ jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
// consume from the queue
go jb.consume()
@@ -88,19 +93,33 @@ func (j *JobBroker) Push(jb *job.Job) error {
msg := fromJob(jb)
// handle timeouts
- if msg.Options.Timeout > 0 {
+ // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
go func(jj *job.Job) {
- time.Sleep(jj.Options.TimeoutDuration())
+ atomic.AddUint64(&j.goroutinesMaxNum, 1)
+ time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localQueue <- msg
+ j.localPrefetch <- msg
+
+ atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
}(jb)
return nil
}
// insert to the local, limited pipeline
- j.localQueue <- msg
+ select {
+ case j.localPrefetch <- msg:
+ default:
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch))
+ }
return nil
}
@@ -112,7 +131,7 @@ func (j *JobBroker) consume() {
// redirect
for {
select {
- case item := <-j.localQueue:
+ case item := <-j.localPrefetch:
j.pq.Insert(item)
case <-j.stopCh:
return
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go
index e8974a59..37e25970 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_ephemeral_test.go
@@ -222,9 +222,9 @@ func declareEphemeralPipe(t *testing.T) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "ephemeral",
- "name": "test-3",
- "pipeline_size": "10000",
+ "driver": "ephemeral",
+ "name": "test-3",
+ "prefetch": "10000",
}}
er := &jobsv1beta.Empty{}