diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/memory/memoryjobs/consumer.go | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/memory/memoryjobs/consumer.go')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 296 |
1 files changed, 0 insertions, 296 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go deleted file mode 100644 index 79246063..00000000 --- a/plugins/memory/memoryjobs/consumer.go +++ /dev/null @@ -1,296 +0,0 @@ -package memoryjobs - -import ( - "context" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - prefetch string = "prefetch" - goroutinesMax uint64 = 1000 -) - -type Config struct { - Prefetch uint64 `mapstructure:"prefetch"` -} - -type consumer struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline atomic.Value - pq priorityqueue.Queue - localPrefetch chan *Item - - // time.sleep goroutines max number - goroutines uint64 - - delayed *int64 - active *int64 - - listeners uint32 - stopCh chan struct{} -} - -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_ephemeral_pipeline") - - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}), - } - - err := cfg.UnmarshalKey(configKey, &jb.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if jb.cfg == nil { - return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey)) - } - - if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - return &consumer{ - log: log, - pq: pq, - eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}), - }, nil -} - -func (c *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("ephemeral_push") - - // check if the pipeline registered - _, ok := c.pipeline.Load().(*pipeline.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) - } - - err := c.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: pipe.Name(), - Active: atomic.LoadInt64(c.active), - Delayed: atomic.LoadInt64(c.delayed), - Ready: ready(atomic.LoadUint32(&c.listeners)), - }, nil -} - -func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipeline.Store(pipeline) - return nil -} - -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - const op = errors.Op("memory_jobs_run") - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return errors.E(op, errors.Str("listener already in the active state")) - } - - c.consume() - atomic.StoreUint32(&c.listeners, 1) - - return nil -} - -func (c *consumer) Pause(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 0 { - c.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&c.listeners, ^uint32(0)) - - // stop the consumer - c.stopCh <- struct{}{} - - c.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) Resume(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return - } - - // resume the consumer on the same channel - c.consume() - - atomic.StoreUint32(&c.listeners, 1) - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) Stop(_ context.Context) error { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - select { - case c.stopCh <- struct{}{}: - default: - break - } - - for i := 0; i < len(c.localPrefetch); i++ { - // drain all jobs from the channel - <-c.localPrefetch - } - - c.localPrefetch = nil - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (c *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&c.goroutines, 1) - atomic.AddInt64(c.delayed, 1) - - time.Sleep(jj.Options.DelayDuration()) - - select { - case c.localPrefetch <- jj: - atomic.AddUint64(&c.goroutines, ^uint64(0)) - default: - c.log.Warn("can't push job", "error", "local queue closed or full") - } - }(msg) - - return nil - } - - // increase number of the active jobs - atomic.AddInt64(c.active, 1) - - // insert to the local, limited pipeline - select { - case c.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) - } -} - -func (c *consumer) consume() { - go func() { - // redirect - for { - select { - case item, ok := <-c.localPrefetch: - if !ok { - c.log.Warn("ephemeral local prefetch queue closed") - return - } - - // set requeue channel - item.Options.requeueFn = c.handleItem - item.Options.active = c.active - item.Options.delayed = c.delayed - - c.pq.Insert(item) - case <-c.stopCh: - return - } - } - }() -} - -func ready(r uint32) bool { - return r > 0 -} |