diff options
author | Valery Piashchynski <[email protected]> | 2020-02-20 01:02:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-02-20 01:02:12 +0300 |
commit | 54fb0dc2baa8d874f0ff090b49c80361bb6a9557 (patch) | |
tree | eb3b00d4aca057206977426cc02abd4f1f35fd17 | |
parent | 9cff24e4d515d684cefdf46624da90d22224aeaa (diff) |
Update wathcer implementation. Need to rethink structure and optimize
algorithms
-rw-r--r-- | service/reload/config.go | 8 | ||||
-rw-r--r-- | service/reload/service.go | 91 | ||||
-rw-r--r-- | service/reload/watcher.go | 458 |
3 files changed, 263 insertions, 294 deletions
diff --git a/service/reload/config.go b/service/reload/config.go index 338c6eba..fb704015 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -15,9 +15,11 @@ type Config struct { } type ServiceConfig struct { - // Watch is per-service specific files to watch - Watch []string - // Dirs is per-service specific dirs which will be combined with Watch + // Recursive is options to use nested files from root folder + Recursive bool + // Patterns is per-service specific files to watch + Patterns []string + // Dirs is per-service specific dirs which will be combined with Patterns Dirs []string // Ignore is set of files which would not be watched Ignore []string diff --git a/service/reload/service.go b/service/reload/service.go index 78f147b7..f50d6626 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -3,6 +3,7 @@ package reload import ( "github.com/spiral/roadrunner/service" "os" + "strings" "time" ) @@ -11,7 +12,8 @@ const ID = "reload" type Service struct { reloadConfig *Config - container service.Container + container service.Container + watcher *Watcher } // Init controller service @@ -23,61 +25,70 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } func (s *Service) Serve() error { - w, err := NewWatcher(SetMaxFileEvents(100)) - if err != nil { - return err - } - - name , err := os.Getwd() - if err != nil { - return err + if !s.reloadConfig.Enabled { + return nil } - err = w.AddSingle(name) + var err error + s.watcher, err = NewWatcher([]WatcherConfig{WatcherConfig{ + serviceName: "test", + recursive: false, + directories: []string{"/service"}, + filterHooks: func(filename, pattern string) error { + if strings.Contains(filename, pattern) { + return ErrorSkip + } + return nil + }, + files: make(map[string]os.FileInfo), + //ignored: []string{".php"}, + }}) if err != nil { return err } - go func() { - err = w.StartPolling(time.Second) - if err != nil { - } - }() + s.watcher.AddSingle("test", "/service") - // read events and restart corresponding services - - - for { - select { - case e := <- w.Event: - println(e.Name()) + go func() { + for { + select { + case e := <-s.watcher.Event: + println(e.Name()) + } } + //for e = range w.Event { + // + // println("event") + // // todo use status + // //svc, _ := s.container.Get("http") + // //if svc != nil { + // // if srv, ok := svc.(service.Service); ok { + // // srv.Stop() + // // err = srv.Serve() + // // if err != nil { + // // return err + // // } + // // } + // //} + // + // //println("event skipped due to service is nil") + //} + }() + + err = s.watcher.StartPolling(time.Second) + if err != nil { + return err } - //for e = range w.Event { - // - // println("event") - // // todo use status - // //svc, _ := s.container.Get("http") - // //if svc != nil { - // // if srv, ok := svc.(service.Service); ok { - // // srv.Stop() - // // err = srv.Serve() - // // if err != nil { - // // return err - // // } - // // } - // //} - // - // //println("event skipped due to service is nil") - //} + // read events and restart corresponding services return nil } func (s *Service) Stop() { + //s.watcher.Stop() -}
\ No newline at end of file +} diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 8d344b4c..5e57af32 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -4,34 +4,12 @@ import ( "errors" "io/ioutil" "os" + "path" "path/filepath" - "regexp" - "strings" "sync" "time" ) -// Config is a Reload configuration point. -//type Config struct { -// Enable or disable Reload extension, default disable. -//Enabled bool -// -// Watch is general pattern of files to watch. It will be applied to every directory in project -//Watch []string -// -// Services is set of services which would be reloaded in case of FS changes -//Services map[string]ServiceConfig -//} -// -//type ServiceConfig struct { -// Watch is per-service specific files to watch -//Watch []string -// Dirs is per-service specific dirs which will be combined with Watch -//Dirs []string -// Ignore is set of files which would not be watched -//Ignore []string -//} - // An Op is a type that is used to describe what type // of event has occurred during the watching process. type Op uint32 @@ -56,37 +34,10 @@ var ops = map[Op]string{ } var ErrorSkip = errors.New("file is skipped") +var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true") -// FilterFileHookFunc is a function that is called to filter files during listings. -// If a file is ok to be listed, nil is returned otherwise ErrSkip is returned. -type FilterFileHookFunc func(info os.FileInfo, fullPath string) error - -// RegexFilterHook is a function that accepts or rejects a file -// for listing based on whether it's filename or full path matches -// a regular expression. -func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc { - return func(info os.FileInfo, fullPath string) error { - str := info.Name() - - if useFullPath { - str = fullPath - } - - // Match - if r.MatchString(str) { - return nil - } - - // No match. - return ErrorSkip - } -} - -func SetFileHooks(fileHook ...FilterFileHookFunc) Options { - return func(watcher *Watcher) { - watcher.filterHooks = fileHook - } -} +// SimpleHook is used to filter by simple criteria, CONTAINS +type SimpleHook func(filename, pattern string) error // An Event describes an event that is received when files or directory // changes occur. It includes the os.FileInfo of the changed file or @@ -101,30 +52,48 @@ type Event struct { Type string // type of event, http, grpc, etc... } +type WatcherConfig struct { + // service name + serviceName string + // recursive or just add by singe directory + recursive bool + // directories used per-service + directories []string + // simple hook, just CONTAINS + filterHooks SimpleHook + // path to file with files + files map[string]os.FileInfo + // //ignored files or directories, used map for O(1) amortized get + ignored map[string]string +} + type Watcher struct { - Event chan Event + // main event channel + Event chan Event + errors chan error close chan struct{} Closed chan struct{} - + //============================= mu *sync.Mutex - //wg *sync.WaitGroup + wg *sync.WaitGroup - filterHooks []FilterFileHookFunc + // indicates is walker started or not + started bool + // working directory, same for all + workingDir string - started bool // indicates is walker started or not + // operation type + operations map[Op]struct{} // Op filtering. - workingDir string - maxFileWatchEvents int - operations map[Op]struct{} // Op filtering. - files map[string]os.FileInfo //files by service, http, grpc, etc.. - ignored map[string]string //ignored files or directories + // config for each service + watcherConfigs map[string]WatcherConfig } // Options is used to set Watcher Options type Options func(*Watcher) -func NewWatcher(options ...Options) (*Watcher, error) { +func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { dir, err := os.Getwd() if err != nil { return nil, err @@ -133,34 +102,47 @@ func NewWatcher(options ...Options) (*Watcher, error) { w := &Watcher{ Event: make(chan Event), mu: &sync.Mutex{}, - //wg: &sync.WaitGroup{}, - Closed: make(chan struct{}), - close: make(chan struct{}), - workingDir: dir, - operations: make(map[Op]struct{}), - files: make(map[string]os.FileInfo), - ignored: make(map[string]string), + wg: &sync.WaitGroup{}, + + Closed: make(chan struct{}), + close: make(chan struct{}), + + workingDir: dir, + operations: make(map[Op]struct{}), + watcherConfigs: make(map[string]WatcherConfig), + } + + // add watcherConfigs by service names + for _, v := range configs { + w.watcherConfigs[v.serviceName] = v } for _, option := range options { option(w) } - // dir --> /home/valery/Projects/opensource/roadrunner + if w.watcherConfigs == nil { + return nil, NoWalkerConfig + } + return w, nil } +func (w *Watcher) AddWatcherConfig(config WatcherConfig) { + w.watcherConfigs[config.serviceName] = config +} + // https://en.wikipedia.org/wiki/Inotify // SetMaxFileEvents sets max file notify events for Watcher // In case of file watch errors, this value can be increased system-wide // For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf) // Add apply: sudo sysctl -p --system -func SetMaxFileEvents(events int) Options { - return func(watcher *Watcher) { - watcher.maxFileWatchEvents = events - } - -} +//func SetMaxFileEvents(events int) Options { +// return func(watcher *Watcher) { +// watcher.maxFileWatchEvents = events +// } +// +//} // SetDefaultRootPath is used to set own root path for adding files func SetDefaultRootPath(path string) Options { @@ -171,63 +153,81 @@ func SetDefaultRootPath(path string) Options { // Add // name will be current working dir -func (w *Watcher) AddSingle(name string) error { - name, err := filepath.Abs(name) +func (w *Watcher) AddSingle(serviceName, relPath string) error { + absPath, err := filepath.Abs(w.workingDir) if err != nil { - + return err } + // full path is workdir/relative_path + fullPath := path.Join(absPath, relPath) + // Ignored files // map is to have O(1) when search for file - _, ignored := w.ignored[name] + _, ignored := w.watcherConfigs[serviceName].ignored[fullPath] if ignored { return nil } // small optimization for smallvector //fileList := make(map[string]os.FileInfo, 10) - fileList, err := w.retrieveSingleDirectoryContent(name) + fileList, err := w.retrieveFilesSingle(serviceName, fullPath) if err != nil { return err } - for k, v := range fileList { - w.files[k] = v + for fullPth, file := range fileList { + w.watcherConfigs[serviceName].files[fullPth] = file } return nil - } -func (w *Watcher) AddRecursive(name string) error { - name, err := filepath.Abs(name) +func (w *Watcher) AddRecursive(serviceName string, relPath string) error { + workDirAbs, err := filepath.Abs(w.workingDir) if err != nil { return err } - filesList := make(map[string]os.FileInfo, 10) + fullPath := path.Join(workDirAbs, relPath) + filesList := make(map[string]os.FileInfo, 100) - err = w.retrieveFilesRecursive(name, filesList) + err = w.retrieveFilesRecursive(serviceName, fullPath, filesList) if err != nil { return err } - for k, v := range filesList { - w.files[k] = v + for pathToFile, file := range filesList { + w.watcherConfigs[serviceName].files[pathToFile] = file + } + + return nil +} + +func (w *Watcher) AddIgnored(serviceName string, directories []string) error { + workDirAbs, err := filepath.Abs(w.workingDir) + if err != nil { + return err + } + + // concat wd with relative paths from config + // todo check path for existance + for _, v := range directories { + fullPath := path.Join(workDirAbs, v) + w.watcherConfigs[serviceName].ignored[fullPath] = fullPath } return nil } // pass map from outside -func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.FileInfo, error) { +func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { stat, err := os.Stat(path) if err != nil { return nil, err } filesList := make(map[string]os.FileInfo, 10) - filesList[path] = stat // if it's not a dir, return @@ -235,20 +235,6 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil return filesList, nil } - //err = filepath.Walk(name, func(path string, info os.FileInfo, err error) error { - // if info.IsDir() { - // return nil - // } - // - // fileList[path] = info - // - // return nil - //}) - // - //if err != nil { - // return err - //} - fileInfoList, err := ioutil.ReadDir(path) if err != nil { return nil, err @@ -259,29 +245,27 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil outer: for i := 0; i < len(fileInfoList); i++ { - var path string + var pathToFile string // BCE check elimination // https://go101.org/article/bounds-check-elimination.html if len(fileInfoList) != 0 && len(fileInfoList) >= i { - path = filepath.Join(path, fileInfoList[i].Name()) + pathToFile = filepath.Join(pathToFile, fileInfoList[i].Name()) } else { return nil, errors.New("file info list len") } // if file in ignored --> continue - if _, ignored := w.ignored[path]; ignored { + if _, ignored := w.watcherConfigs[serviceName].ignored[path]; ignored { continue } - for _, fh := range w.filterHooks { - err := fh(fileInfoList[i], path) - if err != nil { - // if err is not nil, move to the start of the cycle since the path not match the hook - continue outer - } + err := w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), pathToFile) + if err != nil { + // if err is not nil, move to the start of the cycle since the pathToFile not match the hook + continue outer } - filesList[path] = fileInfoList[i] + filesList[pathToFile] = fileInfoList[i] } @@ -307,6 +291,22 @@ func (w *Watcher) StartPolling(duration time.Duration) error { return w.waitEvent(duration) } +//func (w *Watcher) updatedFileListForConfig(config WatcherConfig) (map[string]os.FileInfo, error) { +// if config.recursive { +// return nil, nil +// } +// +// for _, v := range config.directories { +// files, err := w.retrieveFilesSingle(path.Join(w.workingDir, v)) +// if err != nil { +// return nil, err +// } +// +// } +// +// return nil, nil +//} + // this is blocking operation func (w *Watcher) waitEvent(d time.Duration) error { for { @@ -343,100 +343,40 @@ func (w *Watcher) waitEvent(d time.Duration) error { case <-ticker.C: //fileList := make(map[string]os.FileInfo, 100) //w.mu.Lock() - fileList, _ := w.retrieveFileList(w.workingDir, false) - w.pollEvents(fileList, cancel) + + for serviceName, config := range w.watcherConfigs { + fileList, _ := w.retrieveFileList(serviceName, config) + w.pollEvents(config.serviceName, fileList, cancel) + } + //w.mu.Unlock() default: } } - - ticker.Stop() - //inner: - // for { - // select { - // case <-w.close: - // close(cancel) - // close(w.Closed) - // return nil - // case event := <-evt: - // //if len(w.operations) > 0 { // Filter Ops. - // // _, found := w.operations[event.Op] - // // if !found { - // // continue - // // } - // //} - // //numEvents++ - // //if w.maxFileWatchEvents > 0 && numEvents > w.maxFileWatchEvents { - // // close(cancel) - // // break inner - // //} - // w.Event <- event - // case <-done: // Current cycle is finished. - // break inner - // } - // } - - //// Update the file's list. - //w.mu.Lock() - //w.files = fileList - //w.mu.Unlock() - - //time.Sleep(d) - //sleepLoop: - // for { - // select { - // case <-w.close: - // close(cancel) - // return nil - // case <-time.After(d): - // break sleepLoop - // } - // } //end Sleep for } } -func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.FileInfo, error) { - - //fileList := make(map[string]os.FileInfo) - - //list := make(map[string]os.FileInfo, 100) - //var err error - - if recursive { - //fileList, err := w.retrieveFilesRecursive(path) - //if err != nil { - //if os.IsNotExist(err) { - // w.mu.Unlock() - // // todo path error - // _, ok := err.(*os.PathError) - // if ok { - // w.RemoveRecursive(path) - // } - // w.mu.Lock() - //} else { - // w.errors <- err - //} - //} - - //for k, v := range fileList { - // fileList[k] = v - //} - //return fileList, nil - return nil, nil - } else { - fileList, err := w.retrieveSingleDirectoryContent(path) +func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + if config.recursive { + fileList := make(map[string]os.FileInfo) + err := w.retrieveFilesRecursive(serviceName, w.workingDir, fileList) if err != nil { - //if os.IsNotExist(err) { - // w.mu.Unlock() - // _, ok := err.(*os.PathError) - // if ok { - // w.RemoveRecursive(path) - // } - // w.mu.Lock() - //} else { - // w.errors <- err - //} + if os.IsNotExist(err) { + w.mu.Unlock() + // todo path error + _, ok := err.(*os.PathError) + if ok { + // todo + //err = w.RemoveRecursive(path) + if err != nil { + return nil, err + } + } + w.mu.Lock() + } else { + w.errors <- err + } } for k, v := range fileList { @@ -444,61 +384,81 @@ func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.F } return fileList, nil } - // Add the file's to the file list. - //return nil -} - -// RemoveRecursive removes either a single file or a directory recursively from -// the file's list. -func (w *Watcher) RemoveRecursive(name string) (err error) { - w.mu.Lock() - defer w.mu.Unlock() + fileList := make(map[string]os.FileInfo) + for _, dir := range config.directories { + absPath, err := filepath.Abs(w.workingDir) + if err != nil { + return nil, err + } - name, err = filepath.Abs(name) - if err != nil { - return err - } + // full path is workdir/relative_path + fullPath := path.Join(absPath, dir) - // If name is a single file, remove it and return. - info, found := w.files[name] - if !found { - return nil // Doesn't exist, just return. - } - if !info.IsDir() { - delete(w.files, name) - return nil - } + // list is pathToFiles with files + list, err := w.retrieveFilesSingle(serviceName, fullPath) - // If it's a directory, delete all of it's contents recursively - // from w.files. - for path := range w.files { - if strings.HasPrefix(path, name) { - delete(w.files, path) + for pathToFile, file := range list { + fileList[pathToFile] = file } } - return nil + + return fileList, nil + + // Add the file's to the file list. + + //return nil } -func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.FileInfo) error { - return filepath.Walk(name, func(path string, info os.FileInfo, err error) error { +// RemoveRecursive removes either a single file or a directory recursively from +// the file's list. +//func (w *Watcher) RemoveRecursive(name string) (err error) { +// w.mu.Lock() +// defer w.mu.Unlock() +// +// name, err = filepath.Abs(name) +// if err != nil { +// return err +// } +// +// // If name is a single file, remove it and return. +// info, found := w.files[name] +// if !found { +// return nil // Doesn't exist, just return. +// } +// if !info.IsDir() { +// delete(w.files, name) +// return nil +// } +// +// // If it's a directory, delete all of it's contents recursively +// // from w.files. +// for path := range w.files { +// if strings.HasPrefix(path, name) { +// delete(w.files, path) +// } +// } +// return nil +//} + +func (w *Watcher) retrieveFilesRecursive(serviceName, root string, fileList map[string]os.FileInfo) error { + return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } - for _, f := range w.filterHooks { - err := f(info, path) - if err == ErrorSkip { - return nil - } - if err != nil { - return err - } + // filename, pattern + err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path) + if err == ErrorSkip { + return nil + } + if err != nil { + return err } // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. - _, ignored := w.ignored[path] + _, ignored := w.watcherConfigs[serviceName].ignored[path] if ignored { if info.IsDir() { @@ -512,12 +472,7 @@ func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.Fil }) } -// Wait blocks until the watcher is started. -//func (w *Watcher) Wait() { -// w.wg.Wait() -//} - -func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) { +func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, cancel chan struct{}) { w.mu.Lock() defer w.mu.Unlock() @@ -526,7 +481,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) removes := make(map[string]os.FileInfo) // Check for removed files. - for path, info := range w.files { + for path, info := range w.watcherConfigs[serviceName].files { if _, found := files[path]; !found { removes[path] = info } @@ -534,14 +489,14 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) // Check for created files, writes and chmods. for path, info := range files { - oldInfo, found := w.files[path] + oldInfo, found := w.watcherConfigs[serviceName].files[path] if !found { // A file was created. creates[path] = info continue } if oldInfo.ModTime() != info.ModTime() { - w.files[path] = info + w.watcherConfigs[serviceName].files[path] = info select { case <-cancel: return @@ -549,6 +504,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) } } if oldInfo.Mode() != info.Mode() { + w.watcherConfigs[serviceName].files[path] = info select { case <-cancel: return |