diff options
Diffstat (limited to 'plugins/ephemeral/consumer.go')
-rw-r--r-- | plugins/ephemeral/consumer.go | 119 |
1 files changed, 57 insertions, 62 deletions
diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go index 91b8eda9..8870bb0f 100644 --- a/plugins/ephemeral/consumer.go +++ b/plugins/ephemeral/consumer.go @@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - _, ok := j.pipeline.Load().(*pipeline.Pipeline) + _, ok := c.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } @@ -105,42 +105,42 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: pipe.Name(), - Active: atomic.LoadInt64(j.active), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), + Active: atomic.LoadInt64(c.active), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil } -func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) return nil } -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop the consumer - j.stopCh <- struct{}{} + c.stopCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -149,24 +149,24 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // listener already active if l == 1 { - j.log.Warn("listener already in the active state") + c.log.Warn("listener already in the active state") return } // resume the consumer on the same channel - j.consume() + c.consume() - atomic.StoreUint32(&j.listeners, 1) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Start: time.Now(), @@ -175,8 +175,8 @@ func (j *consumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - j.eh.Push(events.JobEvent{ +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -185,84 +185,79 @@ func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *consumer) Stop(ctx context.Context) error { - const op = errors.Op("ephemeral_plugin_stop") +func (c *consumer) Stop(_ context.Context) error { + pipe := c.pipeline.Load().(*pipeline.Pipeline) - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - select { - // return from the consumer - case j.stopCh <- struct{}{}: - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } - return nil + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) - case <-ctx.Done(): - return errors.E(op, ctx.Err()) - } + return nil } -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) // goroutines here. We should limit goroutines here. if msg.Options.Delay > 0 { // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { + if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { return errors.E(op, errors.Str("max concurrency number reached")) } go func(jj *Item) { - atomic.AddUint64(&j.goroutines, 1) - atomic.AddInt64(j.delayed, 1) + atomic.AddUint64(&c.goroutines, 1) + atomic.AddInt64(c.delayed, 1) time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired - j.localPrefetch <- jj + c.localPrefetch <- jj - atomic.AddUint64(&j.goroutines, ^uint64(0)) + atomic.AddUint64(&c.goroutines, ^uint64(0)) }(msg) return nil } // increase number of the active jobs - atomic.AddInt64(j.active, 1) + atomic.AddInt64(c.active, 1) // insert to the local, limited pipeline select { - case j.localPrefetch <- msg: + case c.localPrefetch <- msg: return nil case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) } } -func (j *consumer) consume() { +func (c *consumer) consume() { go func() { // redirect for { select { - case item, ok := <-j.localPrefetch: + case item, ok := <-c.localPrefetch: if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue was closed") return } // set requeue channel - item.Options.requeueFn = j.handleItem - item.Options.active = j.active - item.Options.delayed = j.delayed + item.Options.requeueFn = c.handleItem + item.Options.active = c.active + item.Options.delayed = c.delayed - j.pq.Insert(item) - case <-j.stopCh: + c.pq.Insert(item) + case <-c.stopCh: return } } |