summaryrefslogtreecommitdiff
path: root/plugins/reload
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-26 00:47:21 +0300
committerGitHub <[email protected]>2020-12-26 00:47:21 +0300
commit566d7f4c95eb5dedcb2da5afcda4bbea8eba077f (patch)
tree0007a6b8c8ac9e7d31b8a5f3f7f27669c860d261 /plugins/reload
parent1bc3db2ea9b95edd0101676d7bfd75df3782c3bd (diff)
parent7a0dee1a416705c621edbf50e1f43fb39845348f (diff)
Merge pull request #463 from spiral/experiment/core_pluginsv2.0.0-beta1
[RR2] Plugins
Diffstat (limited to 'plugins/reload')
-rw-r--r--plugins/reload/config.go58
-rw-r--r--plugins/reload/plugin.go159
-rw-r--r--plugins/reload/watcher.go374
3 files changed, 591 insertions, 0 deletions
diff --git a/plugins/reload/config.go b/plugins/reload/config.go
new file mode 100644
index 00000000..9ca2c0dc
--- /dev/null
+++ b/plugins/reload/config.go
@@ -0,0 +1,58 @@
+package reload
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+)
+
+// Config is a Reload configuration point.
+type Config struct {
+ // Interval is a global refresh interval
+ Interval time.Duration
+
+ // Patterns is a global file patterns to watch. It will be applied to every directory in project
+ Patterns []string
+
+ // Services is set of services which would be reloaded in case of FS changes
+ Services map[string]ServiceConfig
+}
+
+type ServiceConfig struct {
+ // Enabled indicates that service must be watched, doest not required when any other option specified
+ Enabled bool
+
+ // Recursive is options to use nested files from root folder
+ Recursive bool
+
+ // Patterns is per-service specific files to watch
+ Patterns []string
+
+ // Dirs is per-service specific dirs which will be combined with Patterns
+ Dirs []string
+
+ // Ignore is set of files which would not be watched
+ Ignore []string
+}
+
+// InitDefaults sets missing values to their default values.
+func InitDefaults(c *Config) {
+ c.Interval = time.Second
+ c.Patterns = []string{".php"}
+}
+
+// Valid validates the configuration.
+func (c *Config) Valid() error {
+ const op = errors.Op("config validation [reload plugin]")
+ if c.Interval < time.Second {
+ return errors.E(op, errors.Str("too short interval"))
+ }
+
+ if c.Services == nil {
+ return errors.E(op, errors.Str("should add at least 1 service"))
+ } else if len(c.Services) == 0 {
+ return errors.E(op, errors.Str("service initialized, however, no config added"))
+ }
+
+ return nil
+}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
new file mode 100644
index 00000000..452e03a3
--- /dev/null
+++ b/plugins/reload/plugin.go
@@ -0,0 +1,159 @@
+package reload
+
+import (
+ "os"
+ "strings"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/resetter"
+)
+
+// PluginName contains default plugin name.
+const PluginName string = "reload"
+const thresholdChanBuffer uint = 1000
+
+type Plugin struct {
+ cfg *Config
+ log logger.Logger
+ watcher *Watcher
+ services map[string]interface{}
+ res resetter.Resetter
+ stopc chan struct{}
+}
+
+// Init controller service
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error {
+ const op = errors.Op("reload plugin init")
+ s.cfg = &Config{}
+ InitDefaults(s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ // disable plugin in case of error
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ s.log = log
+ s.res = res
+ s.stopc = make(chan struct{}, 1)
+ s.services = make(map[string]interface{})
+
+ var configs []WatcherConfig
+
+ for serviceName, serviceConfig := range s.cfg.Services {
+ ignored, err := ConvertIgnored(serviceConfig.Ignore)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ configs = append(configs, WatcherConfig{
+ ServiceName: serviceName,
+ Recursive: serviceConfig.Recursive,
+ Directories: serviceConfig.Dirs,
+ FilterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return errors.E(op, errors.Skip)
+ },
+ Files: make(map[string]os.FileInfo),
+ Ignored: ignored,
+ FilePatterns: append(serviceConfig.Patterns, s.cfg.Patterns...),
+ })
+ }
+
+ s.watcher, err = NewWatcher(configs, s.log)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ const op = errors.Op("reload plugin serve")
+ errCh := make(chan error, 1)
+ if s.cfg.Interval < time.Second {
+ errCh <- errors.E(op, errors.Str("reload interval is too fast"))
+ return errCh
+ }
+
+ // make a map with unique services
+ // so, if we would have a 100 events from http service
+ // in map we would see only 1 key and it's config
+ treshholdc := make(chan struct {
+ serviceConfig ServiceConfig
+ service string
+ }, thresholdChanBuffer)
+
+ // use the same interval
+ timer := time.NewTimer(s.cfg.Interval)
+
+ go func() {
+ for e := range s.watcher.Event {
+ treshholdc <- struct {
+ serviceConfig ServiceConfig
+ service string
+ }{serviceConfig: s.cfg.Services[e.service], service: e.service}
+ }
+ }()
+
+ // map with configs by services
+ updated := make(map[string]ServiceConfig, len(s.cfg.Services))
+
+ go func() {
+ for {
+ select {
+ case cfg := <-treshholdc:
+ // logic is following:
+ // restart
+ timer.Stop()
+ // replace previous value in map by more recent without adding new one
+ updated[cfg.service] = cfg.serviceConfig
+ // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing)
+ // instead, we are resetting the timer and wait for s.cfg.Interval time
+ // If there is no more events, we restart service only once
+ timer.Reset(s.cfg.Interval)
+ case <-timer.C:
+ if len(updated) > 0 {
+ for name := range updated {
+ err := s.res.ResetByName(name)
+ if err != nil {
+ timer.Stop()
+ errCh <- errors.E(op, err)
+ return
+ }
+ }
+ // zero map
+ updated = make(map[string]ServiceConfig, len(s.cfg.Services))
+ }
+ case <-s.stopc:
+ timer.Stop()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ err := s.watcher.StartPolling(s.cfg.Interval)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return
+ }
+ }()
+
+ return errCh
+}
+
+func (s *Plugin) Stop() error {
+ s.watcher.Stop()
+ s.stopc <- struct{}{}
+ return nil
+}
+
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go
new file mode 100644
index 00000000..c232f16f
--- /dev/null
+++ b/plugins/reload/watcher.go
@@ -0,0 +1,374 @@
+package reload
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// SimpleHook is used to filter by simple criteria, CONTAINS
+type SimpleHook func(filename string, pattern []string) error
+
+// An Event describes an event that is received when files or directory
+// changes occur. It includes the os.FileInfo of the changed file or
+// directory and the type of event that's occurred and the full path of the file.
+type Event struct {
+ Path string
+ Info os.FileInfo
+
+ service string // type of service, http, grpc, etc...
+}
+
+type WatcherConfig struct {
+ // service name
+ ServiceName string
+
+ // Recursive or just add by singe directory
+ Recursive bool
+
+ // Directories used per-service
+ Directories []string
+
+ // simple hook, just CONTAINS
+ FilterHooks func(filename string, pattern []string) error
+
+ // path to file with Files
+ Files map[string]os.FileInfo
+
+ // Ignored Directories, used map for O(1) amortized get
+ Ignored map[string]struct{}
+
+ // FilePatterns to ignore
+ FilePatterns []string
+}
+
+type Watcher struct {
+ // main event channel
+ Event chan Event
+ close chan struct{}
+
+ // =============================
+ mu *sync.Mutex
+
+ // indicates is walker started or not
+ started bool
+
+ // config for each service
+ // need pointer here to assign files
+ watcherConfigs map[string]WatcherConfig
+
+ // logger
+ log logger.Logger
+}
+
+// Options is used to set Watcher Options
+type Options func(*Watcher)
+
+// NewWatcher returns new instance of File Watcher
+func NewWatcher(configs []WatcherConfig, log logger.Logger, options ...Options) (*Watcher, error) {
+ w := &Watcher{
+ Event: make(chan Event),
+ mu: &sync.Mutex{},
+
+ log: log,
+
+ close: make(chan struct{}),
+
+ //workingDir: workDir,
+ watcherConfigs: make(map[string]WatcherConfig),
+ }
+
+ // add watcherConfigs by service names
+ for _, v := range configs {
+ w.watcherConfigs[v.ServiceName] = v
+ }
+
+ // apply options
+ for _, option := range options {
+ option(w)
+ }
+ err := w.initFs()
+ if err != nil {
+ return nil, err
+ }
+
+ return w, nil
+}
+
+// initFs makes initial map with files
+func (w *Watcher) initFs() error {
+ const op = errors.Op("init fs")
+ for srvName, config := range w.watcherConfigs {
+ fileList, err := w.retrieveFileList(srvName, config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // workaround. in golang you can't assign to map in struct field
+ tmp := w.watcherConfigs[srvName]
+ tmp.Files = fileList
+ w.watcherConfigs[srvName] = tmp
+ }
+ return nil
+}
+
+// ConvertIgnored is used to convert slice to map with ignored files
+func ConvertIgnored(ignored []string) (map[string]struct{}, error) {
+ if len(ignored) == 0 {
+ return nil, nil
+ }
+
+ ign := make(map[string]struct{}, len(ignored))
+ for i := 0; i < len(ignored); i++ {
+ abs, err := filepath.Abs(ignored[i])
+ if err != nil {
+ return nil, err
+ }
+ ign[abs] = struct{}{}
+ }
+
+ return ign, nil
+}
+
+// https://en.wikipedia.org/wiki/Inotify
+// SetMaxFileEvents sets max file notify events for Watcher
+// In case of file watch errors, this value can be increased system-wide
+// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf)
+// Add apply: sudo sysctl -p --system
+// func SetMaxFileEvents(events int) Options {
+// return func(watcher *Watcher) {
+// watcher.maxFileWatchEvents = events
+// }
+//
+// }
+
+// pass map from outside
+func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) {
+ const op = errors.Op("retrieve")
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+
+ filesList := make(map[string]os.FileInfo, 10)
+ filesList[path] = stat
+
+ // if it's not a dir, return
+ if !stat.IsDir() {
+ return filesList, nil
+ }
+
+ fileInfoList, err := ioutil.ReadDir(path)
+ if err != nil {
+ return nil, err
+ }
+
+ // recursive calls are slow in compare to goto
+ // so, we will add files with goto pattern
+outer:
+ for i := 0; i < len(fileInfoList); i++ {
+ // if file in ignored --> continue
+ if _, ignored := w.watcherConfigs[serviceName].Ignored[path]; ignored {
+ continue
+ }
+
+ // if filename does not contain pattern --> ignore that file
+ if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil {
+ err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns)
+ if errors.Is(errors.Skip, err) {
+ continue outer
+ }
+ }
+
+ filesList[fileInfoList[i].Name()] = fileInfoList[i]
+ }
+
+ return filesList, nil
+}
+
+func (w *Watcher) StartPolling(duration time.Duration) error {
+ w.mu.Lock()
+ const op = errors.Op("start polling")
+ if w.started {
+ w.mu.Unlock()
+ return errors.E(op, errors.Str("already started"))
+ }
+
+ w.started = true
+ w.mu.Unlock()
+
+ return w.waitEvent(duration)
+}
+
+// this is blocking operation
+func (w *Watcher) waitEvent(d time.Duration) error {
+ ticker := time.NewTicker(d)
+ for {
+ select {
+ case <-w.close:
+ ticker.Stop()
+ // just exit
+ // no matter for the pollEvents
+ return nil
+ case <-ticker.C:
+ // this is not very effective way
+ // because we have to wait on Lock
+ // better is to listen files in parallel, but, since that would be used in debug... TODO
+ for serviceName := range w.watcherConfigs {
+ // TODO sync approach
+ fileList, _ := w.retrieveFileList(serviceName, w.watcherConfigs[serviceName])
+ w.pollEvents(w.watcherConfigs[serviceName].ServiceName, fileList)
+ }
+ }
+ }
+}
+
+// retrieveFileList get file list for service
+func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) {
+ fileList := make(map[string]os.FileInfo)
+ if config.Recursive {
+ // walk through directories recursively
+ for i := 0; i < len(config.Directories); i++ {
+ // full path is workdir/relative_path
+ fullPath, err := filepath.Abs(config.Directories[i])
+ if err != nil {
+ return nil, err
+ }
+ list, err := w.retrieveFilesRecursive(serviceName, fullPath)
+ if err != nil {
+ return nil, err
+ }
+
+ for k := range list {
+ fileList[k] = list[k]
+ }
+ }
+ return fileList, nil
+ }
+
+ for i := 0; i < len(config.Directories); i++ {
+ // full path is workdir/relative_path
+ fullPath, err := filepath.Abs(config.Directories[i])
+ if err != nil {
+ return nil, err
+ }
+
+ // list is pathToFiles with files
+ list, err := w.retrieveFilesSingle(serviceName, fullPath)
+ if err != nil {
+ return nil, err
+ }
+
+ for pathToFile, file := range list {
+ fileList[pathToFile] = file
+ }
+ }
+
+ return fileList, nil
+}
+
+func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) {
+ fileList := make(map[string]os.FileInfo)
+
+ return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ const op = errors.Op("retrieve files recursive")
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // If path is ignored and it's a directory, skip the directory. If it's
+ // ignored and it's a single file, skip the file.
+ _, ignored := w.watcherConfigs[serviceName].Ignored[path]
+ if ignored {
+ if info.IsDir() {
+ // if it's dir, ignore whole
+ return filepath.SkipDir
+ }
+ return nil
+ }
+
+ // if filename does not contain pattern --> ignore that file
+ err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns)
+ if errors.Is(errors.Skip, err) {
+ return nil
+ }
+
+ // Add the path and it's info to the file list.
+ fileList[path] = info
+ return nil
+ })
+}
+
+func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // Store create and remove events for use to check for rename events.
+ creates := make(map[string]os.FileInfo)
+ removes := make(map[string]os.FileInfo)
+
+ // Check for removed files.
+ for pth := range w.watcherConfigs[serviceName].Files {
+ if _, found := files[pth]; !found {
+ removes[pth] = w.watcherConfigs[serviceName].Files[pth]
+ w.log.Debug("file added to the list of removed files", "path", pth, "name", w.watcherConfigs[serviceName].Files[pth].Name(), "size", w.watcherConfigs[serviceName].Files[pth].Size())
+ }
+ }
+
+ // Check for created files, writes and chmods.
+ for pth := range files {
+ if files[pth].IsDir() {
+ continue
+ }
+ oldInfo, found := w.watcherConfigs[serviceName].Files[pth]
+ if !found {
+ // A file was created.
+ creates[pth] = files[pth]
+ w.log.Debug("file was created", "path", pth, "name", files[pth].Name(), "size", files[pth].Size())
+ continue
+ }
+
+ if oldInfo.ModTime() != files[pth].ModTime() || oldInfo.Mode() != files[pth].Mode() {
+ w.watcherConfigs[serviceName].Files[pth] = files[pth]
+ w.log.Debug("file was updated", "path", pth, "name", files[pth].Name(), "size", files[pth].Size())
+ w.Event <- Event{
+ Path: pth,
+ Info: files[pth],
+ service: serviceName,
+ }
+ }
+ }
+
+ // Send all the remaining create and remove events.
+ for pth := range creates {
+ // add file to the plugin watch files
+ w.watcherConfigs[serviceName].Files[pth] = creates[pth]
+ w.log.Debug("file was added to watcher", "path", pth, "name", creates[pth].Name(), "size", creates[pth].Size())
+
+ w.Event <- Event{
+ Path: pth,
+ Info: creates[pth],
+ service: serviceName,
+ }
+ }
+
+ for pth := range removes {
+ // delete path from the config
+ delete(w.watcherConfigs[serviceName].Files, pth)
+ w.log.Debug("file was removed from watcher", "path", pth, "name", removes[pth].Name(), "size", removes[pth].Size())
+
+ w.Event <- Event{
+ Path: pth,
+ Info: removes[pth],
+ service: serviceName,
+ }
+ }
+}
+
+func (w *Watcher) Stop() {
+ w.close <- struct{}{}
+}