summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 20:09:01 +0300
committerGitHub <[email protected]>2021-09-02 20:09:01 +0300
commit6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch)
treef6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins
parent0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
parent4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff)
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go22
-rw-r--r--plugins/amqp/amqpjobs/item.go7
-rw-r--r--plugins/amqp/amqpjobs/redial.go21
-rw-r--r--plugins/beanstalk/consumer.go28
-rw-r--r--plugins/beanstalk/item.go9
-rw-r--r--plugins/boltdb/boltjobs/consumer.go18
-rw-r--r--plugins/boltdb/boltjobs/listener.go5
-rw-r--r--plugins/boltdb/boltkv/driver.go8
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md1
-rw-r--r--plugins/boltdb/plugin.go24
-rw-r--r--plugins/ephemeral/plugin.go41
-rw-r--r--plugins/jobs/job/job.go11
-rw-r--r--plugins/jobs/job/job_test.go27
-rw-r--r--plugins/jobs/plugin.go96
-rw-r--r--plugins/jobs/rpc.go8
-rw-r--r--plugins/kv/plugin.go4
-rw-r--r--plugins/memcached/memcachedkv/config.go (renamed from plugins/memcached/config.go)2
-rw-r--r--plugins/memcached/memcachedkv/driver.go (renamed from plugins/memcached/driver.go)4
-rw-r--r--plugins/memcached/plugin.go3
-rw-r--r--plugins/memory/memoryjobs/consumer.go (renamed from plugins/ephemeral/consumer.go)70
-rw-r--r--plugins/memory/memoryjobs/item.go (renamed from plugins/ephemeral/item.go)3
-rw-r--r--plugins/memory/memorykv/config.go (renamed from plugins/memory/config.go)2
-rw-r--r--plugins/memory/memorykv/kv.go (renamed from plugins/memory/kv.go)66
-rw-r--r--plugins/memory/memorypubsub/pubsub.go (renamed from plugins/memory/pubsub.go)2
-rw-r--r--plugins/memory/plugin.go50
-rw-r--r--plugins/redis/clients.go84
-rw-r--r--plugins/redis/interface.go12
-rw-r--r--plugins/redis/kv/kv.go2
-rw-r--r--plugins/sqs/consumer.go16
29 files changed, 269 insertions, 377 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..2ff0a40a 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("rabbit_run")
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ start := time.Now()
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
+
return nil
}
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 04385afe..b837ff86 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -43,17 +43,18 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
ack func(multiply bool) error
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
// When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
// When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
+ requeueFn func(context.Context, *Item) error
+ // delayed jobs TODO(rustatian): figure out how to get stats from the DLX
delayed *int64
multipleAsk bool
requeue bool
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 8d21784f..698a34a6 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit
c.Unlock()
case <-c.stopCh:
- if c.publishChan != nil {
- pch := <-c.publishChan
- err := pch.Close()
- if err != nil {
- c.log.Error("publish channel close", "error", err)
- }
+ pch := <-c.publishChan
+ err := pch.Close()
+ if err != nil {
+ c.log.Error("publish channel close", "error", err)
}
if c.consumeChan != nil {
- err := c.consumeChan.Close()
+ err = c.consumeChan.Close()
if err != nil {
c.log.Error("consume channel close", "error", err)
}
}
- if c.conn != nil {
- err := c.conn.Close()
- if err != nil {
- c.log.Error("amqp connection close", "error", err)
- }
+
+ err = c.conn.Close()
+ if err != nil {
+ c.log.Error("amqp connection close", "error", err)
}
return
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..30807f03 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "encoding/gob"
"strconv"
"strings"
"sync/atomic"
@@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
bb := new(bytes.Buffer)
bb.Grow(64)
- err := item.pack(bb)
+ err := gob.NewEncoder(bb).Encode(item)
if err != nil {
return errors.E(op, err)
}
+ body := make([]byte, bb.Len())
+ copy(body, bb.Bytes())
+ bb.Reset()
+ bb = nil
+
// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
// <pri> is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
@@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
+ start := time.Now()
// load atomic value
+ // check if the pipeline registered
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
@@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Stop(context.Context) error {
+ start := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (j *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go
index 0a6cd560..03060994 100644
--- a/plugins/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
@@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item {
}
}
-func (i *Item) pack(b *bytes.Buffer) error {
- err := gob.NewEncoder(b).Encode(i)
- if err != nil {
- return err
- }
-
- return nil
-}
-
func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index ed0eda61..62045d3b 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return &consumer{
file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
- prefetch: pipeline.Int(prefetch, 100),
+ prefetch: pipeline.Int(prefetch, 1000),
permissions: conf.Permissions,
bPool: sync.Pool{New: func() interface{} {
@@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("boltdb_run")
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.stopCh <- struct{}{}
c.stopCh <- struct{}{}
@@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 7c161555..081d3f57 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,6 +3,7 @@ package boltjobs
import (
"bytes"
"encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -18,6 +19,10 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
+ if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go
index ba1450cd..656d572e 100644
--- a/plugins/boltdb/boltkv/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -38,7 +38,7 @@ type Driver struct {
stop chan struct{}
}
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
if !cfgPlugin.Has(RootPluginName) {
@@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
d := &Driver{
log: log,
- stop: stop,
+ stop: make(chan struct{}),
}
err := cfgPlugin.UnmarshalKey(key, &d.cfg)
@@ -411,6 +411,10 @@ func (d *Driver) Clear() error {
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ========================= PRIVATE =================================
func (d *Driver) startGCLoop() { //nolint:gocognit
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
index 317aec90..1424e586 100644
--- a/plugins/boltdb/doc/job_lifecycle.md
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -7,4 +7,3 @@ There are several boltdb buckets:
get into the `InQueueBucket` waiting to acknowledgement.
3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
-``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
index 683b26f1..ad98cf3c 100644
--- a/plugins/boltdb/plugin.go
+++ b/plugins/boltdb/plugin.go
@@ -19,19 +19,14 @@ const (
// Plugin BoltDB K/V storage.
type Plugin struct {
- cfgPlugin config.Configurer
+ cfg config.Configurer
// logger
log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.stop = make(chan struct{})
p.log = log
- p.cfgPlugin = cfg
+ p.cfg = cfg
return nil
}
@@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
@@ -60,23 +49,20 @@ func (p *Plugin) Available() {}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
// JOBS bbolt implementation
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue)
}
diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
deleted file mode 100644
index 28495abb..00000000
--- a/plugins/ephemeral/plugin.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "ephemeral"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Available() {}
-
-// JobsConstruct creates new ephemeral consumer from the configuration
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(configKey, p.log, p.cfg, e, pq)
-}
-
-// FromPipeline creates new ephemeral consumer from the provided pipeline
-func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipeline, p.log, e, pq)
-}
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
index 06c3254e..adab2a0a 100644
--- a/plugins/jobs/job/job.go
+++ b/plugins/jobs/job/job.go
@@ -45,17 +45,6 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
}
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
index a47151a3..4a95e27d 100644
--- a/plugins/jobs/job/job_test.go
+++ b/plugins/jobs/job/job_test.go
@@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) {
opts := &Options{Delay: 1}
assert.Equal(t, time.Second, opts.DelayDuration())
}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(2), opts.Delay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Delay)
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 3f3fa196..3aec6acc 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -41,7 +41,7 @@ type Plugin struct {
server server.Server
jobConstructors map[string]jobs.Constructor
- consumers map[string]jobs.Consumer
+ consumers sync.Map // map[string]jobs.Consumer
// events handler
events events.Handler
@@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events.AddListener(p.collectJobsEvents)
p.jobConstructors = make(map[string]jobs.Constructor)
- p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
@@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// config key for the particular sub-driver jobs.pipelines.test-local
configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
// init the driver
- initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue)
if err != nil {
errCh <- errors.E(op, err)
return false
}
// add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[name] = initializedDriver
+ p.consumers.Store(name, initializedDriver)
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipe)
@@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
- for k, v := range p.consumers {
+ // range over all consumers and call stop
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := v.Stop(ctx)
+ err := consumer.Stop(ctx)
if err != nil {
cancel()
- p.log.Error("stop job driver", "driver", k)
- continue
+ p.log.Error("stop job driver", "driver", key)
+ return true
}
cancel()
- }
+ return true
+ })
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
@@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State {
func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
const op = errors.Op("jobs_plugin_drivers_state")
- jst := make([]*jobState.State, 0, len(p.consumers))
- for k := range p.consumers {
- d := p.consumers[k]
+ jst := make([]*jobState.State, 0, 2)
+ var err error
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
- state, err := d.State(newCtx)
+
+ var state *jobState.State
+ state, err = consumer.State(newCtx)
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return false
}
jst = append(jst, state)
cancel()
+ return true
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
}
return jst, nil
}
@@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
// if job has no priority, inherit it from the pipeline
- // TODO(rustatian) merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}
@@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
- err := d.Push(ctx, j)
+ err := d.(jobs.Consumer).Push(ctx, j)
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventPushError,
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return errors.E(op, err)
}
@@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return nil
@@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error {
func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
for i := 0; i < len(j); i++ {
- start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
@@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.Push(ctx, j[i])
+ err := d.(jobs.Consumer).Push(ctx, j[i])
if err != nil {
cancel()
p.events.Push(events.JobEvent{
@@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Pause(ctx, ppl.Name())
+ d.(jobs.Consumer).Pause(ctx, ppl.Name())
}
func (p *Plugin) Resume(pp string) {
@@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Resume(ctx, ppl.Name())
+ d.(jobs.Consumer).Resume(ctx, ppl.Name())
}
// Declare a pipeline.
@@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// init the driver from pipeline
- initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
if err != nil {
return errors.E(op, err)
}
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[pipeline.Name()] = initializedDriver
-
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipeline)
if err != nil {
@@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
return errors.E(op, err)
}
}
- }
- // save the pipeline
- p.pipelines.Store(pipeline.Name(), pipeline)
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers.Store(pipeline.Name(), initializedDriver)
+ // save the pipeline
+ p.pipelines.Store(pipeline.Name(), pipeline)
+ }
return nil
}
@@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ // delete consumer
+ d, ok := p.consumers.LoadAndDelete(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
- // delete consumer
- delete(p.consumers, ppl.Name())
- p.pipelines.Delete(pp)
+ // delete old pipeline
+ p.pipelines.LoadAndDelete(pp)
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
+ err := d.(jobs.Consumer).Stop(ctx)
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
- return d.Stop(ctx)
+ cancel()
+ return nil
}
func (p *Plugin) List() []string {
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 94f903d5..d7b93bd1 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
return errors.E(op, errors.Str("empty ID field not allowed"))
}
- err := r.p.Push(r.from(j.GetJob()))
+ err := r.p.Push(from(j.GetJob()))
if err != nil {
return errors.E(op, err)
}
@@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
for i := 0; i < l; i++ {
// convert transport entity into domain
// how we can do this quickly
- batch[i] = r.from(j.GetJobs()[i])
+ batch[i] = from(j.GetJobs()[i])
}
err := r.p.PushBatch(batch)
@@ -137,8 +137,8 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
- headers := map[string][]string{}
+func from(j *jobsv1beta.Job) *job.Job {
+ headers := make(map[string][]string, len(j.GetHeaders()))
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index c6ca96c3..a1144b85 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
+ // stop all attached storages
+ for k := range p.storages {
+ p.storages[k].Stop()
+ }
return nil
}
diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go
index 6d413790..569e2573 100644
--- a/plugins/memcached/config.go
+++ b/plugins/memcached/memcachedkv/config.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
type Config struct {
// Addr is url for memcached, 11211 port is used by default
diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go
index e24747fe..6d5e1802 100644
--- a/plugins/memcached/driver.go
+++ b/plugins/memcached/memcachedkv/driver.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
import (
"strings"
@@ -246,3 +246,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..47bca0e2 100644
--- a/plugins/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv"
)
const (
@@ -39,7 +40,7 @@ func (s *Plugin) Available() {}
func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go
index 8870bb0f..fbdedefe 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
@@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
goroutines: 0,
active: utils.Int64(0),
delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- jb := &consumer{
- log: log,
- pq: pq,
- eh: eh,
- goroutines: 0,
- active: utils.Int64(0),
- delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
- }
-
- // initialize a local queue
- jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
-
- return jb, nil
+ return &consumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)),
+ goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
+ stopCh: make(chan struct{}),
+ }, nil
}
func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
@@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) {
c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
@@ -186,17 +185,28 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
+ select {
+ case c.stopCh <- struct{}{}:
+ default:
+ break
+ }
+
+ for i := 0; i < len(c.localPrefetch); i++ {
+ // drain all jobs from the channel
+ <-c.localPrefetch
}
+ c.localPrefetch = nil
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -219,10 +229,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
time.Sleep(jj.Options.DelayDuration())
- // send the item after timeout expired
- c.localPrefetch <- jj
-
- atomic.AddUint64(&c.goroutines, ^uint64(0))
+ select {
+ case c.localPrefetch <- jj:
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
+ default:
+ c.log.Warn("can't push job", "error", "local queue closed or full")
+ }
}(msg)
return nil
@@ -247,7 +259,7 @@ func (c *consumer) consume() {
select {
case item, ok := <-c.localPrefetch:
if !ok {
- c.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue closed")
return
}
diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go
index 3298424d..f4d62ada 100644
--- a/plugins/ephemeral/item.go
+++ b/plugins/memory/memoryjobs/item.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
@@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
+ Headers: job.Headers,
Options: &Options{
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,
diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go
index e51d09c5..a8a8993f 100644
--- a/plugins/memory/config.go
+++ b/plugins/memory/memorykv/config.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
// Config is default config for the in-memory driver
type Config struct {
diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go
index 68ea7266..9b3e176c 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
import (
"strings"
@@ -20,11 +20,11 @@ type Driver struct {
cfg *Config
}
-func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_in_memory_driver")
d := &Driver{
- stop: stop,
+ stop: make(chan struct{}),
log: log,
}
@@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure
return d, nil
}
-func (s *Driver) Has(keys ...string) (map[string]bool, error) {
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if _, ok := s.heap.Load(keys[i]); ok {
+ if _, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = true
}
}
@@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return m, nil
}
-func (s *Driver) Get(key string) ([]byte, error) {
+func (d *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("in_memory_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if data, exist := s.heap.Load(key); exist {
+ if data, exist := d.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
return data.(*kvv1.Item).Value, nil
@@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
m := make(map[string][]byte, len(keys))
for i := range keys {
- if value, ok := s.heap.Load(keys[i]); ok {
+ if value, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = value.(*kvv1.Item).Value
}
}
@@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
return m, nil
}
-func (s *Driver) Set(items ...*kvv1.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error {
}
}
- s.heap.Store(items[i].Key, items[i])
+ d.heap.Store(items[i].Key, items[i])
}
return nil
}
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*kvv1.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
+ if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
_, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
@@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &kvv1.Item{
+ d.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]string, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) {
m := make(map[string]string, len(keys))
for i := range keys {
- if item, ok := s.heap.Load(keys[i]); ok {
+ if item, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
}
-func (s *Driver) Delete(keys ...string) error {
+func (d *Driver) Delete(keys ...string) error {
const op = errors.Op("in_memory_plugin_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error {
}
for i := range keys {
- s.heap.Delete(keys[i])
+ d.heap.Delete(keys[i])
}
return nil
}
-func (s *Driver) Clear() error {
- s.clearMu.Lock()
- s.heap = sync.Map{}
- s.clearMu.Unlock()
+func (d *Driver) Clear() error {
+ d.clearMu.Lock()
+ d.heap = sync.Map{}
+ d.clearMu.Unlock()
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ================================== PRIVATE ======================================
-func (s *Driver) gc() {
- ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+func (d *Driver) gc() {
+ ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+ defer ticker.Stop()
for {
select {
- case <-s.stop:
- ticker.Stop()
+ case <-d.stop:
return
case now := <-ticker.C:
// mutes needed to clear the map
- s.clearMu.RLock()
+ d.clearMu.RLock()
// check every second
- s.heap.Range(func(key, value interface{}) bool {
+ d.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
@@ -237,13 +241,13 @@ func (s *Driver) gc() {
}
if now.After(t) {
- s.log.Debug("key deleted", "key", key)
- s.heap.Delete(key)
+ d.log.Debug("key deleted", "key", key)
+ d.heap.Delete(key)
}
return true
})
- s.clearMu.RUnlock()
+ d.clearMu.RUnlock()
}
}
}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
index fd30eb54..75122571 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -1,4 +1,4 @@
-package memory
+package memorypubsub
import (
"context"
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
)
const PluginName string = "memory"
type Plugin struct {
- // heap is user map for the key-value pairs
- stop chan struct{}
-
- log logger.Logger
- cfgPlugin config.Configurer
- drivers uint
+ log logger.Logger
+ cfg config.Configurer
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.cfgPlugin = cfg
- p.stop = make(chan struct{}, 1)
+ p.cfg = cfg
return nil
}
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key)
+ return memorypubsub.NewPubSubDriver(p.log, key)
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
- st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
-
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
-func (p *Plugin) Name() string {
- return PluginName
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
}
diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go
deleted file mode 100644
index d0a184d2..00000000
--- a/plugins/redis/clients.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package redis
-
-import (
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
-)
-
-// RedisClient return a client based on the provided section key
-// key sample: kv.some-section.redis
-// kv.redis
-// redis (root)
-func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) {
- const op = errors.Op("redis_get_client")
-
- if !p.cfgPlugin.Has(key) {
- return nil, errors.E(op, errors.Errorf("no such section: %s", key))
- }
-
- cfg := &Config{}
-
- err := p.cfgPlugin.UnmarshalKey(key, cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc, nil
-}
-
-func (p *Plugin) DefaultClient() redis.UniversalClient {
- cfg := &Config{}
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc
-}
diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go
deleted file mode 100644
index 189b0002..00000000
--- a/plugins/redis/interface.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package redis
-
-import "github.com/go-redis/redis/v8"
-
-// Redis in the redis KV plugin interface
-type Redis interface {
- // RedisClient provides universal redis client
- RedisClient(key string) (redis.UniversalClient, error)
-
- // DefaultClient provide default redis client based on redis defaults
- DefaultClient() redis.UniversalClient
-}
diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go
index b41cb86c..3d062fbb 100644
--- a/plugins/redis/kv/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -248,3 +248,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index dfbda154..92dbd6a8 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("sqs_run")
c.Lock()
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.pauseCh <- struct{}{}
}
@@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}