From 9c8da162b3347b632f33f85d56e8c1ff7014631a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 13:19:09 +0300 Subject: Code polishing before release Signed-off-by: Valery Piashchynski --- Makefile | 14 +- common/kv/interface.go | 3 + plugins/boltdb/boltjobs/consumer.go | 2 +- plugins/boltdb/boltjobs/listener.go | 5 + plugins/boltdb/boltkv/driver.go | 8 +- plugins/boltdb/doc/job_lifecycle.md | 1 - plugins/boltdb/plugin.go | 24 +- plugins/ephemeral/consumer.go | 269 ---------- plugins/ephemeral/item.go | 133 ----- plugins/ephemeral/plugin.go | 41 -- plugins/kv/plugin.go | 4 + plugins/memcached/config.go | 12 - plugins/memcached/driver.go | 248 --------- plugins/memcached/memcachedkv/config.go | 12 + plugins/memcached/memcachedkv/driver.go | 250 +++++++++ plugins/memcached/plugin.go | 3 +- plugins/memory/config.go | 14 - plugins/memory/kv.go | 249 --------- plugins/memory/memoryjobs/consumer.go | 269 ++++++++++ plugins/memory/memoryjobs/item.go | 133 +++++ plugins/memory/memorykv/config.go | 14 + plugins/memory/memorykv/kv.go | 253 +++++++++ plugins/memory/memorypubsub/pubsub.go | 92 ++++ plugins/memory/plugin.go | 50 +- plugins/memory/pubsub.go | 92 ---- plugins/redis/clients.go | 84 --- plugins/redis/interface.go | 12 - plugins/redis/kv/kv.go | 2 + tests/plugins/jobs/configs/.rr-jobs-init.yaml | 6 +- .../jobs/ephemeral/.rr-ephemeral-declare.yaml | 21 - .../plugins/jobs/ephemeral/.rr-ephemeral-init.yaml | 37 -- .../jobs/ephemeral/.rr-ephemeral-jobs-err.yaml | 21 - .../jobs/ephemeral/.rr-ephemeral-pause-resume.yaml | 44 -- tests/plugins/jobs/jobs_ephemeral_test.go | 571 --------------------- tests/plugins/jobs/jobs_general_test.go | 10 +- tests/plugins/jobs/jobs_memory_test.go | 571 +++++++++++++++++++++ tests/plugins/jobs/memory/.rr-memory-declare.yaml | 21 + tests/plugins/jobs/memory/.rr-memory-init.yaml | 37 ++ tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml | 21 + .../jobs/memory/.rr-memory-pause-resume.yaml | 44 ++ .../kv/configs/.rr-kv-bolt-no-interval.yaml | 3 + tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml | 3 + tests/plugins/kv/configs/.rr-kv-init.yaml | 6 +- tests/plugins/redis/plugin1.go | 45 -- tests/plugins/redis/redis_plugin_test.go | 120 ----- tests/plugins/rpc/configs/.rr-rpc-disabled.yaml | 7 +- tests/plugins/rpc/configs/.rr.yaml | 8 +- 47 files changed, 1809 insertions(+), 2080 deletions(-) delete mode 100644 plugins/ephemeral/consumer.go delete mode 100644 plugins/ephemeral/item.go delete mode 100644 plugins/ephemeral/plugin.go delete mode 100644 plugins/memcached/config.go delete mode 100644 plugins/memcached/driver.go create mode 100644 plugins/memcached/memcachedkv/config.go create mode 100644 plugins/memcached/memcachedkv/driver.go delete mode 100644 plugins/memory/config.go delete mode 100644 plugins/memory/kv.go create mode 100644 plugins/memory/memoryjobs/consumer.go create mode 100644 plugins/memory/memoryjobs/item.go create mode 100644 plugins/memory/memorykv/config.go create mode 100644 plugins/memory/memorykv/kv.go create mode 100644 plugins/memory/memorypubsub/pubsub.go delete mode 100644 plugins/memory/pubsub.go delete mode 100644 plugins/redis/clients.go delete mode 100644 plugins/redis/interface.go delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml delete mode 100644 
tests/plugins/jobs/jobs_ephemeral_test.go create mode 100644 tests/plugins/jobs/jobs_memory_test.go create mode 100644 tests/plugins/jobs/memory/.rr-memory-declare.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-init.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml delete mode 100644 tests/plugins/redis/plugin1.go delete mode 100644 tests/plugins/redis/redis_plugin_test.go diff --git a/Makefile b/Makefile index 389c9014..1de45451 100755 --- a/Makefile +++ b/Makefile @@ -35,7 +35,6 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -50,10 +49,15 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./pkg/priority_queue - go test -v -race -tags=debug ./plugins/jobs/job go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server + go test -v -race -tags=debug ./plugins/jobs/job + go test -v -race -tags=debug ./tests/plugins/jobs + go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/broadcast + go test -v -race -tags=debug ./tests/plugins/websockets + go test -v -race -tags=debug ./plugins/websockets go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload @@ -65,12 +69,6 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/headers go test -v -race -tags=debug ./tests/plugins/logger go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/kv - go test -v -race -tags=debug ./tests/plugins/broadcast - go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./plugins/websockets - go test -v -race -tags=debug ./tests/plugins/jobs docker-compose -f tests/env/docker-compose.yaml down diff --git a/common/kv/interface.go b/common/kv/interface.go index 5736a6a7..bc6a07b2 100644 --- a/common/kv/interface.go +++ b/common/kv/interface.go @@ -30,6 +30,9 @@ type Storage interface { // Delete one or multiple keys. 
Delete(keys ...string) error + + // Stop the storage driver + Stop() } // Constructor provides storage based on the config diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..46d596fa 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. 
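A note on the new Stop method added to the common/kv Storage interface above: it moves shutdown ownership into each driver, so boltkv now allocates its own stop channel instead of receiving one, and the plugins stop counting drivers. A minimal, hedged sketch of that contract, with hypothetical names that are not part of this patch:

    // Sketch of the per-driver shutdown contract: the driver owns its stop
    // channel and its GC goroutine exits when Stop is called.
    package sketch

    import "time"

    type driver struct {
        stop chan struct{} // owned by the driver, not by the plugin
    }

    func newDriver() *driver {
        d := &driver{stop: make(chan struct{})}
        go d.gc() // background GC, shut down via Stop
        return d
    }

    func (d *driver) gc() {
        tick := time.NewTicker(time.Minute)
        defer tick.Stop()
        for {
            select {
            case <-d.stop:
                return // exit on Stop
            case <-tick.C:
                // evict expired keys here
            }
        }
    }

    // Stop implements the new kv.Storage Stop method.
    func (d *driver) Stop() {
        d.stop <- struct{}{} // unbuffered send: hands off to the GC loop
    }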
type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go deleted file mode 100644 index 8870bb0f..00000000 --- a/plugins/ephemeral/consumer.go +++ /dev/null @@ -1,269 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - prefetch string = "prefetch" - goroutinesMax uint64 = 1000 -) - -type Config struct { - Prefetch uint64 `mapstructure:"prefetch"` -} - -type consumer struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline atomic.Value - pq priorityqueue.Queue - localPrefetch chan *Item - - // time.sleep goroutines max number - goroutines uint64 - - delayed *int64 - active *int64 - - listeners uint32 - stopCh chan struct{} -} - -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_ephemeral_pipeline") - - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - err := cfg.UnmarshalKey(configKey, &jb.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - 
goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil -} - -func (c *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("ephemeral_push") - - // check if the pipeline registered - _, ok := c.pipeline.Load().(*pipeline.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) - } - - err := c.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: pipe.Name(), - Active: atomic.LoadInt64(c.active), - Delayed: atomic.LoadInt64(c.delayed), - Ready: ready(atomic.LoadUint32(&c.listeners)), - }, nil -} - -func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipeline.Store(pipeline) - return nil -} - -func (c *consumer) Pause(_ context.Context, p string) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 0 { - c.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&c.listeners, ^uint32(0)) - - // stop the consumer - c.stopCh <- struct{}{} - - c.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -func (c *consumer) Resume(_ context.Context, p string) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return - } - - // resume the consumer on the same channel - c.consume() - - atomic.StoreUint32(&c.listeners, 1) - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -func (c *consumer) Stop(_ context.Context) error { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) - - return nil -} - -func (c *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. 
- if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&c.goroutines, 1) - atomic.AddInt64(c.delayed, 1) - - time.Sleep(jj.Options.DelayDuration()) - - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) - }(msg) - - return nil - } - - // increase number of the active jobs - atomic.AddInt64(c.active, 1) - - // insert to the local, limited pipeline - select { - case c.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) - } -} - -func (c *consumer) consume() { - go func() { - // redirect - for { - select { - case item, ok := <-c.localPrefetch: - if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") - return - } - - // set requeue channel - item.Options.requeueFn = c.handleItem - item.Options.active = c.active - item.Options.delayed = c.delayed - - c.pq.Insert(item) - case <-c.stopCh: - return - } - } - }() -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/ephemeral/item.go b/plugins/ephemeral/item.go deleted file mode 100644 index 3298424d..00000000 --- a/plugins/ephemeral/item.go +++ /dev/null @@ -1,133 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - requeueFn func(context.Context, *Item) error - active *int64 - delayed *int64 -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. 
-func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Nack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - i.atomicallyReduceCount() - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - return nil -} - -// atomicallyReduceCount reduces counter of active or delayed jobs -func (i *Item) atomicallyReduceCount() { - // if job was delayed, reduce number of the delayed jobs - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - return - } - - // otherwise, reduce number of the active jobs - atomic.AddInt64(i.Options.active, ^int64(0)) - // noop for the in-memory -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index c6ca96c3..a1144b85 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // stop all attached storages + for k := range p.storages { + p.storages[k].Stop() + } return nil } diff --git a/plugins/memcached/config.go b/plugins/memcached/config.go deleted file mode 100644 index 6d413790..00000000 --- a/plugins/memcached/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package memcached - -type Config struct { - // Addr is url for memcached, 11211 port is used by default - Addr []string -} - -func (s *Config) InitDefaults() { - if s.Addr == nil { - s.Addr = 
[]string{"127.0.0.1:11211"} // default url for memcached - } -} diff --git a/plugins/memcached/driver.go b/plugins/memcached/driver.go deleted file mode 100644 index e24747fe..00000000 --- a/plugins/memcached/driver.go +++ /dev/null @@ -1,248 +0,0 @@ -package memcached - -import ( - "strings" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - client *memcache.Client - log logger.Logger - cfg *Config -} - -// NewMemcachedDriver returns a memcache client using the provided server(s) -// with equal weight. If a server is listed multiple times, -// it gets a proportional amount of weight. -func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_memcached_driver") - - s := &Driver{ - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &s.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - s.cfg.InitDefaults() - - m := memcache.New(s.cfg.Addr...) - s.client = m - - return s, nil -} - -// Has checks the key for existence -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool, len(keys)) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - exist, err := d.client.Get(keys[i]) - - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if exist != nil { - m[keys[i]] = true - } - } - return m, nil -} - -// Get gets the item for the given key. ErrCacheMiss is returned for a -// memcache cache miss. The key must be at most 250 bytes in length. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("memcached_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - data, err := d.client.Get(key) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - return nil, nil - } - return nil, errors.E(op, err) - } - if data != nil { - // return the value by the key - return data.Value, nil - } - // data is nil by some reason and error also nil - return nil, nil -} - -// MGet return map with key -- string -// and map value as value -- []byte -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("memcached_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - for i := range keys { - // Here also MultiGet - data, err := d.client.Get(keys[i]) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if data != nil { - m[keys[i]] = data.Value - } - } - - return m, nil -} - -// Set sets the KV pairs. 
Keys should be 250 bytes maximum -// TTL: -// Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - return errors.E(op, errors.EmptyItem) - } - - // pre-allocate item - memcachedItem := &memcache.Item{ - Key: items[i].Key, - // unsafe convert - Value: items[i].Value, - Flags: 0, - } - - // add additional TTL in case of TTL isn't empty - if items[i].Timeout != "" { - // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - memcachedItem.Expiration = int32(t.Unix()) - } - - err := d.client.Set(memcachedItem) - if err != nil { - return err - } - } - - return nil -} - -// MExpire Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - - // Touch updates the expiry for the given key. The seconds parameter is either - // a Unix timestamp or, if seconds is less than 1 month, the number of seconds - // into the future at which time the item will expire. Zero means the item has - // no expiration time. ErrCacheMiss is returned if the key is not in the cache. - // The key must be at most 250 bytes in length. - err = d.client.Touch(items[i].Key, int32(t.Unix())) - if err != nil { - return errors.E(op, err) - } - } - return nil -} - -// TTL return time in seconds (int32) for a given keys -func (d *Driver) TTL(_ ...string) (map[string]string, error) { - const op = errors.Op("memcached_plugin_ttl") - return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) -} - -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - err := d.client.Delete(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. 
- if err == memcache.ErrCacheMiss { - continue - } - return errors.E(op, err) - } - } - return nil -} - -func (d *Driver) Clear() error { - err := d.client.DeleteAll() - if err != nil { - d.log.Error("flush_all operation failed", "error", err) - return err - } - - return nil -} diff --git a/plugins/memcached/memcachedkv/config.go b/plugins/memcached/memcachedkv/config.go new file mode 100644 index 00000000..569e2573 --- /dev/null +++ b/plugins/memcached/memcachedkv/config.go @@ -0,0 +1,12 @@ +package memcachedkv + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + if s.Addr == nil { + s.Addr = []string{"127.0.0.1:11211"} // default url for memcached + } +} diff --git a/plugins/memcached/memcachedkv/driver.go b/plugins/memcached/memcachedkv/driver.go new file mode 100644 index 00000000..6d5e1802 --- /dev/null +++ b/plugins/memcached/memcachedkv/driver.go @@ -0,0 +1,250 @@ +package memcachedkv + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + client *memcache.Client + log logger.Logger + cfg *Config +} + +// NewMemcachedDriver returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { + const op = errors.Op("new_memcached_driver") + + s := &Driver{ + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + s.cfg.InitDefaults() + + m := memcache.New(s.cfg.Addr...) + s.client = m + + return s, nil +} + +// Has checks the key for existence +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := d.client.Get(keys[i]) + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("memcached_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := d.client.Get(key) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. 
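Worth noting the miss semantics in this added driver: Get swallows memcache.ErrCacheMiss and returns (nil, nil), so callers distinguish a miss from a transport error by the nil slice. A hedged caller-side sketch (the key name is made up):

    // Sketch: nil data with a nil error means the key was absent, not broken.
    data, err := d.Get("maybe-missing")
    switch {
    case err != nil:
        // real driver/network error
    case data == nil:
        // cache miss: memcache.ErrCacheMiss was absorbed by the driver
    default:
        // hit: use data
    }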
+ if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil by some reason and error also nil + return nil, nil +} + +// MGet return map with key -- string +// and map value as value -- []byte +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { + const op = errors.Op("memcached_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string][]byte, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := d.client.Get(keys[i]) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) Set(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == nil { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: items[i].Value, + Flags: 0, + } + + // add additional TTL in case of TTL isn't empty + if items[i].Timeout != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := d.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// MExpire Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) MExpire(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_mexpire") + for i := range items { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length. 
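Across these drivers the KV protocol carries TTLs as RFC3339 strings; this driver converts them to absolute Unix timestamps for memcached, as the Touch call below does. A hedged caller sketch (key and value are made up):

    // Sketch: store a key that expires in one hour. Set/MExpire parse the
    // RFC3339 string and convert it to an absolute epoch for memcached.
    item := &kvv1.Item{
        Key:     "session:42",
        Value:   []byte("payload"),
        Timeout: time.Now().Add(time.Hour).Format(time.RFC3339),
    }
    if err := d.Set(item); err != nil {
        // malformed RFC3339 strings are rejected at this point
    }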
+ err = d.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return errors.E(op, err) + } + } + return nil +} + +// TTL return time in seconds (int32) for a given keys +func (d *Driver) TTL(_ ...string) (map[string]string, error) { + const op = errors.Op("memcached_plugin_ttl") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := d.client.Delete(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) + } + } + return nil +} + +func (d *Driver) Clear() error { + err := d.client.DeleteAll() + if err != nil { + d.log.Error("flush_all operation failed", "error", err) + return err + } + + return nil +} + +func (d *Driver) Stop() {} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..47bca0e2 100644 --- a/plugins/memcached/plugin.go +++ b/plugins/memcached/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv" ) const ( @@ -39,7 +40,7 @@ func (s *Plugin) Available() {} func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/memory/config.go b/plugins/memory/config.go deleted file mode 100644 index e51d09c5..00000000 --- a/plugins/memory/config.go +++ /dev/null @@ -1,14 +0,0 @@ -package memory - -// Config is default config for the in-memory driver -type Config struct { - // Interval for the check - Interval int -} - -// InitDefaults by default driver is turned off -func (c *Config) InitDefaults() { - if c.Interval == 0 { - c.Interval = 60 // seconds - } -} diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go deleted file mode 100644 index 68ea7266..00000000 --- a/plugins/memory/kv.go +++ /dev/null @@ -1,249 +0,0 @@ -package memory - -import ( - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - clearMu sync.RWMutex - heap sync.Map - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - log logger.Logger - cfg *Config -} - -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { - const op = errors.Op("new_in_memory_driver") - - d := &Driver{ - stop: stop, - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - d.cfg.InitDefaults() - - go d.gc() - - return d, nil -} - -func (s *Driver) Has(keys ...string) (map[string]bool, error) { - const 
op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (s *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := s.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return data.(*kvv1.Item).Value, nil - } - return nil, nil -} - -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*kvv1.Item).Value - } - } - - return m, nil -} - -func (s *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - continue - } - // TTL is set - if items[i].Timeout != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - } - - s.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(*kvv1.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ - Key: items[i].Key, - Value: tmp.Value, - Timeout: items[i].Timeout, - }) - } - } - - return nil -} - -func (s *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*kvv1.Item).Timeout - } - } - return m, nil -} - -func (s *Driver) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed 
== "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - s.heap.Delete(keys[i]) - } - return nil -} - -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() - - return nil -} - -// ================================== PRIVATE ====================================== - -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) - for { - select { - case <-s.stop: - ticker.Stop() - return - case now := <-ticker.C: - // mutes needed to clear the map - s.clearMu.RLock() - - // check every second - s.heap.Range(func(key, value interface{}) bool { - v := value.(*kvv1.Item) - if v.Timeout == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.Timeout) - if err != nil { - return false - } - - if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) - } - return true - }) - - s.clearMu.RUnlock() - } - } -} diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go new file mode 100644 index 00000000..94abaadb --- /dev/null +++ b/plugins/memory/memoryjobs/consumer.go @@ -0,0 +1,269 @@ +package memoryjobs + +import ( + "context" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + prefetch string = "prefetch" + goroutinesMax uint64 = 1000 +) + +type Config struct { + Prefetch uint64 `mapstructure:"prefetch"` +} + +type consumer struct { + cfg *Config + log logger.Logger + eh events.Handler + pipeline atomic.Value + pq priorityqueue.Queue + localPrefetch chan *Item + + // time.sleep goroutines max number + goroutines uint64 + + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} +} + +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_ephemeral_pipeline") + + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.Prefetch == 0 { + jb.cfg.Prefetch = 100_000 + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) + + return jb, nil +} + +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + _, ok := c.pipeline.Load().(*pipeline.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } + + err := c.handleItem(ctx, fromJob(jb)) + if err != nil { + return 
errors.E(op, err)
+	}
+
+	return nil
+}
+
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	return &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver: pipe.Driver(),
+		Queue: pipe.Name(),
+		Active: atomic.LoadInt64(c.active),
+		Delayed: atomic.LoadInt64(c.delayed),
+		Ready: ready(atomic.LoadUint32(&c.listeners)),
+	}, nil
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+	c.pipeline.Store(pipeline)
+	return nil
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		c.log.Error("no such pipeline", "requested pause on: ", p)
+	}
+
+	l := atomic.LoadUint32(&c.listeners)
+	// no active listeners
+	if l == 0 {
+		c.log.Warn("no active listeners, nothing to pause")
+		return
+	}
+
+	atomic.AddUint32(&c.listeners, ^uint32(0))
+
+	// stop the consumer
+	c.stopCh <- struct{}{}
+
+	c.eh.Push(events.JobEvent{
+		Event: events.EventPipePaused,
+		Driver: pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start: time.Now(),
+		Elapsed: 0,
+	})
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		c.log.Error("no such pipeline", "requested resume on: ", p)
+	}
+
+	l := atomic.LoadUint32(&c.listeners)
+	// listener already active
+	if l == 1 {
+		c.log.Warn("listener already in the active state")
+		return
+	}
+
+	// resume the consumer on the same channel
+	c.consume()
+
+	atomic.StoreUint32(&c.listeners, 1)
+	c.eh.Push(events.JobEvent{
+		Event: events.EventPipeActive,
+		Pipeline: pipe.Name(),
+		Start: time.Now(),
+		Elapsed: 0,
+	})
+}
+
+// Run is a no-op for the in-memory driver
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+	c.eh.Push(events.JobEvent{
+		Event: events.EventPipeActive,
+		Driver: pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start: time.Now(),
+	})
+	return nil
+}
+
+func (c *consumer) Stop(_ context.Context) error {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+	if atomic.LoadUint32(&c.listeners) > 0 {
+		c.stopCh <- struct{}{}
+	}
+
+	c.eh.Push(events.JobEvent{
+		Event: events.EventPipeStopped,
+		Pipeline: pipe.Name(),
+		Start: time.Now(),
+		Elapsed: 0,
+	})
+
+	return nil
+}
+
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+	const op = errors.Op("ephemeral_handle_request")
+	// handle timeouts
+	// theoretically, some bad user may send millions of requests with a delay and produce a billion (for example)
+	// goroutines here. We should limit goroutines here.
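The cap referenced in the comment above is goroutinesMax (1000, declared at the top of this file). A standalone, hedged sketch of the same reject-when-saturated pattern, using the standard library errors package and hypothetical names:

    // Sketch: check an atomic counter before spawning a sleeper goroutine,
    // reject when the cap is hit, and decrement once the sleep completes.
    package sketch

    import (
        "errors"
        "sync/atomic"
        "time"
    )

    var inFlight uint64

    func schedule(delay time.Duration, deliver func()) error {
        if atomic.LoadUint64(&inFlight) >= 1000 { // mirrors goroutinesMax
            return errors.New("max concurrency number reached")
        }
        go func() {
            atomic.AddUint64(&inFlight, 1)
            defer atomic.AddUint64(&inFlight, ^uint64(0)) // atomic decrement
            time.Sleep(delay)
            deliver()
        }()
        return nil
    }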
+ if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { + return errors.E(op, errors.Str("max concurrency number reached")) + } + + go func(jj *Item) { + atomic.AddUint64(&c.goroutines, 1) + atomic.AddInt64(c.delayed, 1) + + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + c.localPrefetch <- jj + + atomic.AddUint64(&c.goroutines, ^uint64(0)) + }(msg) + + return nil + } + + // increase number of the active jobs + atomic.AddInt64(c.active, 1) + + // insert to the local, limited pipeline + select { + case c.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) + } +} + +func (c *consumer) consume() { + go func() { + // redirect + for { + select { + case item, ok := <-c.localPrefetch: + if !ok { + c.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set requeue channel + item.Options.requeueFn = c.handleItem + item.Options.active = c.active + item.Options.delayed = c.delayed + + c.pq.Insert(item) + case <-c.stopCh: + return + } + } + }() +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go new file mode 100644 index 00000000..8224c26b --- /dev/null +++ b/plugins/memory/memoryjobs/item.go @@ -0,0 +1,133 @@ +package memoryjobs + +import ( + "context" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. 
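For illustration, a hypothetical item with ID "1" queued on pipeline "test-local" would marshal to a payload like {"id":"1","job":"some/php/job","headers":{"key":["value"]},"pipeline":"test-local"}. Only these four fields are serialized; the priority and delay in Options stay internal.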
+func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Nack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + i.atomicallyReduceCount() + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + return nil +} + +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} diff --git a/plugins/memory/memorykv/config.go b/plugins/memory/memorykv/config.go new file mode 100644 index 00000000..a8a8993f --- /dev/null +++ b/plugins/memory/memorykv/config.go @@ -0,0 +1,14 @@ +package memorykv + +// Config is default config for the in-memory driver +type Config struct { + // Interval for the check + Interval int +} + +// InitDefaults by default driver is turned off +func (c *Config) InitDefaults() { + if c.Interval == 0 { + c.Interval = 60 // seconds + } +} diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go new file mode 100644 index 00000000..9b3e176c --- /dev/null +++ b/plugins/memory/memorykv/kv.go @@ -0,0 +1,253 @@ +package memorykv + +import ( + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + clearMu sync.RWMutex + heap sync.Map + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + log logger.Logger + cfg *Config +} + +func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { + const op = errors.Op("new_in_memory_driver") + + d := &Driver{ + stop: make(chan struct{}), + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.cfg.InitDefaults() + + go d.gc() + + return d, nil +} + +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("in_memory_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if _, ok := d.heap.Load(keys[i]); ok { + m[keys[i]] = true + } + } + + return m, nil +} + +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("in_memory_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { 
+		return nil, errors.E(op, errors.EmptyKey)
+	}
+
+	if data, exist := d.heap.Load(key); exist {
+		// here might be a panic
+		// but data could only be a *kvv1.Item, see the Set function
+		return data.(*kvv1.Item).Value, nil
+	}
+	return nil, nil
+}
+
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
+	const op = errors.Op("in_memory_plugin_mget")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+
+	// should not be empty keys
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	m := make(map[string][]byte, len(keys))
+
+	for i := range keys {
+		if value, ok := d.heap.Load(keys[i]); ok {
+			m[keys[i]] = value.(*kvv1.Item).Value
+		}
+	}
+
+	return m, nil
+}
+
+func (d *Driver) Set(items ...*kvv1.Item) error {
+	const op = errors.Op("in_memory_plugin_set")
+	if items == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	for i := range items {
+		if items[i] == nil {
+			continue
+		}
+		// TTL is set
+		if items[i].Timeout != "" {
+			// check the TTL in the item
+			_, err := time.Parse(time.RFC3339, items[i].Timeout)
+			if err != nil {
+				return err
+			}
+		}
+
+		d.heap.Store(items[i].Key, items[i])
+	}
+	return nil
+}
+
+// MExpire sets the expiration time for a key
+// If the key already has an expiration time, it will be overwritten
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
+	const op = errors.Op("in_memory_plugin_mexpire")
+	for i := range items {
+		if items[i] == nil {
+			continue
+		}
+		if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
+			return errors.E(op, errors.Str("should set timeout and at least one key"))
+		}
+
+		// if the key exists, overwrite its value
+		if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
+			// check that time is correct
+			_, err := time.Parse(time.RFC3339, items[i].Timeout)
+			if err != nil {
+				return errors.E(op, err)
+			}
+			tmp := pItem.(*kvv1.Item)
+			// guess that t is in the future
+			// in memory is just FOR TESTING PURPOSES
+			// LOGIC ISN'T IDEAL
+			d.heap.Store(items[i].Key, &kvv1.Item{
+				Key: items[i].Key,
+				Value: tmp.Value,
+				Timeout: items[i].Timeout,
+			})
+		}
+	}
+
+	return nil
+}
+
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
+	const op = errors.Op("in_memory_plugin_ttl")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+
+	// should not be empty keys
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	m := make(map[string]string, len(keys))
+
+	for i := range keys {
+		if item, ok := d.heap.Load(keys[i]); ok {
+			m[keys[i]] = item.(*kvv1.Item).Timeout
+		}
+	}
+	return m, nil
+}
+
+func (d *Driver) Delete(keys ...string) error {
+	const op = errors.Op("in_memory_plugin_delete")
+	if keys == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	// should not be empty keys
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	for i := range keys {
+		d.heap.Delete(keys[i])
+	}
+	return nil
+}
+
+func (d *Driver) Clear() error {
+	d.clearMu.Lock()
+	d.heap = sync.Map{}
+	d.clearMu.Unlock()
+
+	return nil
+}
+
+func (d *Driver) Stop() {
+	d.stop <- struct{}{}
+}
+
+// ================================== PRIVATE ======================================
+
+func (d *Driver) gc() {
+	ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-d.stop:
+			return
+		case now := <-ticker.C:
+			// take the read lock so Clear cannot swap the map mid-sweep
+			d.clearMu.RLock()
+
+			// sweep the whole heap on every tick
+			d.heap.Range(func(key, value interface{}) bool {
+				v := value.(*kvv1.Item)
+				if v.Timeout == "" {
+					return true
+				}
+
+				t, err := time.Parse(time.RFC3339, v.Timeout)
+				if err != nil {
+					return false
+				}
+
+				if now.After(t) {
+					d.log.Debug("key deleted", "key", key)
+					d.heap.Delete(key)
+				}
+				return true
+			})
+
+			d.clearMu.RUnlock()
+		}
+	}
+}
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
new file mode 100644
index 00000000..75122571
--- /dev/null
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -0,0 +1,92 @@
+package memorypubsub
+
+import (
+	"context"
+	"sync"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/bst"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type PubSubDriver struct {
+	sync.RWMutex
+	// channel with the messages from the RPC
+	pushCh chan *pubsub.Message
+	// user-subscribed topics
+	storage bst.Storage
+	log     logger.Logger
+}
+
+func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
+	ps := &PubSubDriver{
+		pushCh:  make(chan *pubsub.Message, 10),
+		storage: bst.NewBST(),
+		log:     log,
+	}
+	return ps, nil
+}
+
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+	p.pushCh <- msg
+	return nil
+}
+
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
+	go func() {
+		p.pushCh <- msg
+	}()
+}
+
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Insert(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Remove(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
+	p.RLock()
+	defer p.RUnlock()
+
+	ret := p.storage.Get(topic)
+	for rr := range ret {
+		res[rr] = struct{}{}
+	}
+}
+
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+	const op = errors.Op("pubsub_memory")
+	select {
+	case msg := <-p.pushCh:
+		if msg == nil {
+			return nil, nil
+		}
+
+		p.RLock()
+		defer p.RUnlock()
+		// deliver only messages whose topics have been subscribed to
+		// TODO: revisit this approach
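+		// note: a message published to a topic with no subscribers is dropped
+		// rather than queued; callers are expected to treat the resulting nil
+		// as "no message" and call Next again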
+		// if the topic has active subscribers, deliver the message;
+		// otherwise fall through and return nil
+		if ok := p.storage.Contains(msg.Topic); ok {
+			return msg, nil
+		}
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.TimeOut, ctx.Err())
+	}
+
+	return nil, nil
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
 
 import (
 	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/jobs"
 	"github.com/spiral/roadrunner/v2/common/kv"
 	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
 	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
 )
 
 const PluginName string = "memory"
 
 type Plugin struct {
-	// heap is user map for the key-value pairs
-	stop chan struct{}
-
-	log       logger.Logger
-	cfgPlugin config.Configurer
-	drivers   uint
+	log logger.Logger
+	cfg config.Configurer
 }
 
 func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
 	p.log = log
-	p.cfgPlugin = cfg
-	p.stop = make(chan struct{}, 1)
+	p.cfg = cfg
 	return nil
 }
 
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
 }
 
 func (p *Plugin) Stop() error {
-	if p.drivers > 0 {
-		for i := uint(0); i < p.drivers; i++ {
-			// send close signal to every driver
-			p.stop <- struct{}{}
-		}
-	}
 	return nil
 }
 
+func (p *Plugin) Name() string {
+	return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
 func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
-	return NewPubSubDriver(p.log, key)
+	return memorypubsub.NewPubSubDriver(p.log, key)
 }
 
 func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
 	const op = errors.Op("inmemory_plugin_provide")
-	st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+	st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
 	if err != nil {
 		return nil, errors.E(op, err)
 	}
-
-	// save driver number to release resources after Stop
-	p.drivers++
-
 	return st, nil
 }
 
-func (p *Plugin) Name() string {
-	return PluginName
+// JobsConstruct creates a new in-memory jobs consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates a new in-memory jobs consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
+}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
deleted file mode 100644
index fd30eb54..00000000
--- a/plugins/memory/pubsub.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package memory
-
-import (
-	"context"
-	"sync"
-
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/common/pubsub"
-	"github.com/spiral/roadrunner/v2/pkg/bst"
-	"github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type PubSubDriver struct {
-	sync.RWMutex
-	// channel with the messages from the RPC
-	pushCh chan *pubsub.Message
-	// user-subscribed topics
-	storage bst.Storage
-	log     logger.Logger
-}
-
-func 
NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { - ps := &PubSubDriver{ - pushCh: make(chan *pubsub.Message, 10), - storage: bst.NewBST(), - log: log, - } - return ps, nil -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.pushCh <- msg - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.pushCh <- msg - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("pubsub_memory") - select { - case msg := <-p.pushCh: - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil - } - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } - - return nil, nil -} diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go deleted file mode 100644 index d0a184d2..00000000 --- a/plugins/redis/clients.go +++ /dev/null @@ -1,84 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" -) - -// RedisClient return a client based on the provided section key -// key sample: kv.some-section.redis -// kv.redis -// redis (root) -func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { - const op = errors.Op("redis_get_client") - - if !p.cfgPlugin.Has(key) { - return nil, errors.E(op, errors.Errorf("no such section: %s", key)) - } - - cfg := &Config{} - - err := p.cfgPlugin.UnmarshalKey(key, cfg) - if err != nil { - return nil, errors.E(op, err) - } - - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc, nil -} - -func (p *Plugin) DefaultClient() redis.UniversalClient { - cfg := &Config{} - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: 
cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc -} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go deleted file mode 100644 index 189b0002..00000000 --- a/plugins/redis/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redis - -import "github.com/go-redis/redis/v8" - -// Redis in the redis KV plugin interface -type Redis interface { - // RedisClient provides universal redis client - RedisClient(key string) (redis.UniversalClient, error) - - // DefaultClient provide default redis client based on redis defaults - DefaultClient() redis.UniversalClient -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index b41cb86c..3d062fbb 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -248,3 +248,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bf9f60cc..9813344e 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -45,17 +45,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 prefetch: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 prefetch: 10000 diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml deleted file mode 100644 index 726c24ac..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml deleted file mode 100644 index 8914dfaa..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml +++ /dev/null @@ -1,37 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: ephemeral - priority: 10 - prefetch: 10000 - - test-2: - driver: ephemeral - priority: 10 - prefetch: 10000 - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml deleted file mode 100644 index 05dc3ffa..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php 
../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml deleted file mode 100644 index e1b76263..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml +++ /dev/null @@ -1,44 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2" ] - diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go deleted file mode 100644 index 2890aa9d..00000000 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ /dev/null @@ -1,571 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func TestEphemeralInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - 
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - 
} - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralPauseResume(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) - - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("ephemeralPause", pausePipelines("test-local")) - t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - 
t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) - - stopCh <- struct{}{} - time.Sleep(time.Second) - wg.Wait() -} - -func TestEphemeralJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - 
- // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - - time.Sleep(time.Second) - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(1)) - assert.Equal(t, out.Delayed, int64(1)) - assert.Equal(t, out.Reserved, int64(0)) - - time.Sleep(time.Second) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(0)) - assert.Equal(t, out.Delayed, int64(0)) - assert.Equal(t, out.Reserved, int64(0)) - - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func declareEphemeralPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - 
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", - "name": "test-3", - "prefetch": "10000", - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} - -func consumeEphemeralPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-3" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 951d6227..5c521c2b 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -14,9 +14,9 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, &amqp.Plugin{}, ) assert.NoError(t, err) @@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) { &server.Plugin{}, &jobs.Plugin{}, &metrics.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, mockLogger, ) assert.NoError(t, err) @@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("DeclareEphemeralPipeline", declareMemoryPipe) + t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) t.Run("PushEphemeralPipeline", pushToPipe("test-3")) time.Sleep(time.Second) t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go new file mode 100644 index 00000000..20cbfb3f --- /dev/null +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -0,0 +1,571 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestMemoryInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-init.yaml", + Prefix: "rr", + } 
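+	// the mock logger below pins down the lifecycle events the memory driver
+	// must emit: each pipeline becomes active once on start and is stopped
+	// once on shutdown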
+ + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, 
+ &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryPauseResume(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-pause-resume.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, 
syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("Resume", resumePipes("test-local")) + t.Run("Pause", pausePipelines("test-local")) + t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) + t.Run("Resume", resumePipes("test-local")) + t.Run("pushToEnabledPipe", pushToPipe("test-local")) + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + time.Sleep(time.Second) + wg.Wait() +} + +func TestMemoryJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", 
declareMemoryPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) + + time.Sleep(time.Second) + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "memory") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(1)) + assert.Equal(t, out.Delayed, int64(1)) + assert.Equal(t, out.Reserved, int64(0)) + + time.Sleep(time.Second) + t.Run("ConsumePipeline", consumeMemoryPipe) + 
time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "memory") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(0)) + assert.Equal(t, out.Delayed, int64(0)) + assert.Equal(t, out.Reserved, int64(0)) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareMemoryPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "memory", + "name": "test-3", + "prefetch": "10000", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func consumeMemoryPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} + pipe.GetPipelines()[0] = "test-3" + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/memory/.rr-memory-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml new file mode 100644 index 00000000..726c24ac --- /dev/null +++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml @@ -0,0 +1,21 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/memory/.rr-memory-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml new file mode 100644 index 00000000..9ee8afc2 --- /dev/null +++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: memory + priority: 10 + prefetch: 10000 + + test-2: + driver: memory + priority: 10 + prefetch: 10000 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml new file mode 100644 index 00000000..05dc3ffa --- /dev/null +++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml @@ -0,0 +1,21 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml new file mode 100644 index 00000000..1ad48237 --- /dev/null +++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml @@ -0,0 +1,44 @@ +rpc: + listen: 
tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: memory + priority: 10 + pipeline_size: 10000 + + test-local-2: + driver: memory + priority: 1 + pipeline_size: 10000 + + test-local-3: + driver: memory + priority: 2 + pipeline_size: 10000 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-local", "test-local-2" ] + diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml index f58de3e4..471e5c77 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml index 08b3bfad..b46bcb1c 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index a13b591c..6407c7ad 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: default: @@ -25,6 +28,3 @@ kv: memcached: driver: memcached addr: [ "127.0.0.1:11211" ] - -# redis: -# driver: redis diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go deleted file mode 100644 index 68da1394..00000000 --- a/tests/plugins/redis/plugin1.go +++ /dev/null @@ -1,45 +0,0 @@ -package redis - -import ( - "context" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis" -) - -type Plugin1 struct { - redisClient redis.UniversalClient -} - -func (p *Plugin1) Init(redis redisPlugin.Redis) error { - var err error - p.redisClient, err = redis.RedisClient("redis") - - return err -} - -func (p *Plugin1) Serve() chan error { - const op = errors.Op("plugin1 serve") - errCh := make(chan error, 1) - p.redisClient.Set(context.Background(), "foo", "bar", time.Minute) - - stringCmd := p.redisClient.Get(context.Background(), "foo") - data, err := stringCmd.Result() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - if data != "bar" { - errCh <- errors.E(op, errors.Str("no such key")) - return errCh - } - - return errCh -} - -func (p *Plugin1) Stop() error { - return nil -} diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go deleted file mode 100644 index 1b84e339..00000000 --- a/tests/plugins/redis/redis_plugin_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package redis - -import ( - "fmt" - "os" - "os/signal" - "sync" - "syscall" - "testing" - - "github.com/alicebob/miniredis/v2" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - 
"github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/redis" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func redisConfig(port string) string { - cfg := ` -redis: - addrs: - - '127.0.0.1:%s' - master_name: '' - username: '' - password: '' - db: 0 - sentinel_password: '' - route_by_latency: false - route_randomly: false - dial_timeout: 0 - max_retries: 1 - min_retry_backoff: 0 - max_retry_backoff: 0 - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 - read_timeout: 0 - write_timeout: 0 - pool_timeout: 0 - idle_timeout: 0 - idle_check_freq: 0 - read_only: false -` - return fmt.Sprintf(cfg, port) -} - -func TestRedisInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - s, err := miniredis.Run() - assert.NoError(t, err) - - c := redisConfig(s.Port()) - - cfg := &config.Viper{} - cfg.Type = "yaml" - cfg.ReadInCfg = []byte(c) - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - err = cont.RegisterAll( - cfg, - mockLogger, - &redis.Plugin{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - stopCh <- struct{}{} - wg.Wait() -} diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml index 5ab359d3..d256aad7 100644 --- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml +++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml @@ -1,3 +1,8 @@ logs: mode: development - level: error \ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml index 67d935e3..d6aaa7c6 100644 --- a/tests/plugins/rpc/configs/.rr.yaml +++ b/tests/plugins/rpc/configs/.rr.yaml @@ -1,5 +1,11 @@ rpc: listen: tcp://127.0.0.1:6001 + logs: mode: development - level: error \ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic -- cgit v1.2.3 From 850c68d581e198da1dad006674cc5157eaf9228b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 13:29:15 +0300 Subject: Update deps and CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 17 ++++++++++++----- go.mod | 2 -- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fcec90cb..97517d44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ CHANGELOG ========= -v2.4.0 (_.08.2021) +v2.4.0 (02.09.2021) ------------------- ## 💔 Internal BC: @@ -10,20 +10,27 @@ v2.4.0 (_.08.2021) ## 👀 New: -- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. 
- Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) +- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
- ✏️ Support for IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
- ✏️ Support for Docker images via GitHub packages.
-- ✏️ Go 1.17 support.
+- ✏️ Go 1.17 support for all spiral packages.

## 🩹 Fixes:

- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750).
- 🐛 Fix: RR became unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772).
+- 🐛 Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79).
+
+## 📦 Packages:
+
+- 📦 Update goridge to `v3.2.1`
+- 📦 Update temporal to `v1.0.9`
+- 📦 Update endure to `v1.0.3`

## 📈 Summary:

-- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29)
+- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1)
+- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1)

--- diff --git a/go.mod b/go.mod index a8932d87..87fc7ce3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.17 require ( github.com/Shopify/toxiproxy v2.1.4+incompatible - github.com/alicebob/miniredis/v2 v2.15.1 // ========= AWS SDK v2 github.com/aws/aws-sdk-go-v2 v1.9.0 github.com/aws/aws-sdk-go-v2/config v1.7.0 @@ -47,7 +46,6 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/andybalholm/brotli v1.0.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect -- cgit v1.2.3 From 5ad241b23b64faf7389c424bdecd3489338fa1ba Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 13:48:40 +0300 Subject: Update Makefile Signed-off-by: Valery Piashchynski --- Makefile | 4 ++-- go.mod | 5 ++--- go.sum | 4 ++++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 1de45451..96df65cc 100755 --- a/Makefile +++ b/Makefile @@ -16,11 +16,11 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv go test -v -race -cover -tags=debug -coverpkg=./...
-coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline diff --git a/go.mod b/go.mod index 87fc7ce3..f75b7702 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/klauspost/compress v1.13.5 github.com/prometheus/client_golang v1.11.0 github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 - github.com/shirou/gopsutil v3.21.7+incompatible + github.com/shirou/gopsutil v3.21.8+incompatible github.com/spf13/viper v1.8.1 // SPIRAL ==== github.com/spiral/endure v1.0.3 @@ -39,7 +39,7 @@ require ( go.uber.org/zap v1.19.0 golang.org/x/net v0.0.0-20210825183410-e898025ed96a golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf + golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -84,7 +84,6 @@ require ( github.com/valyala/fasthttp v1.29.0 // indirect github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.5 // indirect diff --git a/go.sum b/go.sum index 776d4212..a587942b 100644 --- a/go.sum +++ b/go.sum @@ -360,6 +360,8 @@ github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiA github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.21.7+incompatible h1:g/wcPHcuCQvHSePVofjQljd2vX4ty0+J6VoMB+NPcdk= github.com/shirou/gopsutil v3.21.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU= +github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -624,6 +626,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c= +golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -- cgit v1.2.3 From d62acca114a9646afed6ec0217b8cb709687aeb9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 16:50:41 +0300 Subject: Close connection in the amqp driver. bytes.Buffer update in the beanstalk driver Signed-off-by: Valery Piashchynski --- Makefile | 10 +++++----- plugins/amqp/amqpjobs/consumer.go | 6 +++--- plugins/amqp/amqpjobs/item.go | 7 ++++--- plugins/amqp/amqpjobs/redial.go | 21 +++++++++------------ plugins/beanstalk/consumer.go | 10 ++++++++-- plugins/beanstalk/item.go | 9 --------- plugins/jobs/job/job.go | 11 ----------- plugins/jobs/job/job_test.go | 27 --------------------------- plugins/jobs/plugin.go | 22 +++++++++++++++------- 9 files changed, 44 insertions(+), 79 deletions(-) diff --git a/Makefile b/Makefile index 96df65cc..8390e910 100755 --- a/Makefile +++ b/Makefile @@ -15,15 +15,15 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... 
-coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..1bfc4b41 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) { } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. 
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..dc2a7e91 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "encoding/gob" "strconv" "strings" "sync/atomic" @@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { bb := new(bytes.Buffer) bb.Grow(64) - err := item.pack(bb) + err := gob.NewEncoder(bb).Encode(item) if err != nil { return errors.E(op, err) } + body := make([]byte, bb.Len()) + copy(body, bb.Bytes()) + bb.Reset() + bb = nil + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 // is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; @@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { // seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. 
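	// As an aside, a minimal sketch of the gob round-trip that produces the body
	// passed to Put below (Item simplified here; the decode side mirrors
	// consumer.unpack in item.go):
	//
	//	bb := new(bytes.Buffer)
	//	if err := gob.NewEncoder(bb).Encode(item); err != nil {
	//		return err
	//	}
	//	body := make([]byte, bb.Len())
	//	copy(body, bb.Bytes()) // detach the payload so the buffer can be dropped before the network call
	//	bb.Reset()
	//
	//	var out Item
	//	if err := gob.NewDecoder(bytes.NewBuffer(body)).Decode(&out); err != nil {
	//		return err
	//	}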
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..03060994 100644 --- a/plugins/beanstalk/item.go +++ b/plugins/beanstalk/item.go @@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item { } } -func (i *Item) pack(b *bytes.Buffer) error { - err := gob.NewEncoder(b).Encode(i) - if err != nil { - return err - } - - return nil -} - func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go index 06c3254e..adab2a0a 100644 --- a/plugins/jobs/job/job.go +++ b/plugins/jobs/job/job.go @@ -45,17 +45,6 @@ type Options struct { Delay int64 `json:"delay,omitempty"` } -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go index a47151a3..4a95e27d 100644 --- a/plugins/jobs/job/job_test.go +++ b/plugins/jobs/job/job_test.go @@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) { opts := &Options{Delay: 1} assert.Equal(t, time.Second, opts.DelayDuration()) } - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(2), opts.Delay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Delay) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..f411e9a0 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error { } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) + p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name()) return nil } @@ -638,11 +638,19 @@ 
func (p *Plugin) Destroy(pp string) error { // delete consumer delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + d = nil + cancel() + return nil } func (p *Plugin) List() []string { -- cgit v1.2.3 From 1256173ed6d6aa1f3a347fed59b38bea39c8eef5 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 21:29:41 +0300 Subject: Remove redundant debug log Signed-off-by: Valery Piashchynski --- plugins/jobs/plugin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index f411e9a0..f61092a9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -615,7 +615,6 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) - p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name()) return nil } -- cgit v1.2.3 From 66f069f092568585e7b2a118303a20a598948fd7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 00:14:26 +0300 Subject: Update endure. Replace map with sync.Map because various RPC methods can be called simultaneously. Signed-off-by: Valery Piashchynski --- .golangci.yml | 1 - go.mod | 2 +- go.sum | 16 ++----------- plugins/jobs/plugin.go | 62 ++++++++++++++++++++++++++++---------------------- 4 files changed, 38 insertions(+), 43 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 55186659..f6ead63e 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -74,7 +74,6 @@ linters: # All available linters list: Date: Thu, 2 Sep 2021 16:52:54 +0300 Subject: Profiling session fixes: - Drain local pipeline channel - sync.Map instead of map - Add start-elapsed timings Signed-off-by: Valery Piashchynski --- plugins/amqp/amqpjobs/consumer.go | 16 +++-- plugins/beanstalk/consumer.go | 18 ++++-- plugins/boltdb/boltjobs/consumer.go | 16 +++-- plugins/memory/memoryjobs/consumer.go | 68 +++++++++++++--------- plugins/sqs/consumer.go | 16 +++-- .../jobs/beanstalk/.rr-beanstalk-jobs-err.yaml | 2 - 6 files changed, 88 insertions(+), 48 deletions(-) diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1bfc4b41..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start:
start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,11 +420,13 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { + start := time.Now() c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -428,7 +435,8 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index dc2a7e91..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -266,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -282,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -328,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -357,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 46d596fa..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) 
error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index 94abaadb..c2cc303b 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 + jb.cfg.Prefetch = 100 } // initialize a local queue @@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil + return &consumer{ + log: log, + pq: pq, + eh: eh, + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}), + }, nil } func (c *consumer) Push(ctx context.Context, jb *job.Job) error { @@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := 
time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) { c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) } @@ -186,17 +185,26 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} + select { + case c.stopCh <- struct{}{}: + default: + break + } + + for i := 0; i < len(c.localPrefetch); i++ { + // drain all jobs from the channel + <-c.localPrefetch } c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -219,10 +227,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { time.Sleep(jj.Options.DelayDuration()) - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) + select { + case c.localPrefetch <- jj: + atomic.AddUint64(&c.goroutines, ^uint64(0)) + default: + c.log.Warn("can't push job", "error", "local queue closed or full") + } }(msg) return nil @@ -247,7 +257,7 @@ func (c *consumer) consume() { select { case item, ok := <-c.localPrefetch: if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue closed") return } diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index dfbda154..92dbd6a8 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("sqs_run") c.Lock() @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.pauseCh <- struct{}{} } @@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git 
a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml index a4f31290..71b51dce 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -7,9 +7,7 @@ server: relay_timeout: "20s" beanstalk: - # beanstalk address addr: tcp://127.0.0.1:11300 - # connect timeout timeout: 10s logs: -- cgit v1.2.3 From c64005501f92888c10a61481745df91c7c50639f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 17:52:18 +0300 Subject: Destroy localPrefetch channel on stop Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 2 +- plugins/memory/memoryjobs/consumer.go | 6 ++++-- plugins/memory/memoryjobs/item.go | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 94f903d5..62186981 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -138,7 +138,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { - headers := map[string][]string{} + headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { headers[k] = v.GetValue() diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index c2cc303b..fbdedefe 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100 + jb.cfg.Prefetch = 100_000 } // initialize a local queue @@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand log: log, pq: pq, eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), @@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error { <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go index 8224c26b..f4d62ada 100644 --- a/plugins/memory/memoryjobs/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, -- cgit v1.2.3 From da78c9ed8ee7406aa2de21bc70642928c391852a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 19:04:16 +0300 Subject: Do not use copied driver from the map. Store driver and pipeline only after run w/o error. 
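Indexing p.jobConstructors[dr] at the call site (instead of acting on a local copy of the map value) and storing the consumer/pipeline pair only on the success path means a driver that fails to construct, register or run never lands in the registries. A rough sketch of the resulting Declare flow, with the optional Run step for consumed pipelines elided:

	if _, ok := p.jobConstructors[dr]; ok {
		initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
		if err != nil {
			return errors.E(op, err)
		}

		err = initializedDriver.Register(context.Background(), pipeline)
		if err != nil {
			return errors.E(op, err)
		}

		// store the driver and the pipeline only after the steps above succeeded
		p.consumers.Store(pipeline.Name(), initializedDriver)
		p.pipelines.Store(pipeline.Name(), pipeline)
	}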
Signed-off-by: Valery Piashchynski --- plugins/jobs/plugin.go | 19 +++++++++---------- plugins/jobs/rpc.go | 6 +++--- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index eb273b93..3aec6acc 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -129,12 +129,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // config key for the particular sub-driver jobs.pipelines.test-local configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false @@ -595,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(pipeline.Name(), initializedDriver) - // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipeline) if err != nil { @@ -621,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { return errors.E(op, err) } } - } - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers.Store(pipeline.Name(), initializedDriver) + // save the pipeline + p.pipelines.Store(pipeline.Name(), pipeline) + } return nil } diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 62186981..d7b93bd1 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { return errors.E(op, errors.Str("empty ID field not allowed")) } - err := r.p.Push(r.from(j.GetJob())) + err := r.p.Push(from(j.GetJob())) if err != nil { return errors.E(op, err) } @@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - batch[i] = r.from(j.GetJobs()[i]) + batch[i] = from(j.GetJobs()[i]) } err := r.p.PushBatch(batch) @@ -137,7 +137,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { } // from converts from transport entity to domain -func (r *rpc) from(j *jobsv1beta.Job) *job.Job { +func from(j *jobsv1beta.Job) *job.Job { headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { -- cgit v1.2.3 From 4524f8c5af045ed5048250b63b7859eaeb4f24a1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 19:36:05 +0300 Subject: Update CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md 
b/CHANGELOG.md index 97517d44..897877d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ v2.4.0 (02.09.2021) - 📦 Update goridge to `v3.2.1` - 📦 Update temporal to `v1.0.9` -- 📦 Update endure to `v1.0.3` +- 📦 Update endure to `v1.0.4` ## 📈 Summary: -- cgit v1.2.3
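As a closing usage sketch: with the patches above, jobs can be pushed to the new in-memory pipelines over RoadRunner's RPC interface. The snippet follows the pattern used by tests/plugins/jobs; the address and pipeline name come from the test configs shown earlier, while the protobuf import path and field names are assumptions to verify against proto/jobs/v1beta in the repository:

	package main

	import (
		"log"
		"net"
		"net/rpc"

		goridgeRPC "github.com/spiral/goridge/v3/pkg/rpc"
		jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
	)

	func main() {
		// rpc.listen address from the test configs (assumed)
		conn, err := net.Dial("tcp", "127.0.0.1:6001")
		if err != nil {
			log.Fatal(err)
		}
		client := rpc.NewClientWithCodec(goridgeRPC.NewClientCodec(conn))

		req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
			Job:     "some/php/namespace", // job handler consumed by the PHP worker (hypothetical)
			Id:      "1",                  // rpc.Push rejects an empty ID
			Payload: `{"hello":"world"}`,
			Options: &jobsv1beta.Options{
				Priority: 1,            // 0 would inherit the pipeline priority
				Pipeline: "test-local", // memory-driver pipeline from the config
			},
		}}

		if err := client.Call("jobs.Push", req, &jobsv1beta.Empty{}); err != nil {
			log.Fatal(err)
		}
	}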