summaryrefslogtreecommitdiff
path: root/plugins/memory/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 20:09:01 +0300
committerGitHub <[email protected]>2021-09-02 20:09:01 +0300
commit6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch)
treef6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins/memory/plugin.go
parent0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
parent4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff)
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r--plugins/memory/plugin.go50
1 files changed, 28 insertions, 22 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
)
const PluginName string = "memory"
type Plugin struct {
- // heap is user map for the key-value pairs
- stop chan struct{}
-
- log logger.Logger
- cfgPlugin config.Configurer
- drivers uint
+ log logger.Logger
+ cfg config.Configurer
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.cfgPlugin = cfg
- p.stop = make(chan struct{}, 1)
+ p.cfg = cfg
return nil
}
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key)
+ return memorypubsub.NewPubSubDriver(p.log, key)
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
- st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
-
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
-func (p *Plugin) Name() string {
- return PluginName
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
}