summaryrefslogtreecommitdiff
path: root/plugins/ephemeral/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ephemeral/consumer.go')
-rw-r--r--plugins/ephemeral/consumer.go119
1 files changed, 57 insertions, 62 deletions
diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index 91b8eda9..8870bb0f 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
@@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- _, ok := j.pipeline.Load().(*pipeline.Pipeline)
+ _, ok := c.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
@@ -105,42 +105,42 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *consumer) State(_ context.Context) (*jobState.State, error) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: pipe.Name(),
- Active: atomic.LoadInt64(j.active),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Active: atomic.LoadInt64(c.active),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
}
-func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop the consumer
- j.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -149,24 +149,24 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// listener already active
if l == 1 {
- j.log.Warn("listener already in the active state")
+ c.log.Warn("listener already in the active state")
return
}
// resume the consumer on the same channel
- j.consume()
+ c.consume()
- atomic.StoreUint32(&j.listeners, 1)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -175,8 +175,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- j.eh.Push(events.JobEvent{
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -185,84 +185,79 @@ func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(ctx context.Context) error {
- const op = errors.Op("ephemeral_plugin_stop")
+func (c *consumer) Stop(_ context.Context) error {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- select {
- // return from the consumer
- case j.stopCh <- struct{}{}:
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
- return nil
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
- case <-ctx.Done():
- return errors.E(op, ctx.Err())
- }
+ return nil
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
// goroutines here. We should limit goroutines here.
if msg.Options.Delay > 0 {
// if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
return errors.E(op, errors.Str("max concurrency number reached"))
}
go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddUint64(&c.goroutines, 1)
+ atomic.AddInt64(c.delayed, 1)
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localPrefetch <- jj
+ c.localPrefetch <- jj
- atomic.AddUint64(&j.goroutines, ^uint64(0))
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
}(msg)
return nil
}
// increase number of the active jobs
- atomic.AddInt64(j.active, 1)
+ atomic.AddInt64(c.active, 1)
// insert to the local, limited pipeline
select {
- case j.localPrefetch <- msg:
+ case c.localPrefetch <- msg:
return nil
case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
}
}
-func (j *consumer) consume() {
+func (c *consumer) consume() {
go func() {
// redirect
for {
select {
- case item, ok := <-j.localPrefetch:
+ case item, ok := <-c.localPrefetch:
if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue was closed")
return
}
// set requeue channel
- item.Options.requeueFn = j.handleItem
- item.Options.active = j.active
- item.Options.delayed = j.delayed
+ item.Options.requeueFn = c.handleItem
+ item.Options.active = c.active
+ item.Options.delayed = c.delayed
- j.pq.Insert(item)
- case <-j.stopCh:
+ c.pq.Insert(item)
+ case <-c.stopCh:
return
}
}