diff options
Diffstat (limited to 'plugins/reload')
-rw-r--r-- | plugins/reload/config.go | 58 | ||||
-rw-r--r-- | plugins/reload/plugin.go | 159 | ||||
-rw-r--r-- | plugins/reload/watcher.go | 374 |
3 files changed, 591 insertions, 0 deletions
diff --git a/plugins/reload/config.go b/plugins/reload/config.go new file mode 100644 index 00000000..9ca2c0dc --- /dev/null +++ b/plugins/reload/config.go @@ -0,0 +1,58 @@ +package reload + +import ( + "time" + + "github.com/spiral/errors" +) + +// Config is a Reload configuration point. +type Config struct { + // Interval is a global refresh interval + Interval time.Duration + + // Patterns is a global file patterns to watch. It will be applied to every directory in project + Patterns []string + + // Services is set of services which would be reloaded in case of FS changes + Services map[string]ServiceConfig +} + +type ServiceConfig struct { + // Enabled indicates that service must be watched, doest not required when any other option specified + Enabled bool + + // Recursive is options to use nested files from root folder + Recursive bool + + // Patterns is per-service specific files to watch + Patterns []string + + // Dirs is per-service specific dirs which will be combined with Patterns + Dirs []string + + // Ignore is set of files which would not be watched + Ignore []string +} + +// InitDefaults sets missing values to their default values. +func InitDefaults(c *Config) { + c.Interval = time.Second + c.Patterns = []string{".php"} +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + const op = errors.Op("config validation [reload plugin]") + if c.Interval < time.Second { + return errors.E(op, errors.Str("too short interval")) + } + + if c.Services == nil { + return errors.E(op, errors.Str("should add at least 1 service")) + } else if len(c.Services) == 0 { + return errors.E(op, errors.Str("service initialized, however, no config added")) + } + + return nil +} diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go new file mode 100644 index 00000000..452e03a3 --- /dev/null +++ b/plugins/reload/plugin.go @@ -0,0 +1,159 @@ +package reload + +import ( + "os" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" +) + +// PluginName contains default plugin name. +const PluginName string = "reload" +const thresholdChanBuffer uint = 1000 + +type Plugin struct { + cfg *Config + log logger.Logger + watcher *Watcher + services map[string]interface{} + res resetter.Resetter + stopc chan struct{} +} + +// Init controller service +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error { + const op = errors.Op("reload plugin init") + s.cfg = &Config{} + InitDefaults(s.cfg) + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + // disable plugin in case of error + return errors.E(op, errors.Disabled, err) + } + + s.log = log + s.res = res + s.stopc = make(chan struct{}, 1) + s.services = make(map[string]interface{}) + + var configs []WatcherConfig + + for serviceName, serviceConfig := range s.cfg.Services { + ignored, err := ConvertIgnored(serviceConfig.Ignore) + if err != nil { + return errors.E(op, err) + } + configs = append(configs, WatcherConfig{ + ServiceName: serviceName, + Recursive: serviceConfig.Recursive, + Directories: serviceConfig.Dirs, + FilterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return errors.E(op, errors.Skip) + }, + Files: make(map[string]os.FileInfo), + Ignored: ignored, + FilePatterns: append(serviceConfig.Patterns, s.cfg.Patterns...), + }) + } + + s.watcher, err = NewWatcher(configs, s.log) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (s *Plugin) Serve() chan error { + const op = errors.Op("reload plugin serve") + errCh := make(chan error, 1) + if s.cfg.Interval < time.Second { + errCh <- errors.E(op, errors.Str("reload interval is too fast")) + return errCh + } + + // make a map with unique services + // so, if we would have a 100 events from http service + // in map we would see only 1 key and it's config + treshholdc := make(chan struct { + serviceConfig ServiceConfig + service string + }, thresholdChanBuffer) + + // use the same interval + timer := time.NewTimer(s.cfg.Interval) + + go func() { + for e := range s.watcher.Event { + treshholdc <- struct { + serviceConfig ServiceConfig + service string + }{serviceConfig: s.cfg.Services[e.service], service: e.service} + } + }() + + // map with configs by services + updated := make(map[string]ServiceConfig, len(s.cfg.Services)) + + go func() { + for { + select { + case cfg := <-treshholdc: + // logic is following: + // restart + timer.Stop() + // replace previous value in map by more recent without adding new one + updated[cfg.service] = cfg.serviceConfig + // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) + // instead, we are resetting the timer and wait for s.cfg.Interval time + // If there is no more events, we restart service only once + timer.Reset(s.cfg.Interval) + case <-timer.C: + if len(updated) > 0 { + for name := range updated { + err := s.res.ResetByName(name) + if err != nil { + timer.Stop() + errCh <- errors.E(op, err) + return + } + } + // zero map + updated = make(map[string]ServiceConfig, len(s.cfg.Services)) + } + case <-s.stopc: + timer.Stop() + return + } + } + }() + + go func() { + err := s.watcher.StartPolling(s.cfg.Interval) + if err != nil { + errCh <- errors.E(op, err) + return + } + }() + + return errCh +} + +func (s *Plugin) Stop() error { + s.watcher.Stop() + s.stopc <- struct{}{} + return nil +} + +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go new file mode 100644 index 00000000..c232f16f --- /dev/null +++ b/plugins/reload/watcher.go @@ -0,0 +1,374 @@ +package reload + +import ( + "io/ioutil" + "os" + "path/filepath" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// SimpleHook is used to filter by simple criteria, CONTAINS +type SimpleHook func(filename string, pattern []string) error + +// An Event describes an event that is received when files or directory +// changes occur. It includes the os.FileInfo of the changed file or +// directory and the type of event that's occurred and the full path of the file. +type Event struct { + Path string + Info os.FileInfo + + service string // type of service, http, grpc, etc... +} + +type WatcherConfig struct { + // service name + ServiceName string + + // Recursive or just add by singe directory + Recursive bool + + // Directories used per-service + Directories []string + + // simple hook, just CONTAINS + FilterHooks func(filename string, pattern []string) error + + // path to file with Files + Files map[string]os.FileInfo + + // Ignored Directories, used map for O(1) amortized get + Ignored map[string]struct{} + + // FilePatterns to ignore + FilePatterns []string +} + +type Watcher struct { + // main event channel + Event chan Event + close chan struct{} + + // ============================= + mu *sync.Mutex + + // indicates is walker started or not + started bool + + // config for each service + // need pointer here to assign files + watcherConfigs map[string]WatcherConfig + + // logger + log logger.Logger +} + +// Options is used to set Watcher Options +type Options func(*Watcher) + +// NewWatcher returns new instance of File Watcher +func NewWatcher(configs []WatcherConfig, log logger.Logger, options ...Options) (*Watcher, error) { + w := &Watcher{ + Event: make(chan Event), + mu: &sync.Mutex{}, + + log: log, + + close: make(chan struct{}), + + //workingDir: workDir, + watcherConfigs: make(map[string]WatcherConfig), + } + + // add watcherConfigs by service names + for _, v := range configs { + w.watcherConfigs[v.ServiceName] = v + } + + // apply options + for _, option := range options { + option(w) + } + err := w.initFs() + if err != nil { + return nil, err + } + + return w, nil +} + +// initFs makes initial map with files +func (w *Watcher) initFs() error { + const op = errors.Op("init fs") + for srvName, config := range w.watcherConfigs { + fileList, err := w.retrieveFileList(srvName, config) + if err != nil { + return errors.E(op, err) + } + // workaround. in golang you can't assign to map in struct field + tmp := w.watcherConfigs[srvName] + tmp.Files = fileList + w.watcherConfigs[srvName] = tmp + } + return nil +} + +// ConvertIgnored is used to convert slice to map with ignored files +func ConvertIgnored(ignored []string) (map[string]struct{}, error) { + if len(ignored) == 0 { + return nil, nil + } + + ign := make(map[string]struct{}, len(ignored)) + for i := 0; i < len(ignored); i++ { + abs, err := filepath.Abs(ignored[i]) + if err != nil { + return nil, err + } + ign[abs] = struct{}{} + } + + return ign, nil +} + +// https://en.wikipedia.org/wiki/Inotify +// SetMaxFileEvents sets max file notify events for Watcher +// In case of file watch errors, this value can be increased system-wide +// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf) +// Add apply: sudo sysctl -p --system +// func SetMaxFileEvents(events int) Options { +// return func(watcher *Watcher) { +// watcher.maxFileWatchEvents = events +// } +// +// } + +// pass map from outside +func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { + const op = errors.Op("retrieve") + stat, err := os.Stat(path) + if err != nil { + return nil, err + } + + filesList := make(map[string]os.FileInfo, 10) + filesList[path] = stat + + // if it's not a dir, return + if !stat.IsDir() { + return filesList, nil + } + + fileInfoList, err := ioutil.ReadDir(path) + if err != nil { + return nil, err + } + + // recursive calls are slow in compare to goto + // so, we will add files with goto pattern +outer: + for i := 0; i < len(fileInfoList); i++ { + // if file in ignored --> continue + if _, ignored := w.watcherConfigs[serviceName].Ignored[path]; ignored { + continue + } + + // if filename does not contain pattern --> ignore that file + if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil { + err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns) + if errors.Is(errors.Skip, err) { + continue outer + } + } + + filesList[fileInfoList[i].Name()] = fileInfoList[i] + } + + return filesList, nil +} + +func (w *Watcher) StartPolling(duration time.Duration) error { + w.mu.Lock() + const op = errors.Op("start polling") + if w.started { + w.mu.Unlock() + return errors.E(op, errors.Str("already started")) + } + + w.started = true + w.mu.Unlock() + + return w.waitEvent(duration) +} + +// this is blocking operation +func (w *Watcher) waitEvent(d time.Duration) error { + ticker := time.NewTicker(d) + for { + select { + case <-w.close: + ticker.Stop() + // just exit + // no matter for the pollEvents + return nil + case <-ticker.C: + // this is not very effective way + // because we have to wait on Lock + // better is to listen files in parallel, but, since that would be used in debug... TODO + for serviceName := range w.watcherConfigs { + // TODO sync approach + fileList, _ := w.retrieveFileList(serviceName, w.watcherConfigs[serviceName]) + w.pollEvents(w.watcherConfigs[serviceName].ServiceName, fileList) + } + } + } +} + +// retrieveFileList get file list for service +func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + fileList := make(map[string]os.FileInfo) + if config.Recursive { + // walk through directories recursively + for i := 0; i < len(config.Directories); i++ { + // full path is workdir/relative_path + fullPath, err := filepath.Abs(config.Directories[i]) + if err != nil { + return nil, err + } + list, err := w.retrieveFilesRecursive(serviceName, fullPath) + if err != nil { + return nil, err + } + + for k := range list { + fileList[k] = list[k] + } + } + return fileList, nil + } + + for i := 0; i < len(config.Directories); i++ { + // full path is workdir/relative_path + fullPath, err := filepath.Abs(config.Directories[i]) + if err != nil { + return nil, err + } + + // list is pathToFiles with files + list, err := w.retrieveFilesSingle(serviceName, fullPath) + if err != nil { + return nil, err + } + + for pathToFile, file := range list { + fileList[pathToFile] = file + } + } + + return fileList, nil +} + +func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) { + fileList := make(map[string]os.FileInfo) + + return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + const op = errors.Op("retrieve files recursive") + if err != nil { + return errors.E(op, err) + } + + // If path is ignored and it's a directory, skip the directory. If it's + // ignored and it's a single file, skip the file. + _, ignored := w.watcherConfigs[serviceName].Ignored[path] + if ignored { + if info.IsDir() { + // if it's dir, ignore whole + return filepath.SkipDir + } + return nil + } + + // if filename does not contain pattern --> ignore that file + err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns) + if errors.Is(errors.Skip, err) { + return nil + } + + // Add the path and it's info to the file list. + fileList[path] = info + return nil + }) +} + +func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { + w.mu.Lock() + defer w.mu.Unlock() + + // Store create and remove events for use to check for rename events. + creates := make(map[string]os.FileInfo) + removes := make(map[string]os.FileInfo) + + // Check for removed files. + for pth := range w.watcherConfigs[serviceName].Files { + if _, found := files[pth]; !found { + removes[pth] = w.watcherConfigs[serviceName].Files[pth] + w.log.Debug("file added to the list of removed files", "path", pth, "name", w.watcherConfigs[serviceName].Files[pth].Name(), "size", w.watcherConfigs[serviceName].Files[pth].Size()) + } + } + + // Check for created files, writes and chmods. + for pth := range files { + if files[pth].IsDir() { + continue + } + oldInfo, found := w.watcherConfigs[serviceName].Files[pth] + if !found { + // A file was created. + creates[pth] = files[pth] + w.log.Debug("file was created", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) + continue + } + + if oldInfo.ModTime() != files[pth].ModTime() || oldInfo.Mode() != files[pth].Mode() { + w.watcherConfigs[serviceName].Files[pth] = files[pth] + w.log.Debug("file was updated", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) + w.Event <- Event{ + Path: pth, + Info: files[pth], + service: serviceName, + } + } + } + + // Send all the remaining create and remove events. + for pth := range creates { + // add file to the plugin watch files + w.watcherConfigs[serviceName].Files[pth] = creates[pth] + w.log.Debug("file was added to watcher", "path", pth, "name", creates[pth].Name(), "size", creates[pth].Size()) + + w.Event <- Event{ + Path: pth, + Info: creates[pth], + service: serviceName, + } + } + + for pth := range removes { + // delete path from the config + delete(w.watcherConfigs[serviceName].Files, pth) + w.log.Debug("file was removed from watcher", "path", pth, "name", removes[pth].Name(), "size", removes[pth].Size()) + + w.Event <- Event{ + Path: pth, + Info: removes[pth], + service: serviceName, + } + } +} + +func (w *Watcher) Stop() { + w.close <- struct{}{} +} |