diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/memory | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/memory')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 296 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/item.go | 134 | ||||
-rw-r--r-- | plugins/memory/memorykv/config.go | 14 | ||||
-rw-r--r-- | plugins/memory/memorykv/kv.go | 257 | ||||
-rw-r--r-- | plugins/memory/memorypubsub/pubsub.go | 92 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 68 |
6 files changed, 0 insertions, 861 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go deleted file mode 100644 index 79246063..00000000 --- a/plugins/memory/memoryjobs/consumer.go +++ /dev/null @@ -1,296 +0,0 @@ -package memoryjobs - -import ( - "context" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - prefetch string = "prefetch" - goroutinesMax uint64 = 1000 -) - -type Config struct { - Prefetch uint64 `mapstructure:"prefetch"` -} - -type consumer struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline atomic.Value - pq priorityqueue.Queue - localPrefetch chan *Item - - // time.sleep goroutines max number - goroutines uint64 - - delayed *int64 - active *int64 - - listeners uint32 - stopCh chan struct{} -} - -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_ephemeral_pipeline") - - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}), - } - - err := cfg.UnmarshalKey(configKey, &jb.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if jb.cfg == nil { - return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey)) - } - - if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - return &consumer{ - log: log, - pq: pq, - eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}), - }, nil -} - -func (c *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("ephemeral_push") - - // check if the pipeline registered - _, ok := c.pipeline.Load().(*pipeline.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) - } - - err := c.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: pipe.Name(), - Active: atomic.LoadInt64(c.active), - Delayed: atomic.LoadInt64(c.delayed), - Ready: ready(atomic.LoadUint32(&c.listeners)), - }, nil -} - -func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipeline.Store(pipeline) - return nil -} - -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - const op = errors.Op("memory_jobs_run") - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return errors.E(op, errors.Str("listener already in the active state")) - } - - c.consume() - atomic.StoreUint32(&c.listeners, 1) - - return nil -} - -func (c *consumer) Pause(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 0 { - c.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&c.listeners, ^uint32(0)) - - // stop the consumer - c.stopCh <- struct{}{} - - c.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) Resume(_ context.Context, p string) { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return - } - - // resume the consumer on the same channel - c.consume() - - atomic.StoreUint32(&c.listeners, 1) - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (c *consumer) Stop(_ context.Context) error { - start := time.Now() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - select { - case c.stopCh <- struct{}{}: - default: - break - } - - for i := 0; i < len(c.localPrefetch); i++ { - // drain all jobs from the channel - <-c.localPrefetch - } - - c.localPrefetch = nil - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (c *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&c.goroutines, 1) - atomic.AddInt64(c.delayed, 1) - - time.Sleep(jj.Options.DelayDuration()) - - select { - case c.localPrefetch <- jj: - atomic.AddUint64(&c.goroutines, ^uint64(0)) - default: - c.log.Warn("can't push job", "error", "local queue closed or full") - } - }(msg) - - return nil - } - - // increase number of the active jobs - atomic.AddInt64(c.active, 1) - - // insert to the local, limited pipeline - select { - case c.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) - } -} - -func (c *consumer) consume() { - go func() { - // redirect - for { - select { - case item, ok := <-c.localPrefetch: - if !ok { - c.log.Warn("ephemeral local prefetch queue closed") - return - } - - // set requeue channel - item.Options.requeueFn = c.handleItem - item.Options.active = c.active - item.Options.delayed = c.delayed - - c.pq.Insert(item) - case <-c.stopCh: - return - } - } - }() -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go deleted file mode 100644 index f4d62ada..00000000 --- a/plugins/memory/memoryjobs/item.go +++ /dev/null @@ -1,134 +0,0 @@ -package memoryjobs - -import ( - "context" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - requeueFn func(context.Context, *Item) error - active *int64 - delayed *int64 -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Nack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - i.atomicallyReduceCount() - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - return nil -} - -// atomicallyReduceCount reduces counter of active or delayed jobs -func (i *Item) atomicallyReduceCount() { - // if job was delayed, reduce number of the delayed jobs - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - return - } - - // otherwise, reduce number of the active jobs - atomic.AddInt64(i.Options.active, ^int64(0)) - // noop for the in-memory -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} diff --git a/plugins/memory/memorykv/config.go b/plugins/memory/memorykv/config.go deleted file mode 100644 index a8a8993f..00000000 --- a/plugins/memory/memorykv/config.go +++ /dev/null @@ -1,14 +0,0 @@ -package memorykv - -// Config is default config for the in-memory driver -type Config struct { - // Interval for the check - Interval int -} - -// InitDefaults by default driver is turned off -func (c *Config) InitDefaults() { - if c.Interval == 0 { - c.Interval = 60 // seconds - } -} diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go deleted file mode 100644 index 5383275c..00000000 --- a/plugins/memory/memorykv/kv.go +++ /dev/null @@ -1,257 +0,0 @@ -package memorykv - -import ( - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - clearMu sync.RWMutex - heap sync.Map - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - log logger.Logger - cfg *Config -} - -func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_in_memory_driver") - - d := &Driver{ - stop: make(chan struct{}), - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if d.cfg == nil { - return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) - } - - d.cfg.InitDefaults() - - go d.gc() - - return d, nil -} - -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := d.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := d.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return data.(*kvv1.Item).Value, nil - } - return nil, nil -} - -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for i := range keys { - if value, ok := d.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*kvv1.Item).Value - } - } - - return m, nil -} - -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - continue - } - // TTL is set - if items[i].Timeout != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - } - - d.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(*kvv1.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - d.heap.Store(items[i].Key, &kvv1.Item{ - Key: items[i].Key, - Value: tmp.Value, - Timeout: items[i].Timeout, - }) - } - } - - return nil -} - -func (d *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for i := range keys { - if item, ok := d.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*kvv1.Item).Timeout - } - } - return m, nil -} - -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - d.heap.Delete(keys[i]) - } - return nil -} - -func (d *Driver) Clear() error { - d.clearMu.Lock() - d.heap = sync.Map{} - d.clearMu.Unlock() - - return nil -} - -func (d *Driver) Stop() { - d.stop <- struct{}{} -} - -// ================================== PRIVATE ====================================== - -func (d *Driver) gc() { - ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) - defer ticker.Stop() - for { - select { - case <-d.stop: - return - case now := <-ticker.C: - // mutes needed to clear the map - d.clearMu.RLock() - - // check every second - d.heap.Range(func(key, value interface{}) bool { - v := value.(*kvv1.Item) - if v.Timeout == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.Timeout) - if err != nil { - return false - } - - if now.After(t) { - d.log.Debug("key deleted", "key", key) - d.heap.Delete(key) - } - return true - }) - - d.clearMu.RUnlock() - } - } -} diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go deleted file mode 100644 index 231da134..00000000 --- a/plugins/memory/memorypubsub/pubsub.go +++ /dev/null @@ -1,92 +0,0 @@ -package memorypubsub - -import ( - "context" - "sync" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type PubSubDriver struct { - sync.RWMutex - // channel with the messages from the RPC - pushCh chan *pubsub.Message - // user-subscribed topics - storage bst.Storage - log logger.Logger -} - -func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { - ps := &PubSubDriver{ - pushCh: make(chan *pubsub.Message, 100), - storage: bst.NewBST(), - log: log, - } - return ps, nil -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.pushCh <- msg - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.pushCh <- msg - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("pubsub_memory") - select { - case msg := <-p.pushCh: - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil - } - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } - - return nil, nil -} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go deleted file mode 100644 index 87e0f84b..00000000 --- a/plugins/memory/plugin.go +++ /dev/null @@ -1,68 +0,0 @@ -package memory - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs" - "github.com/spiral/roadrunner/v2/plugins/memory/memorykv" - "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub" -) - -const PluginName string = "memory" - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (p *Plugin) Stop() error { - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// Drivers implementation - -func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return memorypubsub.NewPubSubDriver(p.log, key) -} - -func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("memory_plugin_construct") - st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) - if err != nil { - return nil, errors.E(op, err) - } - return st, nil -} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return memoryjobs.FromPipeline(pipeline, p.log, e, pq) -} |