diff options
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r-- | plugins/memory/plugin.go | 50 |
1 files changed, 28 insertions, 22 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 7d418a70..515e469a 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,27 +2,29 @@ package memory import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/common/pubsub" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs" + "github.com/spiral/roadrunner/v2/plugins/memory/memorykv" + "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub" ) const PluginName string = "memory" type Plugin struct { - // heap is user map for the key-value pairs - stop chan struct{} - - log logger.Logger - cfgPlugin config.Configurer - drivers uint + log logger.Logger + cfg config.Configurer } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.cfgPlugin = cfg - p.stop = make(chan struct{}, 1) + p.cfg = cfg return nil } @@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// Drivers implementation + func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return NewPubSubDriver(p.log, key) + return memorypubsub.NewPubSubDriver(p.log, key) } func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") - st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) if err != nil { return nil, errors.E(op, err) } - - // save driver number to release resources after Stop - p.drivers++ - return st, nil } -func (p *Plugin) Name() string { - return PluginName +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.FromPipeline(pipeline, p.log, e, pq) } |