diff options
Diffstat (limited to 'plugins/reload')
-rw-r--r-- | plugins/reload/config.go | 62 | ||||
-rw-r--r-- | plugins/reload/plugin.go | 167 | ||||
-rw-r--r-- | plugins/reload/watcher.go | 372 |
3 files changed, 0 insertions, 601 deletions
diff --git a/plugins/reload/config.go b/plugins/reload/config.go deleted file mode 100644 index 6fd3af70..00000000 --- a/plugins/reload/config.go +++ /dev/null @@ -1,62 +0,0 @@ -package reload - -import ( - "time" - - "github.com/spiral/errors" -) - -// Config is a Reload configuration point. -type Config struct { - // Interval is a global refresh interval - Interval time.Duration - - // Patterns is a global file patterns to watch. It will be applied to every directory in project - Patterns []string - - // Services is set of services which would be reloaded in case of FS changes - Services map[string]ServiceConfig -} - -type ServiceConfig struct { - // Enabled indicates that service must be watched, doest not required when any other option specified - Enabled bool - - // Recursive is options to use nested files from root folder - Recursive bool - - // Patterns is per-service specific files to watch - Patterns []string - - // Dirs is per-service specific dirs which will be combined with Patterns - Dirs []string - - // Ignore is set of files which would not be watched - Ignore []string -} - -// InitDefaults sets missing values to their default values. -func (c *Config) InitDefaults() { - if c.Interval == 0 { - c.Interval = time.Second - } - if c.Patterns == nil { - c.Patterns = []string{".php"} - } -} - -// Valid validates the configuration. -func (c *Config) Valid() error { - const op = errors.Op("reload_plugin_valid") - if c.Interval < time.Second { - return errors.E(op, errors.Str("too short interval")) - } - - if c.Services == nil { - return errors.E(op, errors.Str("should add at least 1 service")) - } else if len(c.Services) == 0 { - return errors.E(op, errors.Str("service initialized, however, no config added")) - } - - return nil -} diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go deleted file mode 100644 index a9a5a63c..00000000 --- a/plugins/reload/plugin.go +++ /dev/null @@ -1,167 +0,0 @@ -package reload - -import ( - "os" - "strings" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" -) - -// PluginName contains default plugin name. -const PluginName string = "reload" -const thresholdChanBuffer uint = 1000 - -type Plugin struct { - cfg *Config - log logger.Logger - watcher *Watcher - services map[string]interface{} - res *resetter.Plugin - stopc chan struct{} -} - -// Init controller service -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res *resetter.Plugin) error { - const op = errors.Op("reload_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - // disable plugin in case of error - return errors.E(op, errors.Disabled, err) - } - - s.cfg.InitDefaults() - - s.log = log - s.res = res - s.stopc = make(chan struct{}, 1) - s.services = make(map[string]interface{}) - - configs := make([]WatcherConfig, 0, len(s.cfg.Services)) - - for serviceName, serviceConfig := range s.cfg.Services { - ignored, errIgn := ConvertIgnored(serviceConfig.Ignore) - if errIgn != nil { - return errors.E(op, err) - } - configs = append(configs, WatcherConfig{ - ServiceName: serviceName, - Recursive: serviceConfig.Recursive, - Directories: serviceConfig.Dirs, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(op, errors.SkipFile) - }, - Files: make(map[string]os.FileInfo), - Ignored: ignored, - FilePatterns: append(serviceConfig.Patterns, s.cfg.Patterns...), - }) - } - - s.watcher, err = NewWatcher(configs, s.log) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (s *Plugin) Serve() chan error { - const op = errors.Op("reload_plugin_serve") - errCh := make(chan error, 1) - if s.cfg.Interval < time.Second { - errCh <- errors.E(op, errors.Str("reload interval is too fast")) - return errCh - } - - // make a map with unique services - // so, if we would have 100 events from http service - // in map we would see only 1 key, and it's config - thCh := make(chan struct { - serviceConfig ServiceConfig - service string - }, thresholdChanBuffer) - - // use the same interval - timer := time.NewTimer(s.cfg.Interval) - - go func() { - for e := range s.watcher.Event { - thCh <- struct { - serviceConfig ServiceConfig - service string - }{serviceConfig: s.cfg.Services[e.service], service: e.service} - } - }() - - // map with config by services - updated := make(map[string]ServiceConfig, len(s.cfg.Services)) - - go func() { - for { - select { - case cfg := <-thCh: - // logic is following: - // restart - timer.Stop() - // replace previous value in map by more recent without adding new one - updated[cfg.service] = cfg.serviceConfig - // if we are getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) - // instead, we are resetting the timer and wait for s.cfg.Interval time - // If there is no more events, we restart service only once - timer.Reset(s.cfg.Interval) - case <-timer.C: - if len(updated) > 0 { - for name := range updated { - err := s.res.Reset(name) - if err != nil { - timer.Stop() - errCh <- errors.E(op, err) - return - } - } - // zero map - updated = make(map[string]ServiceConfig, len(s.cfg.Services)) - } - case <-s.stopc: - timer.Stop() - return - } - } - }() - - go func() { - err := s.watcher.StartPolling(s.cfg.Interval) - if err != nil { - errCh <- errors.E(op, err) - return - } - }() - - return errCh -} - -func (s *Plugin) Stop() error { - s.watcher.Stop() - s.stopc <- struct{}{} - return nil -} - -func (s *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (s *Plugin) Available() { -} diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go deleted file mode 100644 index c40c2fdf..00000000 --- a/plugins/reload/watcher.go +++ /dev/null @@ -1,372 +0,0 @@ -package reload - -import ( - "io/ioutil" - "os" - "path/filepath" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// SimpleHook is used to filter by simple criteria, CONTAINS -type SimpleHook func(filename string, pattern []string) error - -// An Event describes an event that is received when files or directory -// changes occur. It includes the os.FileInfo of the changed file or -// directory and the type of event that's occurred and the full path of the file. -type Event struct { - Path string - Info os.FileInfo - - service string // type of service, http, grpc, etc... -} - -type WatcherConfig struct { - // service name - ServiceName string - - // Recursive or just add by singe directory - Recursive bool - - // Directories used per-service - Directories []string - - // simple hook, just CONTAINS - FilterHooks func(filename string, pattern []string) error - - // path to file with Files - Files map[string]os.FileInfo - - // Ignored Directories, used map for O(1) amortized get - Ignored map[string]struct{} - - // FilePatterns to ignore - FilePatterns []string -} - -type Watcher struct { - // main event channel - Event chan Event - close chan struct{} - - // ============================= - mu *sync.Mutex - - // indicates is walker started or not - started bool - - // config for each service - // need pointer here to assign files - watcherConfigs map[string]WatcherConfig - - // logger - log logger.Logger -} - -// Options is used to set Watcher Options -type Options func(*Watcher) - -// NewWatcher returns new instance of File Watcher -func NewWatcher(configs []WatcherConfig, log logger.Logger, options ...Options) (*Watcher, error) { - w := &Watcher{ - Event: make(chan Event), - mu: &sync.Mutex{}, - - log: log, - - close: make(chan struct{}), - - //workingDir: workDir, - watcherConfigs: make(map[string]WatcherConfig), - } - - // add watcherConfigs by service names - for _, v := range configs { - w.watcherConfigs[v.ServiceName] = v - } - - // apply options - for _, option := range options { - option(w) - } - err := w.initFs() - if err != nil { - return nil, err - } - - return w, nil -} - -// initFs makes initial map with files -func (w *Watcher) initFs() error { - const op = errors.Op("watcher_init_fs") - for srvName, config := range w.watcherConfigs { - fileList, err := w.retrieveFileList(srvName, config) - if err != nil { - return errors.E(op, err) - } - // workaround. in golang you can't assign to map in struct field - tmp := w.watcherConfigs[srvName] - tmp.Files = fileList - w.watcherConfigs[srvName] = tmp - } - return nil -} - -// ConvertIgnored is used to convert slice to map with ignored files -func ConvertIgnored(ignored []string) (map[string]struct{}, error) { - if len(ignored) == 0 { - return nil, nil - } - - ign := make(map[string]struct{}, len(ignored)) - for i := 0; i < len(ignored); i++ { - abs, err := filepath.Abs(ignored[i]) - if err != nil { - return nil, err - } - ign[abs] = struct{}{} - } - - return ign, nil -} - -// https://en.wikipedia.org/wiki/Inotify -// SetMaxFileEvents sets max file notify events for Watcher -// In case of file watch errors, this value can be increased system-wide -// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf) -// Add apply: sudo sysctl -p --system -// func SetMaxFileEvents(events int) Options { -// return func(watcher *Watcher) { -// watcher.maxFileWatchEvents = events -// } -// -// } - -// pass map from outside -func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { - stat, err := os.Stat(path) - if err != nil { - return nil, err - } - - filesList := make(map[string]os.FileInfo, 10) - filesList[path] = stat - - // if it's not a dir, return - if !stat.IsDir() { - return filesList, nil - } - - fileInfoList, err := ioutil.ReadDir(path) - if err != nil { - return nil, err - } - - // recursive calls are slow in compare to goto - // so, we will add files with goto pattern -outer: - for i := 0; i < len(fileInfoList); i++ { - // if file in ignored --> continue - if _, ignored := w.watcherConfigs[serviceName].Ignored[path]; ignored { - continue - } - - // if filename does not contain pattern --> ignore that file - if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil { - err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns) - if errors.Is(errors.SkipFile, err) { - continue outer - } - } - - filesList[fileInfoList[i].Name()] = fileInfoList[i] - } - - return filesList, nil -} - -func (w *Watcher) StartPolling(duration time.Duration) error { - w.mu.Lock() - const op = errors.Op("watcher_start_polling") - if w.started { - w.mu.Unlock() - return errors.E(op, errors.Str("already started")) - } - - w.started = true - w.mu.Unlock() - - return w.waitEvent(duration) -} - -// this is blocking operation -func (w *Watcher) waitEvent(d time.Duration) error { - ticker := time.NewTicker(d) - for { - select { - case <-w.close: - ticker.Stop() - // just exit - // no matter for the pollEvents - return nil - case <-ticker.C: - // this is not very effective way - // because we have to wait on Lock - // better is to listen files in parallel, but, since that would be used in debug... - for serviceName := range w.watcherConfigs { - fileList, _ := w.retrieveFileList(serviceName, w.watcherConfigs[serviceName]) - w.pollEvents(w.watcherConfigs[serviceName].ServiceName, fileList) - } - } - } -} - -// retrieveFileList get file list for service -func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { - fileList := make(map[string]os.FileInfo) - if config.Recursive { - // walk through directories recursively - for i := 0; i < len(config.Directories); i++ { - // full path is workdir/relative_path - fullPath, err := filepath.Abs(config.Directories[i]) - if err != nil { - return nil, err - } - list, err := w.retrieveFilesRecursive(serviceName, fullPath) - if err != nil { - return nil, err - } - - for k := range list { - fileList[k] = list[k] - } - } - return fileList, nil - } - - for i := 0; i < len(config.Directories); i++ { - // full path is workdir/relative_path - fullPath, err := filepath.Abs(config.Directories[i]) - if err != nil { - return nil, err - } - - // list is pathToFiles with files - list, err := w.retrieveFilesSingle(serviceName, fullPath) - if err != nil { - return nil, err - } - - for pathToFile, file := range list { - fileList[pathToFile] = file - } - } - - return fileList, nil -} - -func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) { - fileList := make(map[string]os.FileInfo) - - return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - const op = errors.Op("retrieve files recursive") - if err != nil { - return errors.E(op, err) - } - - // If path is ignored and it's a directory, skip the directory. If it's - // ignored and it's a single file, skip the file. - _, ignored := w.watcherConfigs[serviceName].Ignored[path] - if ignored { - if info.IsDir() { - // if it's dir, ignore whole - return filepath.SkipDir - } - return nil - } - - // if filename does not contain pattern --> ignore that file - err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns) - if errors.Is(errors.SkipFile, err) { - return nil - } - - // Add the path and it's info to the file list. - fileList[path] = info - return nil - }) -} - -func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { - w.mu.Lock() - defer w.mu.Unlock() - - // InsertMany create and remove events for use to check for rename events. - creates := make(map[string]os.FileInfo) - removes := make(map[string]os.FileInfo) - - // Check for removed files. - for pth := range w.watcherConfigs[serviceName].Files { - if _, found := files[pth]; !found { - removes[pth] = w.watcherConfigs[serviceName].Files[pth] - w.log.Debug("file added to the list of removed files", "path", pth, "name", w.watcherConfigs[serviceName].Files[pth].Name(), "size", w.watcherConfigs[serviceName].Files[pth].Size()) - } - } - - // Check for created files, writes and chmods. - for pth := range files { - if files[pth].IsDir() { - continue - } - oldInfo, found := w.watcherConfigs[serviceName].Files[pth] - if !found { - // A file was created. - creates[pth] = files[pth] - w.log.Debug("file was created", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) - continue - } - - if oldInfo.ModTime() != files[pth].ModTime() || oldInfo.Mode() != files[pth].Mode() { - w.watcherConfigs[serviceName].Files[pth] = files[pth] - w.log.Debug("file was updated", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) - w.Event <- Event{ - Path: pth, - Info: files[pth], - service: serviceName, - } - } - } - - // Send all the remaining create and remove events. - for pth := range creates { - // add file to the plugin watch files - w.watcherConfigs[serviceName].Files[pth] = creates[pth] - w.log.Debug("file was added to watcher", "path", pth, "name", creates[pth].Name(), "size", creates[pth].Size()) - - w.Event <- Event{ - Path: pth, - Info: creates[pth], - service: serviceName, - } - } - - for pth := range removes { - // delete path from the config - delete(w.watcherConfigs[serviceName].Files, pth) - w.log.Debug("file was removed from watcher", "path", pth, "name", removes[pth].Name(), "size", removes[pth].Size()) - - w.Event <- Event{ - Path: pth, - Info: removes[pth], - service: serviceName, - } - } -} - -func (w *Watcher) Stop() { - w.close <- struct{}{} -} |