diff options
author | Valery Piashchynski <[email protected]> | 2020-12-26 00:47:21 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-12-26 00:47:21 +0300 |
commit | 566d7f4c95eb5dedcb2da5afcda4bbea8eba077f (patch) | |
tree | 0007a6b8c8ac9e7d31b8a5f3f7f27669c860d261 /plugins/reload/plugin.go | |
parent | 1bc3db2ea9b95edd0101676d7bfd75df3782c3bd (diff) | |
parent | 7a0dee1a416705c621edbf50e1f43fb39845348f (diff) |
Merge pull request #463 from spiral/experiment/core_pluginsv2.0.0-beta1
[RR2] Plugins
Diffstat (limited to 'plugins/reload/plugin.go')
-rw-r--r-- | plugins/reload/plugin.go | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go new file mode 100644 index 00000000..452e03a3 --- /dev/null +++ b/plugins/reload/plugin.go @@ -0,0 +1,159 @@ +package reload + +import ( + "os" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" +) + +// PluginName contains default plugin name. +const PluginName string = "reload" +const thresholdChanBuffer uint = 1000 + +type Plugin struct { + cfg *Config + log logger.Logger + watcher *Watcher + services map[string]interface{} + res resetter.Resetter + stopc chan struct{} +} + +// Init controller service +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error { + const op = errors.Op("reload plugin init") + s.cfg = &Config{} + InitDefaults(s.cfg) + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + // disable plugin in case of error + return errors.E(op, errors.Disabled, err) + } + + s.log = log + s.res = res + s.stopc = make(chan struct{}, 1) + s.services = make(map[string]interface{}) + + var configs []WatcherConfig + + for serviceName, serviceConfig := range s.cfg.Services { + ignored, err := ConvertIgnored(serviceConfig.Ignore) + if err != nil { + return errors.E(op, err) + } + configs = append(configs, WatcherConfig{ + ServiceName: serviceName, + Recursive: serviceConfig.Recursive, + Directories: serviceConfig.Dirs, + FilterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return errors.E(op, errors.Skip) + }, + Files: make(map[string]os.FileInfo), + Ignored: ignored, + FilePatterns: append(serviceConfig.Patterns, s.cfg.Patterns...), + }) + } + + s.watcher, err = NewWatcher(configs, s.log) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (s *Plugin) Serve() chan error { + const op = errors.Op("reload plugin serve") + errCh := make(chan error, 1) + if s.cfg.Interval < time.Second { + errCh <- errors.E(op, errors.Str("reload interval is too fast")) + return errCh + } + + // make a map with unique services + // so, if we would have a 100 events from http service + // in map we would see only 1 key and it's config + treshholdc := make(chan struct { + serviceConfig ServiceConfig + service string + }, thresholdChanBuffer) + + // use the same interval + timer := time.NewTimer(s.cfg.Interval) + + go func() { + for e := range s.watcher.Event { + treshholdc <- struct { + serviceConfig ServiceConfig + service string + }{serviceConfig: s.cfg.Services[e.service], service: e.service} + } + }() + + // map with configs by services + updated := make(map[string]ServiceConfig, len(s.cfg.Services)) + + go func() { + for { + select { + case cfg := <-treshholdc: + // logic is following: + // restart + timer.Stop() + // replace previous value in map by more recent without adding new one + updated[cfg.service] = cfg.serviceConfig + // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) + // instead, we are resetting the timer and wait for s.cfg.Interval time + // If there is no more events, we restart service only once + timer.Reset(s.cfg.Interval) + case <-timer.C: + if len(updated) > 0 { + for name := range updated { + err := s.res.ResetByName(name) + if err != nil { + timer.Stop() + errCh <- errors.E(op, err) + return + } + } + // zero map + updated = make(map[string]ServiceConfig, len(s.cfg.Services)) + } + case <-s.stopc: + timer.Stop() + return + } + } + }() + + go func() { + err := s.watcher.StartPolling(s.cfg.Interval) + if err != nil { + errCh <- errors.E(op, err) + return + } + }() + + return errCh +} + +func (s *Plugin) Stop() error { + s.watcher.Stop() + s.stopc <- struct{}{} + return nil +} + +func (s *Plugin) Name() string { + return PluginName +} |