diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 16 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go | 18 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 16 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 68 | ||||
-rw-r--r-- | plugins/sqs/consumer.go | 16 |
5 files changed, 88 insertions, 46 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1bfc4b41..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,11 +420,13 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { + start := time.Now() c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -428,7 +435,8 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index dc2a7e91..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -266,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -282,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -328,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -357,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 46d596fa..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index 94abaadb..c2cc303b 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 + jb.cfg.Prefetch = 100 } // initialize a local queue @@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil + return &consumer{ + log: log, + pq: pq, + eh: eh, + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}), + }, nil } func (c *consumer) Push(ctx context.Context, jb *job.Job) error { @@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) { c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) } @@ -186,17 +185,26 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} + select { + case c.stopCh <- struct{}{}: + default: + break + } + + for i := 0; i < len(c.localPrefetch); i++ { + // drain all jobs from the channel + <-c.localPrefetch } c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -219,10 +227,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { time.Sleep(jj.Options.DelayDuration()) - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) + select { + case c.localPrefetch <- jj: + atomic.AddUint64(&c.goroutines, ^uint64(0)) + default: + c.log.Warn("can't push job", "error", "local queue closed or full") + } }(msg) return nil @@ -247,7 +257,7 @@ func (c *consumer) consume() { select { case item, ok := <-c.localPrefetch: if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue closed") return } diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index dfbda154..92dbd6a8 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("sqs_run") c.Lock() @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.pauseCh <- struct{}{} } @@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |