From 9c8da162b3347b632f33f85d56e8c1ff7014631a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 13:19:09 +0300 Subject: Code polishing before release Signed-off-by: Valery Piashchynski --- plugins/memory/config.go | 14 -- plugins/memory/kv.go | 249 ------------------------------- plugins/memory/memoryjobs/consumer.go | 269 ++++++++++++++++++++++++++++++++++ plugins/memory/memoryjobs/item.go | 133 +++++++++++++++++ plugins/memory/memorykv/config.go | 14 ++ plugins/memory/memorykv/kv.go | 253 ++++++++++++++++++++++++++++++++ plugins/memory/memorypubsub/pubsub.go | 92 ++++++++++++ plugins/memory/plugin.go | 50 ++++--- plugins/memory/pubsub.go | 92 ------------ 9 files changed, 789 insertions(+), 377 deletions(-) delete mode 100644 plugins/memory/config.go delete mode 100644 plugins/memory/kv.go create mode 100644 plugins/memory/memoryjobs/consumer.go create mode 100644 plugins/memory/memoryjobs/item.go create mode 100644 plugins/memory/memorykv/config.go create mode 100644 plugins/memory/memorykv/kv.go create mode 100644 plugins/memory/memorypubsub/pubsub.go delete mode 100644 plugins/memory/pubsub.go (limited to 'plugins/memory') diff --git a/plugins/memory/config.go b/plugins/memory/config.go deleted file mode 100644 index e51d09c5..00000000 --- a/plugins/memory/config.go +++ /dev/null @@ -1,14 +0,0 @@ -package memory - -// Config is default config for the in-memory driver -type Config struct { - // Interval for the check - Interval int -} - -// InitDefaults by default driver is turned off -func (c *Config) InitDefaults() { - if c.Interval == 0 { - c.Interval = 60 // seconds - } -} diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go deleted file mode 100644 index 68ea7266..00000000 --- a/plugins/memory/kv.go +++ /dev/null @@ -1,249 +0,0 @@ -package memory - -import ( - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - clearMu sync.RWMutex - heap sync.Map - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - log logger.Logger - cfg *Config -} - -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { - const op = errors.Op("new_in_memory_driver") - - d := &Driver{ - stop: stop, - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - d.cfg.InitDefaults() - - go d.gc() - - return d, nil -} - -func (s *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (s *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := s.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return data.(*kvv1.Item).Value, nil - } - return nil, nil -} - -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { - 
const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*kvv1.Item).Value - } - } - - return m, nil -} - -func (s *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - continue - } - // TTL is set - if items[i].Timeout != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - } - - s.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(*kvv1.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ - Key: items[i].Key, - Value: tmp.Value, - Timeout: items[i].Timeout, - }) - } - } - - return nil -} - -func (s *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*kvv1.Item).Timeout - } - } - return m, nil -} - -func (s *Driver) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - s.heap.Delete(keys[i]) - } - return nil -} - -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() - - return nil -} - -// ================================== PRIVATE ====================================== - -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) - for { - select { - case <-s.stop: - ticker.Stop() - return - case now := <-ticker.C: - // mutes needed to clear the map - s.clearMu.RLock() - - // check every second - s.heap.Range(func(key, value interface{}) bool { - v := value.(*kvv1.Item) - if v.Timeout == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.Timeout) - if err != nil { - return false - } - - if now.After(t) { - s.log.Debug("key deleted", "key", key) - 
s.heap.Delete(key)
-				}
-				return true
-			})
-
-			s.clearMu.RUnlock()
-		}
-	}
-}
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
new file mode 100644
index 00000000..94abaadb
--- /dev/null
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -0,0 +1,269 @@
+package memoryjobs
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+	prefetch      string = "prefetch"
+	goroutinesMax uint64 = 1000
+)
+
+type Config struct {
+	Prefetch uint64 `mapstructure:"prefetch"`
+}
+
+type consumer struct {
+	cfg           *Config
+	log           logger.Logger
+	eh            events.Handler
+	pipeline      atomic.Value
+	pq            priorityqueue.Queue
+	localPrefetch chan *Item
+
+	// max number of active time.Sleep goroutines
+	goroutines uint64
+
+	delayed *int64
+	active  *int64
+
+	listeners uint32
+	stopCh    chan struct{}
+}
+
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+	const op = errors.Op("new_ephemeral_pipeline")
+
+	jb := &consumer{
+		log:        log,
+		pq:         pq,
+		eh:         eh,
+		goroutines: 0,
+		active:     utils.Int64(0),
+		delayed:    utils.Int64(0),
+		stopCh:     make(chan struct{}, 1),
+	}
+
+	err := cfg.UnmarshalKey(configKey, &jb.cfg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	if jb.cfg.Prefetch == 0 {
+		jb.cfg.Prefetch = 100_000
+	}
+
+	// initialize a local queue
+	jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
+
+	return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+	jb := &consumer{
+		log:        log,
+		pq:         pq,
+		eh:         eh,
+		goroutines: 0,
+		active:     utils.Int64(0),
+		delayed:    utils.Int64(0),
+		stopCh:     make(chan struct{}, 1),
+	}
+
+	// initialize a local queue
+	jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
+
+	return jb, nil
+}
+
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
+	const op = errors.Op("ephemeral_push")
+
+	// check if the pipeline is registered
+	_, ok := c.pipeline.Load().(*pipeline.Pipeline)
+	if !ok {
+		return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+	}
+
+	err := c.handleItem(ctx, fromJob(jb))
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	return nil
+}
+
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	return &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver:   pipe.Driver(),
+		Queue:    pipe.Name(),
+		Active:   atomic.LoadInt64(c.active),
+		Delayed:  atomic.LoadInt64(c.delayed),
+		Ready:    ready(atomic.LoadUint32(&c.listeners)),
+	}, nil
+}
+
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+	c.pipeline.Store(pipeline)
+	return nil
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		c.log.Error("no such pipeline", "requested pause on: ", p)
+	}
+
+	l := atomic.LoadUint32(&c.listeners)
+	// no active listeners
+	if l == 0 {
+		c.log.Warn("no active listeners, nothing to pause")
+		return
+	}
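+
+	// NOTE: adding ^uint32(0) (all bits set) to the unsigned listeners
+	// counter wraps around, atomically decrementing it by one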
+
+	atomic.AddUint32(&c.listeners, ^uint32(0))
+
+	// stop the consumer
+	c.stopCh <- struct{}{}
+
+	c.eh.Push(events.JobEvent{
+		Event:    events.EventPipePaused,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+		Elapsed:  0,
+	})
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		c.log.Error("no such pipeline", "requested resume on: ", p)
+	}
+
+	l := atomic.LoadUint32(&c.listeners)
+	// listener already active
+	if l == 1 {
+		c.log.Warn("listener already in the active state")
+		return
+	}
+
+	// resume the consumer on the same channel
+	c.consume()
+
+	atomic.StoreUint32(&c.listeners, 1)
+	c.eh.Push(events.JobEvent{
+		Event:    events.EventPipeActive,
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+		Elapsed:  0,
+	})
+}
+
+// Run is a no-op for the ephemeral driver
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+	c.eh.Push(events.JobEvent{
+		Event:    events.EventPipeActive,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+	})
+	return nil
+}
+
+func (c *consumer) Stop(_ context.Context) error {
+	pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+	if atomic.LoadUint32(&c.listeners) > 0 {
+		c.stopCh <- struct{}{}
+	}
+
+	c.eh.Push(events.JobEvent{
+		Event:    events.EventPipeStopped,
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+		Elapsed:  0,
+	})
+
+	return nil
+}
+
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+	const op = errors.Op("ephemeral_handle_request")
+	// handle timeouts
+	// theoretically, a bad actor could send millions of delayed requests and spawn
+	// a huge number of goroutines, so the number of delay goroutines has to be capped
+	if msg.Options.Delay > 0 {
+		// if we already have 1000 goroutines waiting on a delay - reject the 1001st
+		if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
+			return errors.E(op, errors.Str("max concurrency number reached"))
+		}
+
+		go func(jj *Item) {
+			atomic.AddUint64(&c.goroutines, 1)
+			atomic.AddInt64(c.delayed, 1)
+
+			time.Sleep(jj.Options.DelayDuration())
+
+			// send the item after timeout expired
+			c.localPrefetch <- jj
+
+			atomic.AddUint64(&c.goroutines, ^uint64(0))
+		}(msg)
+
+		return nil
+	}
+
+	// increase the number of active jobs
+	atomic.AddInt64(c.active, 1)
+
+	// insert into the local, limited pipeline
+	select {
+	case c.localPrefetch <- msg:
+		return nil
+	case <-ctx.Done():
+		return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
+	}
+}
+
+func (c *consumer) consume() {
+	go func() {
+		// redirect items from the local prefetch queue into the priority queue
+		for {
+			select {
+			case item, ok := <-c.localPrefetch:
+				if !ok {
+					c.log.Warn("ephemeral local prefetch queue was closed")
+					return
+				}
+
+				// set the requeue function and the job counters
+				item.Options.requeueFn = c.handleItem
+				item.Options.active = c.active
+				item.Options.delayed = c.delayed
+
+				c.pq.Insert(item)
+			case <-c.stopCh:
+				return
+			}
+		}
+	}()
+}
+
+func ready(r uint32) bool {
+	return r > 0
+}
diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go
new file mode 100644
index 00000000..8224c26b
--- /dev/null
+++ b/plugins/memory/memoryjobs/item.go
@@ -0,0 +1,133 @@
+package memoryjobs
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	json "github.com/json-iterator/go"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
+	"github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+	// Job contains the name of the job broker (usually a PHP class).
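+	// The driver itself does not interpret this value; it is serialized
+	// into the job context returned by Context() below.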
+	Job string `json:"job"`
+
+	// Ident is a unique identifier of the job, should be provided from outside
+	Ident string `json:"id"`
+
+	// Payload is string data (usually JSON) passed to the Job broker.
+	Payload string `json:"payload"`
+
+	// Headers with key-value pairs
+	Headers map[string][]string `json:"headers"`
+
+	// Options contains a set of PipelineOptions specific to job execution. Can be empty.
+	Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle a given job.
+type Options struct {
+	// Priority is the job priority (default - 10).
+	// Zero means the priority was not set explicitly.
+	Priority int64 `json:"priority"`
+
+	// Pipeline manually specified pipeline.
+	Pipeline string `json:"pipeline,omitempty"`
+
+	// Delay defines time duration to delay execution for. Defaults to none.
+	Delay int64 `json:"delay,omitempty"`
+
+	// private
+	requeueFn func(context.Context, *Item) error
+	active    *int64
+	delayed   *int64
+}
+
+// DelayDuration returns the delay duration as a time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+	return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+	return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+	return i.Options.Priority
+}
+
+// Body packs the job payload into a binary payload.
+func (i *Item) Body() []byte {
+	return utils.AsBytes(i.Payload)
+}
+
+// Context packs the job context (id, job, headers, pipeline) into a binary payload.
+func (i *Item) Context() ([]byte, error) {
+	ctx, err := json.Marshal(
+		struct {
+			ID       string              `json:"id"`
+			Job      string              `json:"job"`
+			Headers  map[string][]string `json:"headers"`
+			Pipeline string              `json:"pipeline"`
+		}{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+	)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return ctx, nil
+}
+
+func (i *Item) Ack() error {
+	i.atomicallyReduceCount()
+	return nil
+}
+
+func (i *Item) Nack() error {
+	i.atomicallyReduceCount()
+	return nil
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+	// overwrite the delay
+	i.Options.Delay = delay
+	i.Headers = headers
+
+	i.atomicallyReduceCount()
+
+	err := i.Options.requeueFn(context.Background(), i)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// atomicallyReduceCount reduces the counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+	// if the job was delayed, reduce the number of delayed jobs
+	if i.Options.Delay > 0 {
+		atomic.AddInt64(i.Options.delayed, ^int64(0))
+		return
+	}
+
+	// otherwise, reduce the number of active jobs
+	atomic.AddInt64(i.Options.active, ^int64(0))
+	// (acknowledging is otherwise a noop for the in-memory driver)
+}
+
+func fromJob(job *job.Job) *Item {
+	return &Item{
+		Job:     job.Job,
+		Ident:   job.Ident,
+		Payload: job.Payload,
+		Options: &Options{
+			Priority: job.Options.Priority,
+			Pipeline: job.Options.Pipeline,
+			Delay:    job.Options.Delay,
+		},
+	}
+}
diff --git a/plugins/memory/memorykv/config.go b/plugins/memory/memorykv/config.go
new file mode 100644
index 00000000..a8a8993f
--- /dev/null
+++ b/plugins/memory/memorykv/config.go
@@ -0,0 +1,14 @@
+package memorykv
+
+// Config is the default config for the in-memory driver
+type Config struct {
+	// Interval for the TTL check, in seconds
+	Interval int
+}
+
+// InitDefaults sets the default check interval (60 seconds)
+func (c *Config) InitDefaults() {
+	if c.Interval == 0 {
+		c.Interval = 60 // seconds
+	}
+}
diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go
new file mode 100644
index 00000000..9b3e176c
--- /dev/null
+++ b/plugins/memory/memorykv/kv.go
@@ -0,0 +1,253 @@
+package memorykv
+
+import (
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+	kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
+)
+
+type Driver struct {
+	clearMu sync.RWMutex
+	heap    sync.Map
+	// stop is used to stop the keys GC
+	stop chan struct{}
+	log  logger.Logger
+	cfg  *Config
+}
+
+func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) {
+	const op = errors.Op("new_in_memory_driver")
+
+	d := &Driver{
+		stop: make(chan struct{}),
+		log:  log,
+	}
+
+	err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	d.cfg.InitDefaults()
+
+	go d.gc()
+
+	return d, nil
+}
+
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+	const op = errors.Op("in_memory_plugin_has")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+	m := make(map[string]bool)
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+
+		if _, ok := d.heap.Load(keys[i]); ok {
+			m[keys[i]] = true
+		}
+	}
+
+	return m, nil
+}
+
+func (d *Driver) Get(key string) ([]byte, error) {
+	const op = errors.Op("in_memory_plugin_get")
+	// catch whitespace-only keys like " "
+	keyTrimmed := strings.TrimSpace(key)
+	if keyTrimmed == "" {
+		return nil, errors.E(op, errors.EmptyKey)
+	}
+
+	if data, exist := d.heap.Load(key); exist {
+		// the type assertion is safe here:
+		// only *kvv1.Item values are stored, see the Set function
+		return data.(*kvv1.Item).Value, nil
+	}
+	return nil, nil
+}
+
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
+	const op = errors.Op("in_memory_plugin_mget")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+
+	// keys should not be empty
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	m := make(map[string][]byte, len(keys))
+
+	for i := range keys {
+		if value, ok := d.heap.Load(keys[i]); ok {
+			m[keys[i]] = value.(*kvv1.Item).Value
+		}
+	}
+
+	return m, nil
+}
+
+func (d *Driver) Set(items ...*kvv1.Item) error {
+	const op = errors.Op("in_memory_plugin_set")
+	if items == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	for i := range items {
+		if items[i] == nil {
+			continue
+		}
+		// TTL is set
+		if items[i].Timeout != "" {
+			// validate the TTL in the item
+			_, err := time.Parse(time.RFC3339, items[i].Timeout)
+			if err != nil {
+				return err
+			}
+		}
+
+		d.heap.Store(items[i].Key, items[i])
+	}
+	return nil
+}
+
+// MExpire sets the expiration time for the given keys.
+// If a key already has an expiration time, it is overwritten.
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
+	const op = errors.Op("in_memory_plugin_mexpire")
+	for i := range items {
+		if items[i] == nil {
+			continue
+		}
+		if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
+			return errors.E(op, errors.Str("should set timeout and at least one key"))
+		}
+
+		// if the key exists, overwrite its value
+		if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
+			// check that the timeout is a valid RFC3339 time
+			_, err := time.Parse(time.RFC3339, items[i].Timeout)
+			if err != nil {
+				return errors.E(op, err)
+			}
+			tmp := pItem.(*kvv1.Item)
+			// assume that t is in the future;
+			// the in-memory driver is just FOR TESTING PURPOSES,
+			// so the logic isn't ideal
+			d.heap.Store(items[i].Key, &kvv1.Item{
+				Key:     items[i].Key,
+				Value:   tmp.Value,
+				Timeout: items[i].Timeout,
+			})
+		}
+	}
+
+	return nil
+}
+
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
+	const op = errors.Op("in_memory_plugin_ttl")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+
+	// keys should not be empty
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	m := make(map[string]string, len(keys))
+
+	for i := range keys {
+		if item, ok := d.heap.Load(keys[i]); ok {
+			m[keys[i]] = item.(*kvv1.Item).Timeout
+		}
+	}
+	return m, nil
+}
+
+func (d *Driver) Delete(keys ...string) error {
+	const op = errors.Op("in_memory_plugin_delete")
+	if keys == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	// keys should not be empty
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	for i := range keys {
+		d.heap.Delete(keys[i])
+	}
+	return nil
+}
+
+func (d *Driver) Clear() error {
+	d.clearMu.Lock()
+	d.heap = sync.Map{}
+	d.clearMu.Unlock()
+
+	return nil
+}
+
+func (d *Driver) Stop() {
+	d.stop <- struct{}{}
+}
+
+// ================================== PRIVATE ======================================
+
+func (d *Driver) gc() {
+	ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-d.stop:
+			return
+		case now := <-ticker.C:
+			// the mutex is needed to safely clear the map
+			d.clearMu.RLock()
+
+			// check every item on every tick
+			d.heap.Range(func(key, value interface{}) bool {
+				v := value.(*kvv1.Item)
+				if v.Timeout == "" {
+					return true
+				}
+
+				t, err := time.Parse(time.RFC3339, v.Timeout)
+				if err != nil {
+					return false
+				}
+
+				if now.After(t) {
+					d.log.Debug("key deleted", "key", key)
+					d.heap.Delete(key)
+				}
+				return true
+			})
+
+			d.clearMu.RUnlock()
+		}
+	}
+}
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
new file mode 100644
index 00000000..75122571
--- /dev/null
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -0,0 +1,92 @@
+package memorypubsub
+
+import (
+	"context"
+	"sync"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/bst"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type PubSubDriver struct {
+	sync.RWMutex
+	// channel with the messages from the RPC
+	pushCh chan *pubsub.Message
+	// user-subscribed topics
+	storage bst.Storage
+	log     logger.Logger
+}
+
+func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
+	ps := &PubSubDriver{
+		pushCh:  make(chan *pubsub.Message, 10),
+		storage: bst.NewBST(),
+		log:     log,
+	}
+	return ps, nil
+}
+
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+	p.pushCh <- msg
+	return nil
+}
+
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
+	go func() {
+		p.pushCh <- msg
+	}()
+}
+
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Insert(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Remove(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
+	p.RLock()
+	defer p.RUnlock()
+
+	ret := p.storage.Get(topic)
+	for rr := range ret {
+		res[rr] = struct{}{}
+	}
+}
+
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+	const op = errors.Op("pubsub_memory")
+	select {
+	case msg := <-p.pushCh:
+		if msg == nil {
+			return nil, nil
+		}
+
+		p.RLock()
+		defer p.RUnlock()
+		// push only messages whose topics have subscribers
+		// TODO better???
+		// if we have active subscribers - send the message to the topic
+		// otherwise return nil
+		if ok := p.storage.Contains(msg.Topic); ok {
+			return msg, nil
+		}
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.TimeOut, ctx.Err())
+	}
+
+	return nil, nil
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
 
 import (
 	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/jobs"
 	"github.com/spiral/roadrunner/v2/common/kv"
 	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
 	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
 )
 
 const PluginName string = "memory"
 
 type Plugin struct {
-	// heap is user map for the key-value pairs
-	stop chan struct{}
-
-	log       logger.Logger
-	cfgPlugin config.Configurer
-	drivers   uint
+	log logger.Logger
+	cfg config.Configurer
 }
 
 func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
 	p.log = log
-	p.cfgPlugin = cfg
-	p.stop = make(chan struct{}, 1)
+	p.cfg = cfg
 	return nil
 }
 
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
 }
 
 func (p *Plugin) Stop() error {
-	if p.drivers > 0 {
-		for i := uint(0); i < p.drivers; i++ {
-			// send close signal to every driver
-			p.stop <- struct{}{}
-		}
-	}
 	return nil
 }
 
+func (p *Plugin) Name() string {
+	return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
 func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
-	return NewPubSubDriver(p.log, key)
+	return memorypubsub.NewPubSubDriver(p.log, key)
 }
 
 func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
 	const op = errors.Op("inmemory_plugin_provide")
-	st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+	st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
 	if err != nil {
 		return nil, errors.E(op, err)
 	}
-
-	// save driver number to release resources after Stop
-	p.drivers++
-
 	return st, nil
 }
 
-func (p *Plugin) Name() string {
-	return PluginName
+// JobsConstruct creates a new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates a new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
 }
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
deleted file mode 100644
index fd30eb54..00000000
--- a/plugins/memory/pubsub.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package memory
-
-import (
-	"context"
-	"sync"
-
- "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type PubSubDriver struct { - sync.RWMutex - // channel with the messages from the RPC - pushCh chan *pubsub.Message - // user-subscribed topics - storage bst.Storage - log logger.Logger -} - -func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { - ps := &PubSubDriver{ - pushCh: make(chan *pubsub.Message, 10), - storage: bst.NewBST(), - log: log, - } - return ps, nil -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.pushCh <- msg - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.pushCh <- msg - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("pubsub_memory") - select { - case msg := <-p.pushCh: - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil - } - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } - - return nil, nil -} -- cgit v1.2.3 From 74c327a86e48ccc9d58833fce994ea134169d0a9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 16:52:54 +0300 Subject: Profiling session fixes: - Drain local pipeline channel - sync.Map instead of map - Add start-elapsed timings Signed-off-by: Valery Piashchynski --- plugins/memory/memoryjobs/consumer.go | 68 ++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 29 deletions(-) (limited to 'plugins/memory') diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index 94abaadb..c2cc303b 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 + jb.cfg.Prefetch = 100 } // initialize a local queue @@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - 
jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil + return &consumer{ + log: log, + pq: pq, + eh: eh, + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}), + }, nil } func (c *consumer) Push(ctx context.Context, jb *job.Job) error { @@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) { c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) } @@ -186,17 +185,26 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} + select { + case c.stopCh <- struct{}{}: + default: + break + } + + for i := 0; i < len(c.localPrefetch); i++ { + // drain all jobs from the channel + <-c.localPrefetch } c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -219,10 +227,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { time.Sleep(jj.Options.DelayDuration()) - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) + select { + case c.localPrefetch <- jj: + atomic.AddUint64(&c.goroutines, ^uint64(0)) + default: + c.log.Warn("can't push job", "error", "local queue closed or full") + } }(msg) return nil @@ -247,7 +257,7 @@ func (c *consumer) consume() { select { case item, ok := <-c.localPrefetch: if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue closed") return } -- cgit v1.2.3 From c64005501f92888c10a61481745df91c7c50639f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 17:52:18 +0300 Subject: Destroy localPrefetch channel on stop Signed-off-by: Valery Piashchynski --- plugins/memory/memoryjobs/consumer.go | 6 ++++-- plugins/memory/memoryjobs/item.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'plugins/memory') diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index c2cc303b..fbdedefe 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100 + jb.cfg.Prefetch = 100_000 } // initialize a local queue 
@@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand log: log, pq: pq, eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), @@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error { <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go index 8224c26b..f4d62ada 100644 --- a/plugins/memory/memoryjobs/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, -- cgit v1.2.3
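The delayed-job path in handleItem above caps the number of sleeping goroutines at goroutinesMax before parking each delayed job in its own time.Sleep goroutine. Below is a minimal standalone sketch of that pattern in plain Go; the names maxSleepers, sleepers, and delayThenEnqueue are illustrative only (not part of the RoadRunner API), and the counter is incremented before the goroutine is spawned so the limit check and the counter stay consistent.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// maxSleepers mirrors the goroutinesMax constant from the consumer above.
const maxSleepers uint64 = 1000

// sleepers counts the goroutines currently waiting on a delay.
var sleepers uint64

// delayThenEnqueue rejects the job when too many goroutines are already
// sleeping; otherwise it parks one goroutine for the delay, then enqueues.
func delayThenEnqueue(delay time.Duration, enqueue func()) error {
	if atomic.LoadUint64(&sleepers) >= maxSleepers {
		return errors.New("max concurrency number reached")
	}

	atomic.AddUint64(&sleepers, 1)
	go func() {
		// adding ^uint64(0) (MaxUint64) wraps around, decrementing by one
		defer atomic.AddUint64(&sleepers, ^uint64(0))
		time.Sleep(delay)
		enqueue()
	}()

	return nil
}

func main() {
	err := delayThenEnqueue(time.Millisecond, func() { fmt.Println("job enqueued") })
	if err != nil {
		fmt.Println(err)
	}
	time.Sleep(10 * time.Millisecond) // give the sleeper time to fire
}

The decrement via atomic.AddUint64 with ^uint64(0) is the same unsigned wrap-around trick the consumer uses for its goroutines and listeners counters.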