author     Valery Piashchynski <[email protected]>    2021-09-02 20:09:01 +0300
committer  GitHub <[email protected]>                 2021-09-02 20:09:01 +0300
commit     6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch)
tree       f6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins/memory
parent     0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
parent     4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff)
#778: feat(refactor): jobs code adjusting (v2.4.0)
Diffstat (limited to 'plugins/memory')
-rw-r--r--  plugins/memory/memoryjobs/consumer.go                                           281
-rw-r--r--  plugins/memory/memoryjobs/item.go                                               134
-rw-r--r--  plugins/memory/memorykv/config.go (renamed from plugins/memory/config.go)         2
-rw-r--r--  plugins/memory/memorykv/kv.go (renamed from plugins/memory/kv.go)                66
-rw-r--r--  plugins/memory/memorypubsub/pubsub.go (renamed from plugins/memory/pubsub.go)     2
-rw-r--r--  plugins/memory/plugin.go                                                         50
6 files changed, 480 insertions, 55 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
new file mode 100644
index 00000000..fbdedefe
--- /dev/null
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -0,0 +1,281 @@
+package memoryjobs
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ prefetch string = "prefetch"
+ goroutinesMax uint64 = 1000
+)
+
+type Config struct {
+ Prefetch uint64 `mapstructure:"prefetch"`
+}
+
+type consumer struct {
+ cfg *Config
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+ pq priorityqueue.Queue
+ localPrefetch chan *Item
+
+ // max number of goroutines allowed to sleep on delayed jobs
+ goroutines uint64
+
+ delayed *int64
+ active *int64
+
+ listeners uint32
+ stopCh chan struct{}
+}
+
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("new_ephemeral_pipeline")
+
+ jb := &consumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
+ stopCh: make(chan struct{}),
+ }
+
+ err := cfg.UnmarshalKey(configKey, &jb.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if jb.cfg.Prefetch == 0 {
+ jb.cfg.Prefetch = 100_000
+ }
+
+ // initialize a local queue
+ jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
+
+ return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ return &consumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)),
+ goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
+ stopCh: make(chan struct{}),
+ }, nil
+}
+
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline is registered
+ _, ok := c.pipeline.Load().(*pipeline.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ }
+
+ err := c.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: pipe.Name(),
+ Active: atomic.LoadInt64(c.active),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
+ return nil
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ // stop the consumer
+ c.stopCh <- struct{}{}
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // listener already active
+ if l == 1 {
+ c.log.Warn("listener already in the active state")
+ return
+ }
+
+ // resume the consumer on the same channel
+ c.consume()
+
+ atomic.StoreUint32(&c.listeners, 1)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+}
+
+// Run is a no-op for the ephemeral driver
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ select {
+ case c.stopCh <- struct{}{}:
+ default:
+ break
+ }
+
+ for i := 0; i < len(c.localPrefetch); i++ {
+ // drain all jobs from the channel
+ <-c.localPrefetch
+ }
+
+ c.localPrefetch = nil
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
+ return nil
+}
+
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle delayed jobs
+ // a malicious user could push millions of delayed jobs and spawn an unbounded number of sleeping
+ // goroutines here, so the number of such goroutines is capped at goroutinesMax
+ if msg.Options.Delay > 0 {
+ // if 1000 goroutines are already waiting on a delay, reject the next one
+ if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&c.goroutines, 1)
+ atomic.AddInt64(c.delayed, 1)
+
+ time.Sleep(jj.Options.DelayDuration())
+
+ select {
+ case c.localPrefetch <- jj:
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
+ default:
+ c.log.Warn("can't push job", "error", "local queue closed or full")
+ }
+ }(msg)
+
+ return nil
+ }
+
+ // increase number of the active jobs
+ atomic.AddInt64(c.active, 1)
+
+ // insert to the local, limited pipeline
+ select {
+ case c.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (c *consumer) consume() {
+ go func() {
+ // redirect items from the local prefetch channel into the priority queue
+ for {
+ select {
+ case item, ok := <-c.localPrefetch:
+ if !ok {
+ c.log.Warn("ephemeral local prefetch queue closed")
+ return
+ }
+
+ // set the requeue callback and job counters
+ item.Options.requeueFn = c.handleItem
+ item.Options.active = c.active
+ item.Options.delayed = c.delayed
+
+ c.pq.Insert(item)
+ case <-c.stopCh:
+ return
+ }
+ }
+ }()
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
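
Note: throughout the consumer above, counters are decremented by adding the bitwise complement of zero (^uint32(0) in Pause, ^uint64(0)/^int64(0) in handleItem and the item callbacks), which wraps around to a subtraction of one. A minimal standalone sketch of that pattern (names are illustrative, not part of this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var listeners uint32 = 1
	var active int64 = 3

	// ^uint32(0) has all bits set, so the addition overflows and effectively subtracts 1
	atomic.AddUint32(&listeners, ^uint32(0))

	// for signed counters ^int64(0) == -1, so this is a plain atomic decrement
	atomic.AddInt64(&active, ^int64(0))

	fmt.Println(listeners, active) // prints: 0 2
}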
diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go
new file mode 100644
index 00000000..f4d62ada
--- /dev/null
+++ b/plugins/memory/memoryjobs/item.go
@@ -0,0 +1,134 @@
+package memoryjobs
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job, provided from the outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority (default: 10)
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
+}
+
+// DelayDuration returns the delay in the form of a time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ i.atomicallyReduceCount()
+ return nil
+}
+
+func (i *Item) Nack() error {
+ i.atomicallyReduceCount()
+ return nil
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ i.atomicallyReduceCount()
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// atomicallyReduceCount decrements the counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+ // noop for the in-memory
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
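
Note: Item.Context() above packs the job metadata into a JSON envelope that is handed to the worker alongside the payload. A minimal sketch of the resulting shape, using the standard library encoder instead of json-iterator and purely illustrative values:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	ctx, err := json.Marshal(struct {
		ID       string              `json:"id"`
		Job      string              `json:"job"`
		Headers  map[string][]string `json:"headers"`
		Pipeline string              `json:"pipeline"`
	}{
		ID:       "job-1",
		Job:      "App\\Jobs\\Ping",
		Headers:  map[string][]string{"attempt": {"1"}},
		Pipeline: "in-memory",
	})
	if err != nil {
		panic(err)
	}

	// {"id":"job-1","job":"App\\Jobs\\Ping","headers":{"attempt":["1"]},"pipeline":"in-memory"}
	fmt.Println(string(ctx))
}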
diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go
index e51d09c5..a8a8993f 100644
--- a/plugins/memory/config.go
+++ b/plugins/memory/memorykv/config.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
// Config is default config for the in-memory driver
type Config struct {
diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go
index 68ea7266..9b3e176c 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
import (
"strings"
@@ -20,11 +20,11 @@ type Driver struct {
cfg *Config
}
-func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_in_memory_driver")
d := &Driver{
- stop: stop,
+ stop: make(chan struct{}),
log: log,
}
@@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure
return d, nil
}
-func (s *Driver) Has(keys ...string) (map[string]bool, error) {
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if _, ok := s.heap.Load(keys[i]); ok {
+ if _, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = true
}
}
@@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return m, nil
}
-func (s *Driver) Get(key string) ([]byte, error) {
+func (d *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("in_memory_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if data, exist := s.heap.Load(key); exist {
+ if data, exist := d.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
return data.(*kvv1.Item).Value, nil
@@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
m := make(map[string][]byte, len(keys))
for i := range keys {
- if value, ok := s.heap.Load(keys[i]); ok {
+ if value, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = value.(*kvv1.Item).Value
}
}
@@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
return m, nil
}
-func (s *Driver) Set(items ...*kvv1.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error {
}
}
- s.heap.Store(items[i].Key, items[i])
+ d.heap.Store(items[i].Key, items[i])
}
return nil
}
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*kvv1.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
}
// if the key exists, overwrite its value
- if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
+ if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
_, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
@@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &kvv1.Item{
+ d.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]string, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) {
m := make(map[string]string, len(keys))
for i := range keys {
- if item, ok := s.heap.Load(keys[i]); ok {
+ if item, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
}
-func (s *Driver) Delete(keys ...string) error {
+func (d *Driver) Delete(keys ...string) error {
const op = errors.Op("in_memory_plugin_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error {
}
for i := range keys {
- s.heap.Delete(keys[i])
+ d.heap.Delete(keys[i])
}
return nil
}
-func (s *Driver) Clear() error {
- s.clearMu.Lock()
- s.heap = sync.Map{}
- s.clearMu.Unlock()
+func (d *Driver) Clear() error {
+ d.clearMu.Lock()
+ d.heap = sync.Map{}
+ d.clearMu.Unlock()
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ================================== PRIVATE ======================================
-func (s *Driver) gc() {
- ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+func (d *Driver) gc() {
+ ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+ defer ticker.Stop()
for {
select {
- case <-s.stop:
- ticker.Stop()
+ case <-d.stop:
return
case now := <-ticker.C:
// mutex needed to clear the map
- s.clearMu.RLock()
+ d.clearMu.RLock()
// check every second
- s.heap.Range(func(key, value interface{}) bool {
+ d.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
@@ -237,13 +241,13 @@ func (s *Driver) gc() {
}
if now.After(t) {
- s.log.Debug("key deleted", "key", key)
- s.heap.Delete(key)
+ d.log.Debug("key deleted", "key", key)
+ d.heap.Delete(key)
}
return true
})
- s.clearMu.RUnlock()
+ d.clearMu.RUnlock()
}
}
}
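
Note: the gc() hunk above moves ticker cleanup into a defer and reads from the driver's own stop channel (fed by the new Stop() method) instead of a channel shared with the plugin. A standalone sketch of that ticker/stop-channel pattern, with illustrative names:

package main

import (
	"fmt"
	"time"
)

type driver struct {
	stop chan struct{}
}

func (d *driver) gc(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // released on any return path

	for {
		select {
		case <-d.stop:
			return
		case now := <-ticker.C:
			// the expired-key sweep over the heap would run here
			fmt.Println("gc tick at", now.Format(time.RFC3339Nano))
		}
	}
}

func main() {
	d := &driver{stop: make(chan struct{})}
	go d.gc(100 * time.Millisecond)

	time.Sleep(350 * time.Millisecond)
	d.stop <- struct{}{} // mirrors what Driver.Stop() does in the diff
	time.Sleep(50 * time.Millisecond)
}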
diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
index fd30eb54..75122571 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -1,4 +1,4 @@
-package memory
+package memorypubsub
import (
"context"
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
)
const PluginName string = "memory"
type Plugin struct {
- // heap is user map for the key-value pairs
- stop chan struct{}
-
- log logger.Logger
- cfgPlugin config.Configurer
- drivers uint
+ log logger.Logger
+ cfg config.Configurer
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.cfgPlugin = cfg
- p.stop = make(chan struct{}, 1)
+ p.cfg = cfg
return nil
}
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key)
+ return memorypubsub.NewPubSubDriver(p.log, key)
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
- st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
-
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
-func (p *Plugin) Name() string {
- return PluginName
+// JobsConstruct creates a new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates a new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
}