summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go16
-rw-r--r--plugins/beanstalk/consumer.go18
-rw-r--r--plugins/boltdb/boltjobs/consumer.go16
-rw-r--r--plugins/memory/memoryjobs/consumer.go68
-rw-r--r--plugins/sqs/consumer.go16
5 files changed, 88 insertions, 46 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 1bfc4b41..2ff0a40a 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("rabbit_run")
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -415,11 +420,13 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Stop(context.Context) error {
+ start := time.Now()
c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -428,7 +435,8 @@ func (c *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index dc2a7e91..30807f03 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -266,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
+ start := time.Now()
// load atomic value
+ // check if the pipeline registered
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
@@ -282,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Stop(context.Context) error {
+ start := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,13 +302,15 @@ func (j *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -328,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (j *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -357,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 46d596fa..62045d3b 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("boltdb_run")
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.stopCh <- struct{}{}
c.stopCh <- struct{}{}
@@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index 94abaadb..c2cc303b 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
goroutines: 0,
active: utils.Int64(0),
delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
if jb.cfg.Prefetch == 0 {
- jb.cfg.Prefetch = 100_000
+ jb.cfg.Prefetch = 100
}
// initialize a local queue
@@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- jb := &consumer{
- log: log,
- pq: pq,
- eh: eh,
- goroutines: 0,
- active: utils.Int64(0),
- delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
- }
-
- // initialize a local queue
- jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
-
- return jb, nil
+ return &consumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)),
+ goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
+ stopCh: make(chan struct{}),
+ }, nil
}
func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
@@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) {
c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
@@ -186,17 +185,26 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
+ select {
+ case c.stopCh <- struct{}{}:
+ default:
+ break
+ }
+
+ for i := 0; i < len(c.localPrefetch); i++ {
+ // drain all jobs from the channel
+ <-c.localPrefetch
}
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -219,10 +227,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
time.Sleep(jj.Options.DelayDuration())
- // send the item after timeout expired
- c.localPrefetch <- jj
-
- atomic.AddUint64(&c.goroutines, ^uint64(0))
+ select {
+ case c.localPrefetch <- jj:
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
+ default:
+ c.log.Warn("can't push job", "error", "local queue closed or full")
+ }
}(msg)
return nil
@@ -247,7 +257,7 @@ func (c *consumer) consume() {
select {
case item, ok := <-c.localPrefetch:
if !ok {
- c.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue closed")
return
}
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index dfbda154..92dbd6a8 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("sqs_run")
c.Lock()
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.pauseCh <- struct{}{}
}
@@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}