From 50b46c8d3c0e1f13623e2cd7cbb1302ae66ed308 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 17 Feb 2020 19:13:15 +0300 Subject: Add reload service Start to implement Watcher --- service/reload/config.go | 41 +++++++ service/reload/config_test.go | 1 + service/reload/service.go | 40 +++++++ service/reload/service_test.go | 1 + service/reload/watcher.go | 235 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 318 insertions(+) create mode 100644 service/reload/config.go create mode 100644 service/reload/config_test.go create mode 100644 service/reload/service.go create mode 100644 service/reload/service_test.go create mode 100644 service/reload/watcher.go (limited to 'service') diff --git a/service/reload/config.go b/service/reload/config.go new file mode 100644 index 00000000..338c6eba --- /dev/null +++ b/service/reload/config.go @@ -0,0 +1,41 @@ +package reload + +import "github.com/spiral/roadrunner/service" + +// Config is a Reload configuration point. +type Config struct { + // Enable or disable Reload extension, default disable. + Enabled bool + + // Watch is general pattern of files to watch. It will be applied to every directory in project + Watch []string + + // Services is set of services which would be reloaded in case of FS changes + Services map[string]ServiceConfig +} + +type ServiceConfig struct { + // Watch is per-service specific files to watch + Watch []string + // Dirs is per-service specific dirs which will be combined with Watch + Dirs []string + // Ignore is set of files which would not be watched + Ignore []string +} + + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + return nil +} + +// InitDefaults sets missing values to their default values. +func (c *Config) InitDefaults() error { + //c.Interval = time.Second + + return nil +} + diff --git a/service/reload/config_test.go b/service/reload/config_test.go new file mode 100644 index 00000000..7cad4a5d --- /dev/null +++ b/service/reload/config_test.go @@ -0,0 +1 @@ +package reload diff --git a/service/reload/service.go b/service/reload/service.go new file mode 100644 index 00000000..db10b6f4 --- /dev/null +++ b/service/reload/service.go @@ -0,0 +1,40 @@ +package reload + +import "github.com/spiral/roadrunner/service" + +// ID contains default service name. +const ID = "reload" + +type Service struct { + reloadConfig *Config +} + +// Init controller service +func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { + // mount Services to designated services + //for id, watcher := range cfg.Controllers(s.throw) { + // svc, _ := c.Get(id) + // if ctrl, ok := svc.(controllable); ok { + // ctrl.Attach(watcher) + // } + //} + + s.reloadConfig = cfg + + return true, nil +} + +func (s *Service) Serve() error { + w, err := NewWatcher(s.reloadConfig, SetMaxFileEvents(100)) + if err != nil { + return err + } + + _ = w + + return nil +} + +func (s *Service) Stop() { + +} \ No newline at end of file diff --git a/service/reload/service_test.go b/service/reload/service_test.go new file mode 100644 index 00000000..7cad4a5d --- /dev/null +++ b/service/reload/service_test.go @@ -0,0 +1 @@ +package reload diff --git a/service/reload/watcher.go b/service/reload/watcher.go new file mode 100644 index 00000000..e81ce56f --- /dev/null +++ b/service/reload/watcher.go @@ -0,0 +1,235 @@ +package reload + +import ( + "errors" + "os" + "path/filepath" + "regexp" + "sync" + "time" +) + +// Config is a Reload configuration point. +//type Config struct { +// Enable or disable Reload extension, default disable. +//Enabled bool +// +// Watch is general pattern of files to watch. It will be applied to every directory in project +//Watch []string +// +// Services is set of services which would be reloaded in case of FS changes +//Services map[string]ServiceConfig +//} +// +//type ServiceConfig struct { +// Watch is per-service specific files to watch +//Watch []string +// Dirs is per-service specific dirs which will be combined with Watch +//Dirs []string +// Ignore is set of files which would not be watched +//Ignore []string +//} + +// An Op is a type that is used to describe what type +// of event has occurred during the watching process. +type Op uint32 + +// Ops +const ( + Create Op = iota + Write + Remove + Rename + Chmod + Move +) + +var ops = map[Op]string{ + Create: "CREATE", + Write: "WRITE", + Remove: "REMOVE", + Rename: "RENAME", + Chmod: "CHMOD", + Move: "MOVE", +} + +var ( + // ErrDurationTooShort occurs when calling the watcher's Start + // method with a duration that's less than 1 nanosecond. + ErrDurationTooShort = errors.New("error: duration is less than 1ns") + + // ErrWatcherRunning occurs when trying to call the watcher's + // Start method and the polling cycle is still already running + // from previously calling Start and not yet calling Close. + ErrWatcherRunning = errors.New("error: watcher is already running") + + // ErrWatchedFileDeleted is an error that occurs when a file or folder that was + // being watched has been deleted. + ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted") + + // ErrSkip is less of an error, but more of a way for path hooks to skip a file or + // directory. + ErrSkip = errors.New("error: skipping file") +) + +// FilterFileHookFunc is a function that is called to filter files during listings. +// If a file is ok to be listed, nil is returned otherwise ErrSkip is returned. +type FilterFileHookFunc func(info os.FileInfo, fullPath string) error + +// RegexFilterHook is a function that accepts or rejects a file +// for listing based on whether it's filename or full path matches +// a regular expression. +func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc { + return func(info os.FileInfo, fullPath string) error { + str := info.Name() + + if useFullPath { + str = fullPath + } + + // Match + if r.MatchString(str) { + return nil + } + + // No match. + return ErrSkip + } +} + +// An Event describes an event that is received when files or directory +// changes occur. It includes the os.FileInfo of the changed file or +// directory and the type of event that's occurred and the full path of the file. +type Event struct { + Op + Path string + OldPath string + os.FileInfo +} + +type Watcher struct { + Event chan Event + errors chan error + wg *sync.WaitGroup + + filterHooks []FilterFileHookFunc + + workingDir string + maxFileWatchEvents int + ops map[Op]struct{} // Op filtering. + files map[string]string //files by service, http, grpc, etc.. + ignored map[string]string //ignored files or directories +} + +// Options is used to set Watcher Options +type Options func(*Watcher) + +func NewWatcher(options ...Options) (*Watcher, error) { + dir, err := os.Getwd() + if err != nil { + return nil, err + } + + w := &Watcher{ + workingDir: dir, + } + + for _, option := range options { + option(w) + } + + // dir --> /home/valery/Projects/opensource/roadrunner + return w, nil +} + +// https://en.wikipedia.org/wiki/Inotify +// SetMaxFileEvents sets max file notify events for Watcher +// In case of file watch errors, this value can be increased system-wide +// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/.conf) +// Add apply: sudo sysctl -p --system +func SetMaxFileEvents(events int) Options { + return func(watcher *Watcher) { + watcher.maxFileWatchEvents = events + } + +} + +// Add +// name will be +func (w *Watcher) Add(name string) error { + name, err := filepath.Abs(name) + if err != nil { + + } + + // Ignored files + // map is to have O(1) when search for file + _, ignored := w.ignored[name] + if ignored { + return nil + } + + // small optimization for smallvector + fileList := make(map[string]os.FileInfo, 10) + err = w.addDirectoryContent(" ", fileList) + if err != nil { + return err + } + + +} + +func (w *Watcher) addDirectoryContent(name string, filelist map[string]os.FileInfo) error { + fileInfo, err := os.Stat(name) + if err != nil { + return err + } + + filelist[name] = fileInfo + + // if it's not a dir, return + if !fileInfo.IsDir() { + return nil + } + + + + +} + +func (w *Watcher) search(map[string]os.FileInfo) error { + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + -- cgit v1.2.3 From 6fb73b5fcca8d40f5fb3789875480f1e8151db6a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 17 Feb 2020 19:16:33 +0300 Subject: Remove unused errors Put reload service import in proper place --- service/reload/watcher.go | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) (limited to 'service') diff --git a/service/reload/watcher.go b/service/reload/watcher.go index e81ce56f..f1cafc8a 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -53,24 +53,7 @@ var ops = map[Op]string{ Move: "MOVE", } -var ( - // ErrDurationTooShort occurs when calling the watcher's Start - // method with a duration that's less than 1 nanosecond. - ErrDurationTooShort = errors.New("error: duration is less than 1ns") - - // ErrWatcherRunning occurs when trying to call the watcher's - // Start method and the polling cycle is still already running - // from previously calling Start and not yet calling Close. - ErrWatcherRunning = errors.New("error: watcher is already running") - - // ErrWatchedFileDeleted is an error that occurs when a file or folder that was - // being watched has been deleted. - ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted") - - // ErrSkip is less of an error, but more of a way for path hooks to skip a file or - // directory. - ErrSkip = errors.New("error: skipping file") -) +var ErrorSkip = errors.New("file is skipped") // FilterFileHookFunc is a function that is called to filter files during listings. // If a file is ok to be listed, nil is returned otherwise ErrSkip is returned. @@ -93,7 +76,7 @@ func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc { } // No match. - return ErrSkip + return ErrorSkip } } -- cgit v1.2.3 From 24ec01aa7f0225098e4c750a8c51843cc41bbf8b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 18 Feb 2020 16:29:56 +0300 Subject: Finish with Adding single path w/o recursion --- service/reload/service.go | 13 ++++-- service/reload/watcher.go | 104 +++++++++++++++++++++++++++------------------- 2 files changed, 72 insertions(+), 45 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index db10b6f4..ee2aed99 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -1,6 +1,9 @@ package reload -import "github.com/spiral/roadrunner/service" +import ( + "github.com/spiral/roadrunner/service" + "os" +) // ID contains default service name. const ID = "reload" @@ -25,12 +28,16 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } func (s *Service) Serve() error { - w, err := NewWatcher(s.reloadConfig, SetMaxFileEvents(100)) + w, err := NewWatcher(SetMaxFileEvents(100)) if err != nil { return err } - _ = w + name , _ := os.Getwd() + + w.AddSingle(name) + + println("test") return nil } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index f1cafc8a..ecc8c3ce 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -2,11 +2,12 @@ package reload import ( "errors" + "io/ioutil" "os" "path/filepath" "regexp" "sync" - "time" + "syscall" ) // Config is a Reload configuration point. @@ -80,6 +81,12 @@ func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc { } } +func SetFileHooks(fileHook ...FilterFileHookFunc) Options { + return func(watcher *Watcher) { + watcher.filterHooks = fileHook + } +} + // An Event describes an event that is received when files or directory // changes occur. It includes the os.FileInfo of the changed file or // directory and the type of event that's occurred and the full path of the file. @@ -99,9 +106,9 @@ type Watcher struct { workingDir string maxFileWatchEvents int - ops map[Op]struct{} // Op filtering. - files map[string]string //files by service, http, grpc, etc.. - ignored map[string]string //ignored files or directories + operations map[Op]struct{} // Op filtering. + files map[string]os.FileInfo //files by service, http, grpc, etc.. + ignored map[string]string //ignored files or directories } // Options is used to set Watcher Options @@ -115,6 +122,9 @@ func NewWatcher(options ...Options) (*Watcher, error) { w := &Watcher{ workingDir: dir, + operations: make(map[Op]struct{}), + files: make(map[string]os.FileInfo), + ignored: make(map[string]string), } for _, option := range options { @@ -137,9 +147,16 @@ func SetMaxFileEvents(events int) Options { } +// SetDefaultRootPath is used to set own root path for adding files +func SetDefaultRootPath(path string) Options { + return func(watcher *Watcher) { + watcher.workingDir = path + } +} + // Add -// name will be -func (w *Watcher) Add(name string) error { +// name will be current working dir +func (w *Watcher) AddSingle(name string) error { name, err := filepath.Abs(name) if err != nil { @@ -154,14 +171,20 @@ func (w *Watcher) Add(name string) error { // small optimization for smallvector fileList := make(map[string]os.FileInfo, 10) - err = w.addDirectoryContent(" ", fileList) + err = w.addDirectoryContent(name, fileList) if err != nil { return err } + for k, v := range fileList { + w.files[k] = v + } + + return nil } +// pass map from outside func (w *Watcher) addDirectoryContent(name string, filelist map[string]os.FileInfo) error { fileInfo, err := os.Stat(name) if err != nil { @@ -175,44 +198,41 @@ func (w *Watcher) addDirectoryContent(name string, filelist map[string]os.FileIn return nil } + fileInfoList, err := ioutil.ReadDir(name) + if err != nil { + return err + } + // recursive calls are slow in compare to goto + // so, we will add files with goto pattern + +outer: + for i := 0; i < len(fileInfoList); i++ { + var path string + // BCE check elimination + // https://go101.org/article/bounds-check-elimination.html + if len(fileInfoList) != 0 && len(fileInfoList) >= i { + path = filepath.Join(name, fileInfoList[i].Name()) + } else { + return errors.New("file info list len") + } + // if file in ignored --> continue + if _, ignored := w.ignored[name]; ignored { + continue + } -} - -func (w *Watcher) search(map[string]os.FileInfo) error { - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + for _, fh := range w.filterHooks { + err := fh(fileInfo, path) + if err != nil { + // if err is not nil, move to the start of the cycle since the path not match the hook + continue outer + } + } + filelist[path] = fileInfo + } + return nil +} -- cgit v1.2.3 From 9cff24e4d515d684cefdf46624da90d22224aeaa Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 19 Feb 2020 17:30:05 +0300 Subject: First concept of reload --- service/reload/samefile.go | 9 + service/reload/samefile_windows.go | 12 ++ service/reload/service.go | 58 +++++- service/reload/watcher.go | 399 +++++++++++++++++++++++++++++++++++-- 4 files changed, 450 insertions(+), 28 deletions(-) create mode 100644 service/reload/samefile.go create mode 100644 service/reload/samefile_windows.go (limited to 'service') diff --git a/service/reload/samefile.go b/service/reload/samefile.go new file mode 100644 index 00000000..80df0431 --- /dev/null +++ b/service/reload/samefile.go @@ -0,0 +1,9 @@ +// +build !windows + +package reload + +import "os" + +func sameFile(fi1, fi2 os.FileInfo) bool { + return os.SameFile(fi1, fi2) +} diff --git a/service/reload/samefile_windows.go b/service/reload/samefile_windows.go new file mode 100644 index 00000000..5f70d327 --- /dev/null +++ b/service/reload/samefile_windows.go @@ -0,0 +1,12 @@ +// +build windows + +package reload + +import "os" + +func sameFile(fi1, fi2 os.FileInfo) bool { + return fi1.ModTime() == fi2.ModTime() && + fi1.Size() == fi2.Size() && + fi1.Mode() == fi2.Mode() && + fi1.IsDir() == fi2.IsDir() +} diff --git a/service/reload/service.go b/service/reload/service.go index ee2aed99..78f147b7 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -3,6 +3,7 @@ package reload import ( "github.com/spiral/roadrunner/service" "os" + "time" ) // ID contains default service name. @@ -10,18 +11,12 @@ const ID = "reload" type Service struct { reloadConfig *Config + container service.Container } // Init controller service func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { - // mount Services to designated services - //for id, watcher := range cfg.Controllers(s.throw) { - // svc, _ := c.Get(id) - // if ctrl, ok := svc.(controllable); ok { - // ctrl.Attach(watcher) - // } - //} - + s.container = c s.reloadConfig = cfg return true, nil @@ -33,11 +28,52 @@ func (s *Service) Serve() error { return err } - name , _ := os.Getwd() + name , err := os.Getwd() + if err != nil { + return err + } + + err = w.AddSingle(name) + if err != nil { + return err + } - w.AddSingle(name) + go func() { + err = w.StartPolling(time.Second) + if err != nil { + + } + }() + + + + // read events and restart corresponding services + + + for { + select { + case e := <- w.Event: + println(e.Name()) + } + } + //for e = range w.Event { + // + // println("event") + // // todo use status + // //svc, _ := s.container.Get("http") + // //if svc != nil { + // // if srv, ok := svc.(service.Service); ok { + // // srv.Stop() + // // err = srv.Serve() + // // if err != nil { + // // return err + // // } + // // } + // //} + // + // //println("event skipped due to service is nil") + //} - println("test") return nil } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index ecc8c3ce..8d344b4c 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -6,8 +6,9 @@ import ( "os" "path/filepath" "regexp" + "strings" "sync" - "syscall" + "time" ) // Config is a Reload configuration point. @@ -92,18 +93,27 @@ func SetFileHooks(fileHook ...FilterFileHookFunc) Options { // directory and the type of event that's occurred and the full path of the file. type Event struct { Op + Path string OldPath string os.FileInfo + + Type string // type of event, http, grpc, etc... } type Watcher struct { Event chan Event errors chan error - wg *sync.WaitGroup + close chan struct{} + Closed chan struct{} + + mu *sync.Mutex + //wg *sync.WaitGroup filterHooks []FilterFileHookFunc + started bool // indicates is walker started or not + workingDir string maxFileWatchEvents int operations map[Op]struct{} // Op filtering. @@ -121,6 +131,11 @@ func NewWatcher(options ...Options) (*Watcher, error) { } w := &Watcher{ + Event: make(chan Event), + mu: &sync.Mutex{}, + //wg: &sync.WaitGroup{}, + Closed: make(chan struct{}), + close: make(chan struct{}), workingDir: dir, operations: make(map[Op]struct{}), files: make(map[string]os.FileInfo), @@ -170,8 +185,8 @@ func (w *Watcher) AddSingle(name string) error { } // small optimization for smallvector - fileList := make(map[string]os.FileInfo, 10) - err = w.addDirectoryContent(name, fileList) + //fileList := make(map[string]os.FileInfo, 10) + fileList, err := w.retrieveSingleDirectoryContent(name) if err != nil { return err } @@ -184,23 +199,59 @@ func (w *Watcher) AddSingle(name string) error { } -// pass map from outside -func (w *Watcher) addDirectoryContent(name string, filelist map[string]os.FileInfo) error { - fileInfo, err := os.Stat(name) +func (w *Watcher) AddRecursive(name string) error { + name, err := filepath.Abs(name) if err != nil { return err } - filelist[name] = fileInfo + filesList := make(map[string]os.FileInfo, 10) + + err = w.retrieveFilesRecursive(name, filesList) + if err != nil { + return err + } + + for k, v := range filesList { + w.files[k] = v + } + + return nil +} + +// pass map from outside +func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.FileInfo, error) { + stat, err := os.Stat(path) + if err != nil { + return nil, err + } + + filesList := make(map[string]os.FileInfo, 10) + + filesList[path] = stat // if it's not a dir, return - if !fileInfo.IsDir() { - return nil + if !stat.IsDir() { + return filesList, nil } - fileInfoList, err := ioutil.ReadDir(name) + //err = filepath.Walk(name, func(path string, info os.FileInfo, err error) error { + // if info.IsDir() { + // return nil + // } + // + // fileList[path] = info + // + // return nil + //}) + // + //if err != nil { + // return err + //} + + fileInfoList, err := ioutil.ReadDir(path) if err != nil { - return err + return nil, err } // recursive calls are slow in compare to goto @@ -212,27 +263,341 @@ outer: // BCE check elimination // https://go101.org/article/bounds-check-elimination.html if len(fileInfoList) != 0 && len(fileInfoList) >= i { - path = filepath.Join(name, fileInfoList[i].Name()) + path = filepath.Join(path, fileInfoList[i].Name()) } else { - return errors.New("file info list len") + return nil, errors.New("file info list len") } // if file in ignored --> continue - if _, ignored := w.ignored[name]; ignored { + if _, ignored := w.ignored[path]; ignored { continue } for _, fh := range w.filterHooks { - err := fh(fileInfo, path) + err := fh(fileInfoList[i], path) if err != nil { // if err is not nil, move to the start of the cycle since the path not match the hook continue outer } } - filelist[path] = fileInfo + filesList[path] = fileInfoList[i] + + } + + return filesList, nil +} + +func (w *Watcher) StartPolling(duration time.Duration) error { + if duration < time.Second { + return errors.New("too short duration, please use at least 1 second") + } + + w.mu.Lock() + if w.started { + w.mu.Unlock() + return errors.New("already started") + } + + w.started = true + w.mu.Unlock() + + //w.wg.Done() + + return w.waitEvent(duration) +} + +// this is blocking operation +func (w *Watcher) waitEvent(d time.Duration) error { + for { + // done lets the inner polling cycle loop know when the + // current cycle's method has finished executing. + //done := make(chan struct{}) + + // Any events that are found are first piped to evt before + // being sent to the main Event channel. + //evt := make(chan Event) + + // Retrieve the file list for all watched file's and dirs. + //fileList := w.files + + // cancel can be used to cancel the current event polling function. + cancel := make(chan struct{}) + + // Look for events. + //go func() { + // w.pollEvents(w.files, evt, cancel) + // done <- struct{}{} + //}() + + // numEvents holds the number of events for the current cycle. + //numEvents := 0 + + ticker := time.NewTicker(d) + for { + select { + case <-w.close: + close(cancel) + close(w.Closed) + return nil + case <-ticker.C: + //fileList := make(map[string]os.FileInfo, 100) + //w.mu.Lock() + fileList, _ := w.retrieveFileList(w.workingDir, false) + w.pollEvents(fileList, cancel) + //w.mu.Unlock() + default: + + } + } + + ticker.Stop() + //inner: + // for { + // select { + // case <-w.close: + // close(cancel) + // close(w.Closed) + // return nil + // case event := <-evt: + // //if len(w.operations) > 0 { // Filter Ops. + // // _, found := w.operations[event.Op] + // // if !found { + // // continue + // // } + // //} + // //numEvents++ + // //if w.maxFileWatchEvents > 0 && numEvents > w.maxFileWatchEvents { + // // close(cancel) + // // break inner + // //} + // w.Event <- event + // case <-done: // Current cycle is finished. + // break inner + // } + // } + + //// Update the file's list. + //w.mu.Lock() + //w.files = fileList + //w.mu.Unlock() + + //time.Sleep(d) + //sleepLoop: + // for { + // select { + // case <-w.close: + // close(cancel) + // return nil + // case <-time.After(d): + // break sleepLoop + // } + // } //end Sleep for + } +} + +func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.FileInfo, error) { + + //fileList := make(map[string]os.FileInfo) + + //list := make(map[string]os.FileInfo, 100) + //var err error + + if recursive { + //fileList, err := w.retrieveFilesRecursive(path) + //if err != nil { + //if os.IsNotExist(err) { + // w.mu.Unlock() + // // todo path error + // _, ok := err.(*os.PathError) + // if ok { + // w.RemoveRecursive(path) + // } + // w.mu.Lock() + //} else { + // w.errors <- err + //} + //} + + //for k, v := range fileList { + // fileList[k] = v + //} + //return fileList, nil + return nil, nil + } else { + fileList, err := w.retrieveSingleDirectoryContent(path) + if err != nil { + //if os.IsNotExist(err) { + // w.mu.Unlock() + // _, ok := err.(*os.PathError) + // if ok { + // w.RemoveRecursive(path) + // } + // w.mu.Lock() + //} else { + // w.errors <- err + //} + } + + for k, v := range fileList { + fileList[k] = v + } + return fileList, nil + } + // Add the file's to the file list. + + //return nil +} + +// RemoveRecursive removes either a single file or a directory recursively from +// the file's list. +func (w *Watcher) RemoveRecursive(name string) (err error) { + w.mu.Lock() + defer w.mu.Unlock() + name, err = filepath.Abs(name) + if err != nil { + return err } + // If name is a single file, remove it and return. + info, found := w.files[name] + if !found { + return nil // Doesn't exist, just return. + } + if !info.IsDir() { + delete(w.files, name) + return nil + } + + // If it's a directory, delete all of it's contents recursively + // from w.files. + for path := range w.files { + if strings.HasPrefix(path, name) { + delete(w.files, path) + } + } return nil } + +func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.FileInfo) error { + return filepath.Walk(name, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + for _, f := range w.filterHooks { + err := f(info, path) + if err == ErrorSkip { + return nil + } + if err != nil { + return err + } + } + + // If path is ignored and it's a directory, skip the directory. If it's + // ignored and it's a single file, skip the file. + _, ignored := w.ignored[path] + + if ignored { + if info.IsDir() { + return filepath.SkipDir + } + return nil + } + // Add the path and it's info to the file list. + fileList[path] = info + return nil + }) +} + +// Wait blocks until the watcher is started. +//func (w *Watcher) Wait() { +// w.wg.Wait() +//} + +func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) { + w.mu.Lock() + defer w.mu.Unlock() + + // Store create and remove events for use to check for rename events. + creates := make(map[string]os.FileInfo) + removes := make(map[string]os.FileInfo) + + // Check for removed files. + for path, info := range w.files { + if _, found := files[path]; !found { + removes[path] = info + } + } + + // Check for created files, writes and chmods. + for path, info := range files { + oldInfo, found := w.files[path] + if !found { + // A file was created. + creates[path] = info + continue + } + if oldInfo.ModTime() != info.ModTime() { + w.files[path] = info + select { + case <-cancel: + return + case w.Event <- Event{Write, path, path, info, "http"}: + } + } + if oldInfo.Mode() != info.Mode() { + select { + case <-cancel: + return + case w.Event <- Event{Chmod, path, path, info, "http"}: + } + } + } + + // Check for renames and moves. + for path1, info1 := range removes { + for path2, info2 := range creates { + if sameFile(info1, info2) { + e := Event{ + Op: Move, + Path: path2, + OldPath: path1, + FileInfo: info1, + } + // If they are from the same directory, it's a rename + // instead of a move event. + if filepath.Dir(path1) == filepath.Dir(path2) { + e.Op = Rename + } + + delete(removes, path1) + delete(creates, path2) + + select { + case <-cancel: + return + case w.Event <- e: + } + } + } + } + + // Send all the remaining create and remove events. + for path, info := range creates { + select { + case <-cancel: + return + case w.Event <- Event{Create, path, "", info, "http"}: + } + } + for path, info := range removes { + select { + case <-cancel: + return + case w.Event <- Event{Remove, path, path, info, "http"}: + } + } +} -- cgit v1.2.3 From 54fb0dc2baa8d874f0ff090b49c80361bb6a9557 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 20 Feb 2020 01:02:12 +0300 Subject: Update wathcer implementation. Need to rethink structure and optimize algorithms --- service/reload/config.go | 8 +- service/reload/service.go | 91 +++++---- service/reload/watcher.go | 458 +++++++++++++++++++++------------------------- 3 files changed, 263 insertions(+), 294 deletions(-) (limited to 'service') diff --git a/service/reload/config.go b/service/reload/config.go index 338c6eba..fb704015 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -15,9 +15,11 @@ type Config struct { } type ServiceConfig struct { - // Watch is per-service specific files to watch - Watch []string - // Dirs is per-service specific dirs which will be combined with Watch + // Recursive is options to use nested files from root folder + Recursive bool + // Patterns is per-service specific files to watch + Patterns []string + // Dirs is per-service specific dirs which will be combined with Patterns Dirs []string // Ignore is set of files which would not be watched Ignore []string diff --git a/service/reload/service.go b/service/reload/service.go index 78f147b7..f50d6626 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -3,6 +3,7 @@ package reload import ( "github.com/spiral/roadrunner/service" "os" + "strings" "time" ) @@ -11,7 +12,8 @@ const ID = "reload" type Service struct { reloadConfig *Config - container service.Container + container service.Container + watcher *Watcher } // Init controller service @@ -23,61 +25,70 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } func (s *Service) Serve() error { - w, err := NewWatcher(SetMaxFileEvents(100)) - if err != nil { - return err - } - - name , err := os.Getwd() - if err != nil { - return err + if !s.reloadConfig.Enabled { + return nil } - err = w.AddSingle(name) + var err error + s.watcher, err = NewWatcher([]WatcherConfig{WatcherConfig{ + serviceName: "test", + recursive: false, + directories: []string{"/service"}, + filterHooks: func(filename, pattern string) error { + if strings.Contains(filename, pattern) { + return ErrorSkip + } + return nil + }, + files: make(map[string]os.FileInfo), + //ignored: []string{".php"}, + }}) if err != nil { return err } - go func() { - err = w.StartPolling(time.Second) - if err != nil { - } - }() + s.watcher.AddSingle("test", "/service") - // read events and restart corresponding services - - - for { - select { - case e := <- w.Event: - println(e.Name()) + go func() { + for { + select { + case e := <-s.watcher.Event: + println(e.Name()) + } } + //for e = range w.Event { + // + // println("event") + // // todo use status + // //svc, _ := s.container.Get("http") + // //if svc != nil { + // // if srv, ok := svc.(service.Service); ok { + // // srv.Stop() + // // err = srv.Serve() + // // if err != nil { + // // return err + // // } + // // } + // //} + // + // //println("event skipped due to service is nil") + //} + }() + + err = s.watcher.StartPolling(time.Second) + if err != nil { + return err } - //for e = range w.Event { - // - // println("event") - // // todo use status - // //svc, _ := s.container.Get("http") - // //if svc != nil { - // // if srv, ok := svc.(service.Service); ok { - // // srv.Stop() - // // err = srv.Serve() - // // if err != nil { - // // return err - // // } - // // } - // //} - // - // //println("event skipped due to service is nil") - //} + // read events and restart corresponding services return nil } func (s *Service) Stop() { + //s.watcher.Stop() -} \ No newline at end of file +} diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 8d344b4c..5e57af32 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -4,34 +4,12 @@ import ( "errors" "io/ioutil" "os" + "path" "path/filepath" - "regexp" - "strings" "sync" "time" ) -// Config is a Reload configuration point. -//type Config struct { -// Enable or disable Reload extension, default disable. -//Enabled bool -// -// Watch is general pattern of files to watch. It will be applied to every directory in project -//Watch []string -// -// Services is set of services which would be reloaded in case of FS changes -//Services map[string]ServiceConfig -//} -// -//type ServiceConfig struct { -// Watch is per-service specific files to watch -//Watch []string -// Dirs is per-service specific dirs which will be combined with Watch -//Dirs []string -// Ignore is set of files which would not be watched -//Ignore []string -//} - // An Op is a type that is used to describe what type // of event has occurred during the watching process. type Op uint32 @@ -56,37 +34,10 @@ var ops = map[Op]string{ } var ErrorSkip = errors.New("file is skipped") +var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true") -// FilterFileHookFunc is a function that is called to filter files during listings. -// If a file is ok to be listed, nil is returned otherwise ErrSkip is returned. -type FilterFileHookFunc func(info os.FileInfo, fullPath string) error - -// RegexFilterHook is a function that accepts or rejects a file -// for listing based on whether it's filename or full path matches -// a regular expression. -func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc { - return func(info os.FileInfo, fullPath string) error { - str := info.Name() - - if useFullPath { - str = fullPath - } - - // Match - if r.MatchString(str) { - return nil - } - - // No match. - return ErrorSkip - } -} - -func SetFileHooks(fileHook ...FilterFileHookFunc) Options { - return func(watcher *Watcher) { - watcher.filterHooks = fileHook - } -} +// SimpleHook is used to filter by simple criteria, CONTAINS +type SimpleHook func(filename, pattern string) error // An Event describes an event that is received when files or directory // changes occur. It includes the os.FileInfo of the changed file or @@ -101,30 +52,48 @@ type Event struct { Type string // type of event, http, grpc, etc... } +type WatcherConfig struct { + // service name + serviceName string + // recursive or just add by singe directory + recursive bool + // directories used per-service + directories []string + // simple hook, just CONTAINS + filterHooks SimpleHook + // path to file with files + files map[string]os.FileInfo + // //ignored files or directories, used map for O(1) amortized get + ignored map[string]string +} + type Watcher struct { - Event chan Event + // main event channel + Event chan Event + errors chan error close chan struct{} Closed chan struct{} - + //============================= mu *sync.Mutex - //wg *sync.WaitGroup + wg *sync.WaitGroup - filterHooks []FilterFileHookFunc + // indicates is walker started or not + started bool + // working directory, same for all + workingDir string - started bool // indicates is walker started or not + // operation type + operations map[Op]struct{} // Op filtering. - workingDir string - maxFileWatchEvents int - operations map[Op]struct{} // Op filtering. - files map[string]os.FileInfo //files by service, http, grpc, etc.. - ignored map[string]string //ignored files or directories + // config for each service + watcherConfigs map[string]WatcherConfig } // Options is used to set Watcher Options type Options func(*Watcher) -func NewWatcher(options ...Options) (*Watcher, error) { +func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { dir, err := os.Getwd() if err != nil { return nil, err @@ -133,34 +102,47 @@ func NewWatcher(options ...Options) (*Watcher, error) { w := &Watcher{ Event: make(chan Event), mu: &sync.Mutex{}, - //wg: &sync.WaitGroup{}, - Closed: make(chan struct{}), - close: make(chan struct{}), - workingDir: dir, - operations: make(map[Op]struct{}), - files: make(map[string]os.FileInfo), - ignored: make(map[string]string), + wg: &sync.WaitGroup{}, + + Closed: make(chan struct{}), + close: make(chan struct{}), + + workingDir: dir, + operations: make(map[Op]struct{}), + watcherConfigs: make(map[string]WatcherConfig), + } + + // add watcherConfigs by service names + for _, v := range configs { + w.watcherConfigs[v.serviceName] = v } for _, option := range options { option(w) } - // dir --> /home/valery/Projects/opensource/roadrunner + if w.watcherConfigs == nil { + return nil, NoWalkerConfig + } + return w, nil } +func (w *Watcher) AddWatcherConfig(config WatcherConfig) { + w.watcherConfigs[config.serviceName] = config +} + // https://en.wikipedia.org/wiki/Inotify // SetMaxFileEvents sets max file notify events for Watcher // In case of file watch errors, this value can be increased system-wide // For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/.conf) // Add apply: sudo sysctl -p --system -func SetMaxFileEvents(events int) Options { - return func(watcher *Watcher) { - watcher.maxFileWatchEvents = events - } - -} +//func SetMaxFileEvents(events int) Options { +// return func(watcher *Watcher) { +// watcher.maxFileWatchEvents = events +// } +// +//} // SetDefaultRootPath is used to set own root path for adding files func SetDefaultRootPath(path string) Options { @@ -171,63 +153,81 @@ func SetDefaultRootPath(path string) Options { // Add // name will be current working dir -func (w *Watcher) AddSingle(name string) error { - name, err := filepath.Abs(name) +func (w *Watcher) AddSingle(serviceName, relPath string) error { + absPath, err := filepath.Abs(w.workingDir) if err != nil { - + return err } + // full path is workdir/relative_path + fullPath := path.Join(absPath, relPath) + // Ignored files // map is to have O(1) when search for file - _, ignored := w.ignored[name] + _, ignored := w.watcherConfigs[serviceName].ignored[fullPath] if ignored { return nil } // small optimization for smallvector //fileList := make(map[string]os.FileInfo, 10) - fileList, err := w.retrieveSingleDirectoryContent(name) + fileList, err := w.retrieveFilesSingle(serviceName, fullPath) if err != nil { return err } - for k, v := range fileList { - w.files[k] = v + for fullPth, file := range fileList { + w.watcherConfigs[serviceName].files[fullPth] = file } return nil - } -func (w *Watcher) AddRecursive(name string) error { - name, err := filepath.Abs(name) +func (w *Watcher) AddRecursive(serviceName string, relPath string) error { + workDirAbs, err := filepath.Abs(w.workingDir) if err != nil { return err } - filesList := make(map[string]os.FileInfo, 10) + fullPath := path.Join(workDirAbs, relPath) + filesList := make(map[string]os.FileInfo, 100) - err = w.retrieveFilesRecursive(name, filesList) + err = w.retrieveFilesRecursive(serviceName, fullPath, filesList) if err != nil { return err } - for k, v := range filesList { - w.files[k] = v + for pathToFile, file := range filesList { + w.watcherConfigs[serviceName].files[pathToFile] = file + } + + return nil +} + +func (w *Watcher) AddIgnored(serviceName string, directories []string) error { + workDirAbs, err := filepath.Abs(w.workingDir) + if err != nil { + return err + } + + // concat wd with relative paths from config + // todo check path for existance + for _, v := range directories { + fullPath := path.Join(workDirAbs, v) + w.watcherConfigs[serviceName].ignored[fullPath] = fullPath } return nil } // pass map from outside -func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.FileInfo, error) { +func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { stat, err := os.Stat(path) if err != nil { return nil, err } filesList := make(map[string]os.FileInfo, 10) - filesList[path] = stat // if it's not a dir, return @@ -235,20 +235,6 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil return filesList, nil } - //err = filepath.Walk(name, func(path string, info os.FileInfo, err error) error { - // if info.IsDir() { - // return nil - // } - // - // fileList[path] = info - // - // return nil - //}) - // - //if err != nil { - // return err - //} - fileInfoList, err := ioutil.ReadDir(path) if err != nil { return nil, err @@ -259,29 +245,27 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil outer: for i := 0; i < len(fileInfoList); i++ { - var path string + var pathToFile string // BCE check elimination // https://go101.org/article/bounds-check-elimination.html if len(fileInfoList) != 0 && len(fileInfoList) >= i { - path = filepath.Join(path, fileInfoList[i].Name()) + pathToFile = filepath.Join(pathToFile, fileInfoList[i].Name()) } else { return nil, errors.New("file info list len") } // if file in ignored --> continue - if _, ignored := w.ignored[path]; ignored { + if _, ignored := w.watcherConfigs[serviceName].ignored[path]; ignored { continue } - for _, fh := range w.filterHooks { - err := fh(fileInfoList[i], path) - if err != nil { - // if err is not nil, move to the start of the cycle since the path not match the hook - continue outer - } + err := w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), pathToFile) + if err != nil { + // if err is not nil, move to the start of the cycle since the pathToFile not match the hook + continue outer } - filesList[path] = fileInfoList[i] + filesList[pathToFile] = fileInfoList[i] } @@ -307,6 +291,22 @@ func (w *Watcher) StartPolling(duration time.Duration) error { return w.waitEvent(duration) } +//func (w *Watcher) updatedFileListForConfig(config WatcherConfig) (map[string]os.FileInfo, error) { +// if config.recursive { +// return nil, nil +// } +// +// for _, v := range config.directories { +// files, err := w.retrieveFilesSingle(path.Join(w.workingDir, v)) +// if err != nil { +// return nil, err +// } +// +// } +// +// return nil, nil +//} + // this is blocking operation func (w *Watcher) waitEvent(d time.Duration) error { for { @@ -343,100 +343,40 @@ func (w *Watcher) waitEvent(d time.Duration) error { case <-ticker.C: //fileList := make(map[string]os.FileInfo, 100) //w.mu.Lock() - fileList, _ := w.retrieveFileList(w.workingDir, false) - w.pollEvents(fileList, cancel) + + for serviceName, config := range w.watcherConfigs { + fileList, _ := w.retrieveFileList(serviceName, config) + w.pollEvents(config.serviceName, fileList, cancel) + } + //w.mu.Unlock() default: } } - - ticker.Stop() - //inner: - // for { - // select { - // case <-w.close: - // close(cancel) - // close(w.Closed) - // return nil - // case event := <-evt: - // //if len(w.operations) > 0 { // Filter Ops. - // // _, found := w.operations[event.Op] - // // if !found { - // // continue - // // } - // //} - // //numEvents++ - // //if w.maxFileWatchEvents > 0 && numEvents > w.maxFileWatchEvents { - // // close(cancel) - // // break inner - // //} - // w.Event <- event - // case <-done: // Current cycle is finished. - // break inner - // } - // } - - //// Update the file's list. - //w.mu.Lock() - //w.files = fileList - //w.mu.Unlock() - - //time.Sleep(d) - //sleepLoop: - // for { - // select { - // case <-w.close: - // close(cancel) - // return nil - // case <-time.After(d): - // break sleepLoop - // } - // } //end Sleep for } } -func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.FileInfo, error) { - - //fileList := make(map[string]os.FileInfo) - - //list := make(map[string]os.FileInfo, 100) - //var err error - - if recursive { - //fileList, err := w.retrieveFilesRecursive(path) - //if err != nil { - //if os.IsNotExist(err) { - // w.mu.Unlock() - // // todo path error - // _, ok := err.(*os.PathError) - // if ok { - // w.RemoveRecursive(path) - // } - // w.mu.Lock() - //} else { - // w.errors <- err - //} - //} - - //for k, v := range fileList { - // fileList[k] = v - //} - //return fileList, nil - return nil, nil - } else { - fileList, err := w.retrieveSingleDirectoryContent(path) +func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + if config.recursive { + fileList := make(map[string]os.FileInfo) + err := w.retrieveFilesRecursive(serviceName, w.workingDir, fileList) if err != nil { - //if os.IsNotExist(err) { - // w.mu.Unlock() - // _, ok := err.(*os.PathError) - // if ok { - // w.RemoveRecursive(path) - // } - // w.mu.Lock() - //} else { - // w.errors <- err - //} + if os.IsNotExist(err) { + w.mu.Unlock() + // todo path error + _, ok := err.(*os.PathError) + if ok { + // todo + //err = w.RemoveRecursive(path) + if err != nil { + return nil, err + } + } + w.mu.Lock() + } else { + w.errors <- err + } } for k, v := range fileList { @@ -444,61 +384,81 @@ func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.F } return fileList, nil } - // Add the file's to the file list. - //return nil -} - -// RemoveRecursive removes either a single file or a directory recursively from -// the file's list. -func (w *Watcher) RemoveRecursive(name string) (err error) { - w.mu.Lock() - defer w.mu.Unlock() + fileList := make(map[string]os.FileInfo) + for _, dir := range config.directories { + absPath, err := filepath.Abs(w.workingDir) + if err != nil { + return nil, err + } - name, err = filepath.Abs(name) - if err != nil { - return err - } + // full path is workdir/relative_path + fullPath := path.Join(absPath, dir) - // If name is a single file, remove it and return. - info, found := w.files[name] - if !found { - return nil // Doesn't exist, just return. - } - if !info.IsDir() { - delete(w.files, name) - return nil - } + // list is pathToFiles with files + list, err := w.retrieveFilesSingle(serviceName, fullPath) - // If it's a directory, delete all of it's contents recursively - // from w.files. - for path := range w.files { - if strings.HasPrefix(path, name) { - delete(w.files, path) + for pathToFile, file := range list { + fileList[pathToFile] = file } } - return nil + + return fileList, nil + + // Add the file's to the file list. + + //return nil } -func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.FileInfo) error { - return filepath.Walk(name, func(path string, info os.FileInfo, err error) error { +// RemoveRecursive removes either a single file or a directory recursively from +// the file's list. +//func (w *Watcher) RemoveRecursive(name string) (err error) { +// w.mu.Lock() +// defer w.mu.Unlock() +// +// name, err = filepath.Abs(name) +// if err != nil { +// return err +// } +// +// // If name is a single file, remove it and return. +// info, found := w.files[name] +// if !found { +// return nil // Doesn't exist, just return. +// } +// if !info.IsDir() { +// delete(w.files, name) +// return nil +// } +// +// // If it's a directory, delete all of it's contents recursively +// // from w.files. +// for path := range w.files { +// if strings.HasPrefix(path, name) { +// delete(w.files, path) +// } +// } +// return nil +//} + +func (w *Watcher) retrieveFilesRecursive(serviceName, root string, fileList map[string]os.FileInfo) error { + return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } - for _, f := range w.filterHooks { - err := f(info, path) - if err == ErrorSkip { - return nil - } - if err != nil { - return err - } + // filename, pattern + err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path) + if err == ErrorSkip { + return nil + } + if err != nil { + return err } // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. - _, ignored := w.ignored[path] + _, ignored := w.watcherConfigs[serviceName].ignored[path] if ignored { if info.IsDir() { @@ -512,12 +472,7 @@ func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.Fil }) } -// Wait blocks until the watcher is started. -//func (w *Watcher) Wait() { -// w.wg.Wait() -//} - -func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) { +func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, cancel chan struct{}) { w.mu.Lock() defer w.mu.Unlock() @@ -526,7 +481,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) removes := make(map[string]os.FileInfo) // Check for removed files. - for path, info := range w.files { + for path, info := range w.watcherConfigs[serviceName].files { if _, found := files[path]; !found { removes[path] = info } @@ -534,14 +489,14 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) // Check for created files, writes and chmods. for path, info := range files { - oldInfo, found := w.files[path] + oldInfo, found := w.watcherConfigs[serviceName].files[path] if !found { // A file was created. creates[path] = info continue } if oldInfo.ModTime() != info.ModTime() { - w.files[path] = info + w.watcherConfigs[serviceName].files[path] = info select { case <-cancel: return @@ -549,6 +504,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) } } if oldInfo.Mode() != info.Mode() { + w.watcherConfigs[serviceName].files[path] = info select { case <-cancel: return -- cgit v1.2.3 From ec7975355a8acea632e5c9b7e912b3e9ad6907ca Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 20 Feb 2020 08:56:50 +0300 Subject: Some fixes --- service/reload/service.go | 6 +-- service/reload/watcher.go | 111 ++++++++++++++++++---------------------------- 2 files changed, 47 insertions(+), 70 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index f50d6626..5a400159 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -31,7 +31,7 @@ func (s *Service) Serve() error { var err error s.watcher, err = NewWatcher([]WatcherConfig{WatcherConfig{ - serviceName: "test", + serviceName: "test_service_name", recursive: false, directories: []string{"/service"}, filterHooks: func(filename, pattern string) error { @@ -49,14 +49,14 @@ func (s *Service) Serve() error { - s.watcher.AddSingle("test", "/service") + s.watcher.AddSingle("test_service_name", "/service") go func() { for { select { case e := <-s.watcher.Event: - println(e.Name()) + println(e.Type) } } //for e = range w.Event { diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 5e57af32..7b224319 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -190,9 +190,8 @@ func (w *Watcher) AddRecursive(serviceName string, relPath string) error { } fullPath := path.Join(workDirAbs, relPath) - filesList := make(map[string]os.FileInfo, 100) - err = w.retrieveFilesRecursive(serviceName, fullPath, filesList) + filesList, err := w.retrieveFilesRecursive(serviceName, fullPath) if err != nil { return err } @@ -310,29 +309,8 @@ func (w *Watcher) StartPolling(duration time.Duration) error { // this is blocking operation func (w *Watcher) waitEvent(d time.Duration) error { for { - // done lets the inner polling cycle loop know when the - // current cycle's method has finished executing. - //done := make(chan struct{}) - - // Any events that are found are first piped to evt before - // being sent to the main Event channel. - //evt := make(chan Event) - - // Retrieve the file list for all watched file's and dirs. - //fileList := w.files - - // cancel can be used to cancel the current event polling function. cancel := make(chan struct{}) - // Look for events. - //go func() { - // w.pollEvents(w.files, evt, cancel) - // done <- struct{}{} - //}() - - // numEvents holds the number of events for the current cycle. - //numEvents := 0 - ticker := time.NewTicker(d) for { select { @@ -345,10 +323,16 @@ func (w *Watcher) waitEvent(d time.Duration) error { //w.mu.Lock() for serviceName, config := range w.watcherConfigs { - fileList, _ := w.retrieveFileList(serviceName, config) - w.pollEvents(config.serviceName, fileList, cancel) + go func(sn string, c WatcherConfig) { + w.wg.Add(1) + fileList, _ := w.retrieveFileList(sn, c) + w.pollEvents(c.serviceName, fileList, cancel) + w.wg.Done() + }(serviceName, config) } + w.wg.Wait() + //w.mu.Unlock() default: @@ -358,34 +342,25 @@ func (w *Watcher) waitEvent(d time.Duration) error { } func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + fileList := make(map[string]os.FileInfo) if config.recursive { - fileList := make(map[string]os.FileInfo) - err := w.retrieveFilesRecursive(serviceName, w.workingDir, fileList) - if err != nil { - if os.IsNotExist(err) { - w.mu.Unlock() - // todo path error - _, ok := err.(*os.PathError) - if ok { - // todo - //err = w.RemoveRecursive(path) - if err != nil { - return nil, err - } - } - w.mu.Lock() - } else { - w.errors <- err + // walk through directories recursively + for _, dir := range config.directories { + // full path is workdir/relative_path + fullPath := path.Join(w.workingDir, dir) + list, err := w.retrieveFilesRecursive(serviceName, fullPath) + if err != nil { + return nil, err } - } - for k, v := range fileList { - fileList[k] = v + for k, v := range list { + fileList[k] = v + } + return fileList, nil } - return fileList, nil } - fileList := make(map[string]os.FileInfo) + for _, dir := range config.directories { absPath, err := filepath.Abs(w.workingDir) if err != nil { @@ -441,21 +416,23 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma // return nil //} -func (w *Watcher) retrieveFilesRecursive(serviceName, root string, fileList map[string]os.FileInfo) error { - return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } +func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) { + fileList := make(map[string]os.FileInfo) - // filename, pattern - err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path) - if err == ErrorSkip { - return nil - } + return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } + // filename, pattern TODO + //err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path) + //if err == ErrorSkip { + // return nil + //} + //if err != nil { + // return err + //} + // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. _, ignored := w.watcherConfigs[serviceName].ignored[path] @@ -481,34 +458,34 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, c removes := make(map[string]os.FileInfo) // Check for removed files. - for path, info := range w.watcherConfigs[serviceName].files { - if _, found := files[path]; !found { - removes[path] = info + for pth, info := range w.watcherConfigs[serviceName].files { + if _, found := files[pth]; !found { + removes[pth] = info } } // Check for created files, writes and chmods. - for path, info := range files { - oldInfo, found := w.watcherConfigs[serviceName].files[path] + for pth, info := range files { + oldInfo, found := w.watcherConfigs[serviceName].files[pth] if !found { // A file was created. - creates[path] = info + creates[pth] = info continue } if oldInfo.ModTime() != info.ModTime() { - w.watcherConfigs[serviceName].files[path] = info + w.watcherConfigs[serviceName].files[pth] = info select { case <-cancel: return - case w.Event <- Event{Write, path, path, info, "http"}: + case w.Event <- Event{Write, pth, pth, info, serviceName}: } } if oldInfo.Mode() != info.Mode() { - w.watcherConfigs[serviceName].files[path] = info + w.watcherConfigs[serviceName].files[pth] = info select { case <-cancel: return - case w.Event <- Event{Chmod, path, path, info, "http"}: + case w.Event <- Event{Chmod, pth, pth, info, serviceName}: } } } -- cgit v1.2.3 From 2efcfeb89861ba981f980bb4503c31ca6c7a92e0 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 20 Feb 2020 14:14:11 +0300 Subject: Declare general interfaces, Controllable and Attacher instead of private First dirty working example of reload --- service/limit/service.go | 8 +-- service/reload/config.go | 12 +++-- service/reload/service.go | 122 +++++++++++++++++++++++++++++----------------- service/reload/watcher.go | 116 ++++++++++++++++++++++++------------------- 4 files changed, 153 insertions(+), 105 deletions(-) (limited to 'service') diff --git a/service/limit/service.go b/service/limit/service.go index 6af571e2..c0b4139c 100644 --- a/service/limit/service.go +++ b/service/limit/service.go @@ -8,12 +8,6 @@ import ( // ID defines controller service name. const ID = "limit" -// controllable defines the ability to attach rr controller. -type controllable interface { - // Attach attaches controller to the service. - Attach(c roadrunner.Controller) -} - // Service to control the state of rr service inside other services. type Service struct { lsns []func(event int, ctx interface{}) @@ -24,7 +18,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { // mount Services to designated services for id, watcher := range cfg.Controllers(s.throw) { svc, _ := c.Get(id) - if ctrl, ok := svc.(controllable); ok { + if ctrl, ok := svc.(roadrunner.Attacher); ok { ctrl.Attach(watcher) } } diff --git a/service/reload/config.go b/service/reload/config.go index fb704015..f684a227 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -1,6 +1,9 @@ package reload -import "github.com/spiral/roadrunner/service" +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" +) // Config is a Reload configuration point. type Config struct { @@ -20,11 +23,13 @@ type ServiceConfig struct { // Patterns is per-service specific files to watch Patterns []string // Dirs is per-service specific dirs which will be combined with Patterns - Dirs []string + Dirs []string // Ignore is set of files which would not be watched Ignore []string -} + // service is a link to service to restart + service *roadrunner.Controllable +} // Hydrate must populate Config values using given Config source. Must return error if Config is not valid. func (c *Config) Hydrate(cfg service.Config) error { @@ -40,4 +45,3 @@ func (c *Config) InitDefaults() error { return nil } - diff --git a/service/reload/service.go b/service/reload/service.go index 5a400159..9b3ac2f0 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -1,6 +1,8 @@ package reload import ( + "fmt" + "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" "os" "strings" @@ -12,79 +14,111 @@ const ID = "reload" type Service struct { reloadConfig *Config - container service.Container watcher *Watcher + } // Init controller service func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { - s.container = c s.reloadConfig = cfg - return true, nil -} + var err error + var configs []WatcherConfig + + // mount Services to designated services + for serviceName, _ := range cfg.Services { + svc, _ := c.Get(serviceName) + if ctrl, ok := svc.(roadrunner.Controllable); ok { + tmp := cfg.Services[serviceName] + tmp.service = &ctrl + cfg.Services[serviceName] = tmp + } + } -func (s *Service) Serve() error { - if !s.reloadConfig.Enabled { - return nil + + + for serviceName, config := range s.reloadConfig.Services { + if cfg.Services[serviceName].service == nil { + continue + } + configs = append(configs, WatcherConfig{ + serviceName: serviceName, + recursive: config.Recursive, + directories: config.Dirs, + filterHooks: func(filename, pattern string) error { + if strings.Contains(filename, pattern) { + return ErrorSkip + } + return nil + }, + files: make(map[string]os.FileInfo), + //ignored: + }) } - var err error - s.watcher, err = NewWatcher([]WatcherConfig{WatcherConfig{ - serviceName: "test_service_name", - recursive: false, - directories: []string{"/service"}, - filterHooks: func(filename, pattern string) error { - if strings.Contains(filename, pattern) { - return ErrorSkip - } - return nil - }, - files: make(map[string]os.FileInfo), - //ignored: []string{".php"}, - }}) + s.watcher, err = NewWatcher(configs) if err != nil { - return err + return false, err } + for serviceName, config := range s.reloadConfig.Services { + svc, _ := c.Get(serviceName) + if ctrl, ok := svc.(*roadrunner.Controllable); ok { + (*ctrl).Server().Reset() + } + configs = append(configs, WatcherConfig{ + serviceName: serviceName, + recursive: config.Recursive, + directories: config.Dirs, + filterHooks: func(filename, pattern string) error { + if strings.Contains(filename, pattern) { + return ErrorSkip + } + return nil + }, + files: make(map[string]os.FileInfo), + //ignored: + + }) + } - s.watcher.AddSingle("test_service_name", "/service") + return true, nil +} +func (s *Service) Serve() error { + if !s.reloadConfig.Enabled { + return nil + } go func() { for { select { case e := <-s.watcher.Event: - println(e.Type) + println(fmt.Sprintf("type is:%s, oldPath:%s, path:%s, name:%s", e.Type, e.OldPath, e.Path, e.FileInfo.Name())) + + srv := s.reloadConfig.Services[e.Type] + + if srv.service != nil { + s := *srv.service + err := s.Server().Reset() + if err != nil { + fmt.Println(err) + } + } else { + s.watcher.mu.Lock() + delete(s.watcher.watcherConfigs, e.Type) + s.watcher.mu.Unlock() + } } } - //for e = range w.Event { - // - // println("event") - // // todo use status - // //svc, _ := s.container.Get("http") - // //if svc != nil { - // // if srv, ok := svc.(service.Service); ok { - // // srv.Stop() - // // err = srv.Serve() - // // if err != nil { - // // return err - // // } - // // } - // //} - // - // //println("event skipped due to service is nil") - //} }() - err = s.watcher.StartPolling(time.Second) + err := s.watcher.StartPolling(time.Second * 2) if err != nil { return err } - // read events and restart corresponding services - return nil } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 7b224319..8b1ee7b2 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -87,6 +87,7 @@ type Watcher struct { operations map[Op]struct{} // Op filtering. // config for each service + // need pointer here to assign files watcherConfigs map[string]WatcherConfig } @@ -125,9 +126,27 @@ func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { return nil, NoWalkerConfig } + err = w.initFs() + if err != nil { + return nil, err + } + return w, nil } +func (w *Watcher) initFs() error { + for srvName, config := range w.watcherConfigs { + fileList, err := w.retrieveFileList(srvName, config) + if err != nil { + return err + } + tmp := w.watcherConfigs[srvName] + tmp.files = fileList + w.watcherConfigs[srvName] = tmp + } + return nil +} + func (w *Watcher) AddWatcherConfig(config WatcherConfig) { w.watcherConfigs[config.serviceName] = config } @@ -310,7 +329,6 @@ func (w *Watcher) StartPolling(duration time.Duration) error { func (w *Watcher) waitEvent(d time.Duration) error { for { cancel := make(chan struct{}) - ticker := time.NewTicker(d) for { select { @@ -324,16 +342,10 @@ func (w *Watcher) waitEvent(d time.Duration) error { for serviceName, config := range w.watcherConfigs { go func(sn string, c WatcherConfig) { - w.wg.Add(1) fileList, _ := w.retrieveFileList(sn, c) w.pollEvents(c.serviceName, fileList, cancel) - w.wg.Done() }(serviceName, config) } - - w.wg.Wait() - - //w.mu.Unlock() default: } @@ -342,6 +354,8 @@ func (w *Watcher) waitEvent(d time.Duration) error { } func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + w.mu.Lock() + defer w.mu.Unlock() fileList := make(map[string]os.FileInfo) if config.recursive { // walk through directories recursively @@ -360,7 +374,6 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma } } - for _, dir := range config.directories { absPath, err := filepath.Abs(w.workingDir) if err != nil { @@ -466,6 +479,9 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, c // Check for created files, writes and chmods. for pth, info := range files { + if info.IsDir() { + continue + } oldInfo, found := w.watcherConfigs[serviceName].files[pth] if !found { // A file was created. @@ -491,46 +507,46 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, c } // Check for renames and moves. - for path1, info1 := range removes { - for path2, info2 := range creates { - if sameFile(info1, info2) { - e := Event{ - Op: Move, - Path: path2, - OldPath: path1, - FileInfo: info1, - } - // If they are from the same directory, it's a rename - // instead of a move event. - if filepath.Dir(path1) == filepath.Dir(path2) { - e.Op = Rename - } - - delete(removes, path1) - delete(creates, path2) - - select { - case <-cancel: - return - case w.Event <- e: - } - } - } - } - - // Send all the remaining create and remove events. - for path, info := range creates { - select { - case <-cancel: - return - case w.Event <- Event{Create, path, "", info, "http"}: - } - } - for path, info := range removes { - select { - case <-cancel: - return - case w.Event <- Event{Remove, path, path, info, "http"}: - } - } + //for path1, info1 := range removes { + // for path2, info2 := range creates { + // if sameFile(info1, info2) { + // e := Event{ + // Op: Move, + // Path: path2, + // OldPath: path1, + // FileInfo: info1, + // } + // // If they are from the same directory, it's a rename + // // instead of a move event. + // if filepath.Dir(path1) == filepath.Dir(path2) { + // e.Op = Rename + // } + // + // delete(removes, path1) + // delete(creates, path2) + // + // select { + // case <-cancel: + // return + // case w.Event <- e: + // } + // } + // } + //} + // + ////Send all the remaining create and remove events. + //for pth, info := range creates { + // select { + // case <-cancel: + // return + // case w.Event <- Event{Create, pth, pth, info, serviceName}: + // } + //} + //for pth, info := range removes { + // select { + // case <-cancel: + // return + // case w.Event <- Event{Remove, pth, pth, info, serviceName}: + // } + //} } -- cgit v1.2.3 From e32bc78ec1fc32b81c0029bbfee14bb570057554 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 10:32:51 +0300 Subject: Update service to support file patterns Update watcher Gracefull stop --- service/reload/config.go | 10 +- service/reload/service.go | 68 +++++------ service/reload/watcher.go | 285 +++++++++++++++++----------------------------- 3 files changed, 136 insertions(+), 227 deletions(-) (limited to 'service') diff --git a/service/reload/config.go b/service/reload/config.go index f684a227..af1c79eb 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -3,16 +3,17 @@ package reload import ( "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" + "time" ) // Config is a Reload configuration point. type Config struct { // Enable or disable Reload extension, default disable. Enabled bool - - // Watch is general pattern of files to watch. It will be applied to every directory in project - Watch []string - + // Interval is a global refresh interval + Interval time.Duration + // Patterns is a global file patterns to watch. It will be applied to every directory in project + Patterns []string // Services is set of services which would be reloaded in case of FS changes Services map[string]ServiceConfig } @@ -26,7 +27,6 @@ type ServiceConfig struct { Dirs []string // Ignore is set of files which would not be watched Ignore []string - // service is a link to service to restart service *roadrunner.Controllable } diff --git a/service/reload/service.go b/service/reload/service.go index 9b3ac2f0..16648007 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -1,6 +1,7 @@ package reload import ( + "errors" "fmt" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" @@ -15,14 +16,19 @@ const ID = "reload" type Service struct { reloadConfig *Config watcher *Watcher - } // Init controller service func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { s.reloadConfig = cfg + if !s.reloadConfig.Enabled { + return false, nil + } + wd, err := os.Getwd() + if err != nil { + return false, err + } - var err error var configs []WatcherConfig // mount Services to designated services @@ -35,8 +41,6 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } } - - for serviceName, config := range s.reloadConfig.Services { if cfg.Services[serviceName].service == nil { continue @@ -45,44 +49,26 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { serviceName: serviceName, recursive: config.Recursive, directories: config.Dirs, - filterHooks: func(filename, pattern string) error { - if strings.Contains(filename, pattern) { - return ErrorSkip + filterHooks: func(filename string, patterns []string) error { + + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } } - return nil + return ErrorSkip }, - files: make(map[string]os.FileInfo), - //ignored: + files: make(map[string]os.FileInfo), + ignored: ConvertIgnored(wd, config.Ignore), + filePatterns: config.Patterns, }) } - s.watcher, err = NewWatcher(configs) + s.watcher, err = NewWatcher(wd, configs) if err != nil { return false, err } - for serviceName, config := range s.reloadConfig.Services { - svc, _ := c.Get(serviceName) - if ctrl, ok := svc.(*roadrunner.Controllable); ok { - (*ctrl).Server().Reset() - } - - configs = append(configs, WatcherConfig{ - serviceName: serviceName, - recursive: config.Recursive, - directories: config.Dirs, - filterHooks: func(filename, pattern string) error { - if strings.Contains(filename, pattern) { - return ErrorSkip - } - return nil - }, - files: make(map[string]os.FileInfo), - //ignored: - - }) - } - return true, nil } @@ -90,14 +76,13 @@ func (s *Service) Serve() error { if !s.reloadConfig.Enabled { return nil } - go func() { for { select { case e := <-s.watcher.Event: - println(fmt.Sprintf("type is:%s, oldPath:%s, path:%s, name:%s", e.Type, e.OldPath, e.Path, e.FileInfo.Name())) + println(fmt.Sprintf("Service is:%s, path:%s, name:%s", e.service, e.path, e.info.Name())) - srv := s.reloadConfig.Services[e.Type] + srv := s.reloadConfig.Services[e.service] if srv.service != nil { s := *srv.service @@ -107,14 +92,18 @@ func (s *Service) Serve() error { } } else { s.watcher.mu.Lock() - delete(s.watcher.watcherConfigs, e.Type) + delete(s.watcher.watcherConfigs, e.service) s.watcher.mu.Unlock() } } } }() - err := s.watcher.StartPolling(time.Second * 2) + if s.reloadConfig.Interval < time.Second { + return errors.New("too fast") + } + + err := s.watcher.StartPolling(s.reloadConfig.Interval) if err != nil { return err } @@ -123,6 +112,5 @@ func (s *Service) Serve() error { } func (s *Service) Stop() { - //s.watcher.Stop() - + s.watcher.Stop() } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 8b1ee7b2..da8007a3 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -10,46 +10,20 @@ import ( "time" ) -// An Op is a type that is used to describe what type -// of event has occurred during the watching process. -type Op uint32 - -// Ops -const ( - Create Op = iota - Write - Remove - Rename - Chmod - Move -) - -var ops = map[Op]string{ - Create: "CREATE", - Write: "WRITE", - Remove: "REMOVE", - Rename: "RENAME", - Chmod: "CHMOD", - Move: "MOVE", -} - var ErrorSkip = errors.New("file is skipped") var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true") // SimpleHook is used to filter by simple criteria, CONTAINS -type SimpleHook func(filename, pattern string) error +type SimpleHook func(filename string, pattern []string) error // An Event describes an event that is received when files or directory // changes occur. It includes the os.FileInfo of the changed file or // directory and the type of event that's occurred and the full path of the file. type Event struct { - Op + path string + info os.FileInfo - Path string - OldPath string - os.FileInfo - - Type string // type of event, http, grpc, etc... + service string // type of service, http, grpc, etc... } type WatcherConfig struct { @@ -60,20 +34,20 @@ type WatcherConfig struct { // directories used per-service directories []string // simple hook, just CONTAINS - filterHooks SimpleHook + filterHooks func(filename string, pattern []string) error // path to file with files files map[string]os.FileInfo - // //ignored files or directories, used map for O(1) amortized get + // ignored directories, used map for O(1) amortized get ignored map[string]string + // filePatterns to ignore + filePatterns []string } type Watcher struct { // main event channel Event chan Event + close chan struct{} - errors chan error - close chan struct{} - Closed chan struct{} //============================= mu *sync.Mutex wg *sync.WaitGroup @@ -83,9 +57,6 @@ type Watcher struct { // working directory, same for all workingDir string - // operation type - operations map[Op]struct{} // Op filtering. - // config for each service // need pointer here to assign files watcherConfigs map[string]WatcherConfig @@ -94,22 +65,15 @@ type Watcher struct { // Options is used to set Watcher Options type Options func(*Watcher) -func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { - dir, err := os.Getwd() - if err != nil { - return nil, err - } - +func NewWatcher(workDir string, configs []WatcherConfig, options ...Options) (*Watcher, error) { w := &Watcher{ Event: make(chan Event), mu: &sync.Mutex{}, wg: &sync.WaitGroup{}, - Closed: make(chan struct{}), - close: make(chan struct{}), + close: make(chan struct{}), - workingDir: dir, - operations: make(map[Op]struct{}), + workingDir: workDir, watcherConfigs: make(map[string]WatcherConfig), } @@ -126,7 +90,7 @@ func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { return nil, NoWalkerConfig } - err = w.initFs() + err := w.initFs() if err != nil { return nil, err } @@ -140,6 +104,7 @@ func (w *Watcher) initFs() error { if err != nil { return err } + // workaround. in golang you can't assign to map in struct field tmp := w.watcherConfigs[srvName] tmp.files = fileList w.watcherConfigs[srvName] = tmp @@ -147,6 +112,21 @@ func (w *Watcher) initFs() error { return nil } +func ConvertIgnored(workdir string, ignored []string) map[string]string { + abs, _ := filepath.Abs(workdir) + if len(ignored) == 0 { + return nil + } + + ign := make(map[string]string, len(ignored)) + for i := 0; i < len(ignored); i++ { + ign[filepath.Join(abs, ignored[i])] = filepath.Join(abs, ignored[i]) + } + + return ign + +} + func (w *Watcher) AddWatcherConfig(config WatcherConfig) { w.watcherConfigs[config.serviceName] = config } @@ -260,7 +240,6 @@ func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.F // recursive calls are slow in compare to goto // so, we will add files with goto pattern - outer: for i := 0; i < len(fileInfoList); i++ { var pathToFile string @@ -277,9 +256,9 @@ outer: continue } - err := w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), pathToFile) - if err != nil { - // if err is not nil, move to the start of the cycle since the pathToFile not match the hook + // if filename does not contain pattern --> ignore that file + err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { continue outer } @@ -309,48 +288,24 @@ func (w *Watcher) StartPolling(duration time.Duration) error { return w.waitEvent(duration) } -//func (w *Watcher) updatedFileListForConfig(config WatcherConfig) (map[string]os.FileInfo, error) { -// if config.recursive { -// return nil, nil -// } -// -// for _, v := range config.directories { -// files, err := w.retrieveFilesSingle(path.Join(w.workingDir, v)) -// if err != nil { -// return nil, err -// } -// -// } -// -// return nil, nil -//} - // this is blocking operation func (w *Watcher) waitEvent(d time.Duration) error { + ticker := time.NewTicker(d) for { - cancel := make(chan struct{}) - ticker := time.NewTicker(d) - for { - select { - case <-w.close: - close(cancel) - close(w.Closed) - return nil - case <-ticker.C: - //fileList := make(map[string]os.FileInfo, 100) - //w.mu.Lock() - - for serviceName, config := range w.watcherConfigs { - go func(sn string, c WatcherConfig) { - fileList, _ := w.retrieveFileList(sn, c) - w.pollEvents(c.serviceName, fileList, cancel) - }(serviceName, config) - } - default: - + select { + case <-w.close: + ticker.Stop() + return nil + case <-ticker.C: + for serviceName, config := range w.watcherConfigs { + go func(sn string, c WatcherConfig) { + fileList, _ := w.retrieveFileList(sn, c) + w.pollEvents(c.serviceName, fileList) + }(serviceName, config) } } } + } func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { @@ -392,43 +347,8 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma } return fileList, nil - - // Add the file's to the file list. - - //return nil } -// RemoveRecursive removes either a single file or a directory recursively from -// the file's list. -//func (w *Watcher) RemoveRecursive(name string) (err error) { -// w.mu.Lock() -// defer w.mu.Unlock() -// -// name, err = filepath.Abs(name) -// if err != nil { -// return err -// } -// -// // If name is a single file, remove it and return. -// info, found := w.files[name] -// if !found { -// return nil // Doesn't exist, just return. -// } -// if !info.IsDir() { -// delete(w.files, name) -// return nil -// } -// -// // If it's a directory, delete all of it's contents recursively -// // from w.files. -// for path := range w.files { -// if strings.HasPrefix(path, name) { -// delete(w.files, path) -// } -// } -// return nil -//} - func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) { fileList := make(map[string]os.FileInfo) @@ -437,14 +357,11 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o return err } - // filename, pattern TODO - //err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path) - //if err == ErrorSkip { - // return nil - //} - //if err != nil { - // return err - //} + // if filename does not contain pattern --> ignore that file + err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { + return nil + } // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. @@ -462,7 +379,7 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o }) } -func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, cancel chan struct{}) { +func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { w.mu.Lock() defer w.mu.Unlock() @@ -491,62 +408,66 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, c if oldInfo.ModTime() != info.ModTime() { w.watcherConfigs[serviceName].files[pth] = info select { - case <-cancel: - return - case w.Event <- Event{Write, pth, pth, info, serviceName}: + case w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + }: } } if oldInfo.Mode() != info.Mode() { w.watcherConfigs[serviceName].files[pth] = info select { - case <-cancel: - return - case w.Event <- Event{Chmod, pth, pth, info, serviceName}: + case w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + }: } } } - // Check for renames and moves. - //for path1, info1 := range removes { - // for path2, info2 := range creates { - // if sameFile(info1, info2) { - // e := Event{ - // Op: Move, - // Path: path2, - // OldPath: path1, - // FileInfo: info1, - // } - // // If they are from the same directory, it's a rename - // // instead of a move event. - // if filepath.Dir(path1) == filepath.Dir(path2) { - // e.Op = Rename - // } - // - // delete(removes, path1) - // delete(creates, path2) - // - // select { - // case <-cancel: - // return - // case w.Event <- e: - // } - // } - // } - //} - // - ////Send all the remaining create and remove events. - //for pth, info := range creates { - // select { - // case <-cancel: - // return - // case w.Event <- Event{Create, pth, pth, info, serviceName}: - // } - //} - //for pth, info := range removes { - // select { - // case <-cancel: - // return - // case w.Event <- Event{Remove, pth, pth, info, serviceName}: - // } - //} + //Check for renames and moves. + for path1, info1 := range removes { + for path2, info2 := range creates { + if sameFile(info1, info2) { + e := Event{ + path: path2, + info: info2, + service: serviceName, + } + + delete(removes, path1) + delete(creates, path2) + + select { + case w.Event <- e: + } + } + } + } + + //Send all the remaining create and remove events. + for pth, info := range creates { + select { + case w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + }: + } + } + for pth, info := range removes { + select { + case w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + }: + } + } +} + +func (w *Watcher) Stop() { + w.close <- struct{}{} } -- cgit v1.2.3 From 7e522cf71c9eac90149ed513487d65f23b6a28b6 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 10:55:45 +0300 Subject: Slight update Start to write tests --- service/reload/service.go | 2 +- service/reload/watcher.go | 86 ++++-------------------------------------- service/reload/watcher_test.go | 8 ++++ 3 files changed, 16 insertions(+), 80 deletions(-) create mode 100644 service/reload/watcher_test.go (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index 16648007..12b157ab 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -80,7 +80,7 @@ func (s *Service) Serve() error { for { select { case e := <-s.watcher.Event: - println(fmt.Sprintf("Service is:%s, path:%s, name:%s", e.service, e.path, e.info.Name())) + println(fmt.Sprintf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name())) srv := s.reloadConfig.Services[e.service] diff --git a/service/reload/watcher.go b/service/reload/watcher.go index da8007a3..a9005a00 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -65,6 +65,7 @@ type Watcher struct { // Options is used to set Watcher Options type Options func(*Watcher) +// NewWatcher returns new instance of File Watcher func NewWatcher(workDir string, configs []WatcherConfig, options ...Options) (*Watcher, error) { w := &Watcher{ Event: make(chan Event), @@ -98,6 +99,7 @@ func NewWatcher(workDir string, configs []WatcherConfig, options ...Options) (*W return w, nil } +// initFs makes initial map with files func (w *Watcher) initFs() error { for srvName, config := range w.watcherConfigs { fileList, err := w.retrieveFileList(srvName, config) @@ -112,6 +114,7 @@ func (w *Watcher) initFs() error { return nil } +// ConvertIgnored is used to convert slice to map with ignored files func ConvertIgnored(workdir string, ignored []string) map[string]string { abs, _ := filepath.Abs(workdir) if len(ignored) == 0 { @@ -127,10 +130,6 @@ func ConvertIgnored(workdir string, ignored []string) map[string]string { } -func (w *Watcher) AddWatcherConfig(config WatcherConfig) { - w.watcherConfigs[config.serviceName] = config -} - // https://en.wikipedia.org/wiki/Inotify // SetMaxFileEvents sets max file notify events for Watcher // In case of file watch errors, this value can be increased system-wide @@ -143,81 +142,6 @@ func (w *Watcher) AddWatcherConfig(config WatcherConfig) { // //} -// SetDefaultRootPath is used to set own root path for adding files -func SetDefaultRootPath(path string) Options { - return func(watcher *Watcher) { - watcher.workingDir = path - } -} - -// Add -// name will be current working dir -func (w *Watcher) AddSingle(serviceName, relPath string) error { - absPath, err := filepath.Abs(w.workingDir) - if err != nil { - return err - } - - // full path is workdir/relative_path - fullPath := path.Join(absPath, relPath) - - // Ignored files - // map is to have O(1) when search for file - _, ignored := w.watcherConfigs[serviceName].ignored[fullPath] - if ignored { - return nil - } - - // small optimization for smallvector - //fileList := make(map[string]os.FileInfo, 10) - fileList, err := w.retrieveFilesSingle(serviceName, fullPath) - if err != nil { - return err - } - - for fullPth, file := range fileList { - w.watcherConfigs[serviceName].files[fullPth] = file - } - - return nil -} - -func (w *Watcher) AddRecursive(serviceName string, relPath string) error { - workDirAbs, err := filepath.Abs(w.workingDir) - if err != nil { - return err - } - - fullPath := path.Join(workDirAbs, relPath) - - filesList, err := w.retrieveFilesRecursive(serviceName, fullPath) - if err != nil { - return err - } - - for pathToFile, file := range filesList { - w.watcherConfigs[serviceName].files[pathToFile] = file - } - - return nil -} - -func (w *Watcher) AddIgnored(serviceName string, directories []string) error { - workDirAbs, err := filepath.Abs(w.workingDir) - if err != nil { - return err - } - - // concat wd with relative paths from config - // todo check path for existance - for _, v := range directories { - fullPath := path.Join(workDirAbs, v) - w.watcherConfigs[serviceName].ignored[fullPath] = fullPath - } - - return nil -} - // pass map from outside func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { stat, err := os.Stat(path) @@ -297,6 +221,9 @@ func (w *Watcher) waitEvent(d time.Duration) error { ticker.Stop() return nil case <-ticker.C: + // this is not very effective way + // because we have to wait on Lock + // better is to listen files in parallel, but, since that would be used in debug... TODO for serviceName, config := range w.watcherConfigs { go func(sn string, c WatcherConfig) { fileList, _ := w.retrieveFileList(sn, c) @@ -308,6 +235,7 @@ func (w *Watcher) waitEvent(d time.Duration) error { } +// retrieveFileList get file list for service func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { w.mu.Lock() defer w.mu.Unlock() diff --git a/service/reload/watcher_test.go b/service/reload/watcher_test.go new file mode 100644 index 00000000..4e5e3210 --- /dev/null +++ b/service/reload/watcher_test.go @@ -0,0 +1,8 @@ +package reload + +import "testing" + +func Test_Watcher(t *testing.T) { + +} + -- cgit v1.2.3 From a835127776e5f4c12cce68fdbac8e43684b0ca6d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 13:52:49 +0300 Subject: Fix misconfiguration --- service/reload/config.go | 3 +-- service/reload/service.go | 11 ++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'service') diff --git a/service/reload/config.go b/service/reload/config.go index af1c79eb..551fb71b 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -41,7 +41,6 @@ func (c *Config) Hydrate(cfg service.Config) error { // InitDefaults sets missing values to their default values. func (c *Config) InitDefaults() error { - //c.Interval = time.Second - + c.Enabled = false return nil } diff --git a/service/reload/service.go b/service/reload/service.go index 12b157ab..461f9430 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -60,7 +60,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { }, files: make(map[string]os.FileInfo), ignored: ConvertIgnored(wd, config.Ignore), - filePatterns: config.Patterns, + filePatterns: append(config.Patterns, cfg.Patterns...), }) } @@ -76,6 +76,11 @@ func (s *Service) Serve() error { if !s.reloadConfig.Enabled { return nil } + + if s.reloadConfig.Interval < time.Second { + return errors.New("reload interval is too fast") + } + go func() { for { select { @@ -99,10 +104,6 @@ func (s *Service) Serve() error { } }() - if s.reloadConfig.Interval < time.Second { - return errors.New("too fast") - } - err := s.watcher.StartPolling(s.reloadConfig.Interval) if err != nil { return err -- cgit v1.2.3 From b44167f66258712df47c21896961756f8be672df Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 16:16:52 +0300 Subject: Fix issues during the testing --- service/reload/service.go | 13 +++++---- service/reload/watcher.go | 71 ++++++++++++++++++++++++++++------------------- 2 files changed, 49 insertions(+), 35 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index 461f9430..bb85e15d 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -24,10 +24,6 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { if !s.reloadConfig.Enabled { return false, nil } - wd, err := os.Getwd() - if err != nil { - return false, err - } var configs []WatcherConfig @@ -45,6 +41,10 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { if cfg.Services[serviceName].service == nil { continue } + ignored, err := ConvertIgnored(config.Ignore) + if err != nil { + return false, err + } configs = append(configs, WatcherConfig{ serviceName: serviceName, recursive: config.Recursive, @@ -59,12 +59,13 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { return ErrorSkip }, files: make(map[string]os.FileInfo), - ignored: ConvertIgnored(wd, config.Ignore), + ignored: ignored, filePatterns: append(config.Patterns, cfg.Patterns...), }) } - s.watcher, err = NewWatcher(wd, configs) + var err error + s.watcher, err = NewWatcher(configs) if err != nil { return false, err } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index a9005a00..b466fc91 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -4,7 +4,6 @@ import ( "errors" "io/ioutil" "os" - "path" "path/filepath" "sync" "time" @@ -38,7 +37,7 @@ type WatcherConfig struct { // path to file with files files map[string]os.FileInfo // ignored directories, used map for O(1) amortized get - ignored map[string]string + ignored map[string]struct{} // filePatterns to ignore filePatterns []string } @@ -50,12 +49,9 @@ type Watcher struct { //============================= mu *sync.Mutex - wg *sync.WaitGroup // indicates is walker started or not started bool - // working directory, same for all - workingDir string // config for each service // need pointer here to assign files @@ -66,15 +62,14 @@ type Watcher struct { type Options func(*Watcher) // NewWatcher returns new instance of File Watcher -func NewWatcher(workDir string, configs []WatcherConfig, options ...Options) (*Watcher, error) { +func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { w := &Watcher{ Event: make(chan Event), mu: &sync.Mutex{}, - wg: &sync.WaitGroup{}, close: make(chan struct{}), - workingDir: workDir, + //workingDir: workDir, watcherConfigs: make(map[string]WatcherConfig), } @@ -115,19 +110,33 @@ func (w *Watcher) initFs() error { } // ConvertIgnored is used to convert slice to map with ignored files -func ConvertIgnored(workdir string, ignored []string) map[string]string { - abs, _ := filepath.Abs(workdir) +func ConvertIgnored(ignored []string) (map[string]struct{}, error) { if len(ignored) == 0 { - return nil + return nil, nil } - ign := make(map[string]string, len(ignored)) + ign := make(map[string]struct{}, len(ignored)) for i := 0; i < len(ignored); i++ { - ign[filepath.Join(abs, ignored[i])] = filepath.Join(abs, ignored[i]) + abs, err := filepath.Abs(ignored[i]) + if err != nil { + return nil, err + } + ign[abs] = struct{}{} } - return ign + return ign, nil + +} + +// GetAllFiles returns all files initialized for particular company +func (w *Watcher) GetAllFiles(serviceName string) []os.FileInfo { + var ret []os.FileInfo + + for _, v := range w.watcherConfigs[serviceName].files { + ret = append(ret, v) + } + return ret } // https://en.wikipedia.org/wiki/Inotify @@ -181,9 +190,11 @@ outer: } // if filename does not contain pattern --> ignore that file - err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns) - if err == ErrorSkip { - continue outer + if w.watcherConfigs[serviceName].filePatterns != nil && w.watcherConfigs[serviceName].filterHooks != nil { + err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { + continue outer + } } filesList[pathToFile] = fileInfoList[i] @@ -244,7 +255,10 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma // walk through directories recursively for _, dir := range config.directories { // full path is workdir/relative_path - fullPath := path.Join(w.workingDir, dir) + fullPath, err := filepath.Abs(dir) + if err != nil { + return nil, err + } list, err := w.retrieveFilesRecursive(serviceName, fullPath) if err != nil { return nil, err @@ -258,14 +272,12 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma } for _, dir := range config.directories { - absPath, err := filepath.Abs(w.workingDir) + // full path is workdir/relative_path + fullPath, err := filepath.Abs(dir) if err != nil { return nil, err } - // full path is workdir/relative_path - fullPath := path.Join(absPath, dir) - // list is pathToFiles with files list, err := w.retrieveFilesSingle(serviceName, fullPath) @@ -285,22 +297,23 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o return err } - // if filename does not contain pattern --> ignore that file - err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns) - if err == ErrorSkip { - return nil - } - // If path is ignored and it's a directory, skip the directory. If it's // ignored and it's a single file, skip the file. _, ignored := w.watcherConfigs[serviceName].ignored[path] - if ignored { if info.IsDir() { + // if it's dir, ignore whole return filepath.SkipDir } return nil } + + // if filename does not contain pattern --> ignore that file + err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { + return nil + } + // Add the path and it's info to the file list. fileList[path] = info return nil -- cgit v1.2.3 From b2a19749a3e02f38c50e9023a22c60b679933c97 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 18:02:33 +0300 Subject: Add tests Add reload tests to Makefile Remove old code --- service/gzip/service_test.go | 71 ------- service/reload/config.go | 21 +- service/reload/config_test.go | 58 ++++++ service/reload/service.go | 34 ++-- service/reload/watcher.go | 37 +--- service/reload/watcher_test.go | 440 ++++++++++++++++++++++++++++++++++++++++- 6 files changed, 541 insertions(+), 120 deletions(-) (limited to 'service') diff --git a/service/gzip/service_test.go b/service/gzip/service_test.go index 9801860f..858dbe56 100644 --- a/service/gzip/service_test.go +++ b/service/gzip/service_test.go @@ -31,24 +31,6 @@ func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.target), out) } -//func get(url string) (string, *http.Response, error) { -// r, err := http.Get(url) -// if err != nil { -// return "", nil, err -// } -// -// b, err := ioutil.ReadAll(r.Body) -// if err != nil { -// return "", nil, err -// } -// -// err = r.Body.Close() -// if err != nil { -// return "", nil, err -// } -// -// return string(b), r, err -//} func Test_Disabled(t *testing.T) { logger, _ := test.NewNullLogger() @@ -65,56 +47,3 @@ func Test_Disabled(t *testing.T) { assert.NotNil(t, s) assert.Equal(t, service.StatusInactive, st) } - -// func Test_Files(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(rrhttp.ID, &rrhttp.Service{}) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// gzip: `{"enable":true}`, -// static: `{"enable":true, "dir":"../../tests", "forbid":[]}`, -// httpCfg: `{ -// "enable": true, -// "address": ":6029", -// "maxRequestSize": 1024, -// "uploads": { -// "dir": ` + tmpDir() + `, -// "forbid": [] -// }, -// "workers":{ -// "command": "php ../../tests/http/client.php pid pipes", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10000000, -// "destroyTimeout": 10000000 -// } -// } -// }`})) - -// go func() { -// err := c.Serve() -// if err != nil { -// t.Errorf("serve error: %v", err) -// } -// }() -// time.Sleep(time.Millisecond * 1000) -// defer c.Stop() - -// b, _, _ := get("http://localhost:6029/sample.txt") -// assert.Equal(t, "sample", b) -// //header should not contain content-encoding:gzip because content-length < gziphandler.DefaultMinSize -// // b, _, _ := get("http://localhost:6029/gzip-large-file.txt") -// //header should contain content-encoding:gzip because content-length > gziphandler.DefaultMinSize -// } - -//func tmpDir() string { -// p := os.TempDir() -// r, _ := json.Marshal(p) -// -// return string(r) -//} diff --git a/service/reload/config.go b/service/reload/config.go index 551fb71b..930f4dff 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -1,6 +1,7 @@ package reload import ( + "errors" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" "time" @@ -41,6 +42,24 @@ func (c *Config) Hydrate(cfg service.Config) error { // InitDefaults sets missing values to their default values. func (c *Config) InitDefaults() error { - c.Enabled = false + return c.Valid() +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + if c.Enabled == true && c.Interval < time.Second { + return errors.New("too short interval") + } + + if c.Enabled { + if c.Services == nil { + return errors.New("should add at least 1 service") + } + + if len(c.Services) == 0 { + return errors.New("should add initialized config") + } + } + return nil } diff --git a/service/reload/config_test.go b/service/reload/config_test.go index 7cad4a5d..dd9a2797 100644 --- a/service/reload/config_test.go +++ b/service/reload/config_test.go @@ -1 +1,59 @@ package reload + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_Config_Valid(t *testing.T) { + services := make(map[string]ServiceConfig) + services["test"] = ServiceConfig{ + Recursive: false, + Patterns: nil, + Dirs: nil, + Ignore: nil, + service: nil, + } + + cfg := &Config{ + Enabled: true, + Interval: time.Second, + Patterns: nil, + Services: services, + } + assert.NoError(t, cfg.Valid()) +} + +func Test_Fake_ServiceConfig(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Enabled: true, + Interval: time.Second, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} + +func Test_Interval(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Enabled: true, + Interval: time.Millisecond, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} + +func Test_NoServiceConfig(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Enabled: true, + Interval: time.Millisecond, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} \ No newline at end of file diff --git a/service/reload/service.go b/service/reload/service.go index bb85e15d..ab249c41 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -28,7 +28,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { var configs []WatcherConfig // mount Services to designated services - for serviceName, _ := range cfg.Services { + for serviceName := range cfg.Services { svc, _ := c.Get(serviceName) if ctrl, ok := svc.(roadrunner.Controllable); ok { tmp := cfg.Services[serviceName] @@ -50,7 +50,6 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { recursive: config.Recursive, directories: config.Dirs, filterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { if strings.Contains(filename, patterns[i]) { return nil @@ -83,24 +82,21 @@ func (s *Service) Serve() error { } go func() { - for { - select { - case e := <-s.watcher.Event: - println(fmt.Sprintf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name())) - - srv := s.reloadConfig.Services[e.service] - - if srv.service != nil { - s := *srv.service - err := s.Server().Reset() - if err != nil { - fmt.Println(err) - } - } else { - s.watcher.mu.Lock() - delete(s.watcher.watcherConfigs, e.service) - s.watcher.mu.Unlock() + for e := range s.watcher.Event { + println(fmt.Sprintf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name())) + + srv := s.reloadConfig.Services[e.service] + + if srv.service != nil { + s := *srv.service + err := s.Server().Reset() + if err != nil { + fmt.Println(err) } + } else { + s.watcher.mu.Lock() + delete(s.watcher.watcherConfigs, e.service) + s.watcher.mu.Unlock() } } }() diff --git a/service/reload/watcher.go b/service/reload/watcher.go index b466fc91..612964c5 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -78,14 +78,10 @@ func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { w.watcherConfigs[v.serviceName] = v } + // apply options for _, option := range options { option(w) } - - if w.watcherConfigs == nil { - return nil, NoWalkerConfig - } - err := w.initFs() if err != nil { return nil, err @@ -205,10 +201,6 @@ outer: } func (w *Watcher) StartPolling(duration time.Duration) error { - if duration < time.Second { - return errors.New("too short duration, please use at least 1 second") - } - w.mu.Lock() if w.started { w.mu.Unlock() @@ -218,8 +210,6 @@ func (w *Watcher) StartPolling(duration time.Duration) error { w.started = true w.mu.Unlock() - //w.wg.Done() - return w.waitEvent(duration) } @@ -267,8 +257,8 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma for k, v := range list { fileList[k] = v } - return fileList, nil } + return fileList, nil } for _, dir := range config.directories { @@ -280,6 +270,9 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma // list is pathToFiles with files list, err := w.retrieveFilesSingle(serviceName, fullPath) + if err != nil { + return nil, err + } for pathToFile, file := range list { fileList[pathToFile] = file @@ -348,22 +341,18 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { } if oldInfo.ModTime() != info.ModTime() { w.watcherConfigs[serviceName].files[pth] = info - select { - case w.Event <- Event{ + w.Event <- Event{ path: pth, info: info, service: serviceName, - }: } } if oldInfo.Mode() != info.Mode() { w.watcherConfigs[serviceName].files[pth] = info - select { - case w.Event <- Event{ + w.Event <- Event{ path: pth, info: info, service: serviceName, - }: } } } @@ -381,30 +370,24 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { delete(removes, path1) delete(creates, path2) - select { - case w.Event <- e: - } + w.Event <- e } } } //Send all the remaining create and remove events. for pth, info := range creates { - select { - case w.Event <- Event{ + w.Event <- Event{ path: pth, info: info, service: serviceName, - }: } } for pth, info := range removes { - select { - case w.Event <- Event{ + w.Event <- Event{ path: pth, info: info, service: serviceName, - }: } } } diff --git a/service/reload/watcher_test.go b/service/reload/watcher_test.go index 4e5e3210..b298a82c 100644 --- a/service/reload/watcher_test.go +++ b/service/reload/watcher_test.go @@ -1,8 +1,444 @@ package reload -import "testing" +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" +) -func Test_Watcher(t *testing.T) { +var testServiceName = "test" +// scenario +// Create walker instance, init with default config, check that Watcher found all files from config +func Test_Correct_Watcher_Init(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: nil, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: nil, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + if len(w.GetAllFiles(testServiceName)) != 2 { + t.Fatal("incorrect directories len") + } +} + +// scenario +// create 3 files, create walker instance +// Start poll events +// change file and see, if event had come to handler +func Test_Get_FileEvent(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: nil, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: nil, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + // should be 3 files and directory + if len(w.GetAllFiles(testServiceName)) != 4 { + t.Fatal("incorrect directories len") + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + if e.path != "file2.txt" { + panic("didn't handle event when write file2") + } + w.Stop() + } + }() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +// scenario +// create 3 files with different extensions, create walker instance +// Start poll events +// change file with txt extension, and see, if event had not come to handler because it was filtered +func Test_FileExtensionFilter(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 2 files (one filtered) and directory + if dirLen != 3 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + fmt.Println(e.info.Name()) + panic("handled event from filtered file") + } + }() + + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + w.Stop() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +// nested +// scenario +// create dir and nested dir +// make files with aaa, bbb and txt extensions, filter txt +// change not filtered file, handle event +func Test_Recursive_Support(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + nestedDir, err := ioutil.TempDir(tempDir, "/nested") + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 3 files (2 from root dir, and 1 from nested), filtered txt + if dirLen != 3 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + // change file in nested directory + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + if e.info.Name() != "file4.aaa" { + panic("wrong handled event from watcher in nested dir") + } + w.Stop() + } + }() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } } +func Test_Wrong_Dir(t *testing.T) { + // no such file or directory + wrongDir := "askdjfhaksdlfksdf" + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{wrongDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + _, err := NewWatcher([]WatcherConfig{wc}) + if err == nil { + t.Fatal(err) + } +} + +func Test_NoServiceConfigAttached(t *testing.T) { + _, err := NewWatcher(nil) + if err == nil { + t.Fatal(err) + } +} + +func Test_Filter_Directory(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + nestedDir, err := ioutil.TempDir(tempDir, "/nested") + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + ignored, err := ConvertIgnored([]string{nestedDir}) + if err != nil { + t.Fatal(err) + } + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: ignored, + filePatterns: []string{"aaa", "bbb", "txt"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 2 files (2 from root dir), filtered other + if dirLen != 2 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + // change file in nested directory + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for range w.Event { + panic("handled event from watcher in nested dir") + } + }() + + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + w.Stop() + + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +func freeResources(path string) error { + return os.RemoveAll(path) +} -- cgit v1.2.3 From 6a23ccdcda44ea8d90eb174ce3aab99d6b67b495 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 21 Feb 2020 18:13:37 +0300 Subject: Tests update --- service/reload/watcher_test.go | 7 ------- 1 file changed, 7 deletions(-) (limited to 'service') diff --git a/service/reload/watcher_test.go b/service/reload/watcher_test.go index b298a82c..449a21df 100644 --- a/service/reload/watcher_test.go +++ b/service/reload/watcher_test.go @@ -336,13 +336,6 @@ func Test_Wrong_Dir(t *testing.T) { } } -func Test_NoServiceConfigAttached(t *testing.T) { - _, err := NewWatcher(nil) - if err == nil { - t.Fatal(err) - } -} - func Test_Filter_Directory(t *testing.T) { tempDir, err := ioutil.TempDir(".", "") defer func() { -- cgit v1.2.3 From 9ef9dfed7928e9a96b9545074f8aeb1468fda46c Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 23 Feb 2020 14:14:05 +0300 Subject: - need batching --- service/reload/config.go | 29 +++++++++++++------------ service/reload/config_test.go | 6 +----- service/reload/service.go | 50 +++++++++++++++++++++++++++++-------------- service/reload/watcher.go | 7 +++++- 4 files changed, 56 insertions(+), 36 deletions(-) (limited to 'service') diff --git a/service/reload/config.go b/service/reload/config.go index 930f4dff..f33b5081 100644 --- a/service/reload/config.go +++ b/service/reload/config.go @@ -9,25 +9,32 @@ import ( // Config is a Reload configuration point. type Config struct { - // Enable or disable Reload extension, default disable. - Enabled bool // Interval is a global refresh interval Interval time.Duration + // Patterns is a global file patterns to watch. It will be applied to every directory in project Patterns []string + // Services is set of services which would be reloaded in case of FS changes Services map[string]ServiceConfig } type ServiceConfig struct { + // Enabled indicates that service must be watched, doest not required when any other option specified + Enabled bool + // Recursive is options to use nested files from root folder Recursive bool + // Patterns is per-service specific files to watch Patterns []string + // Dirs is per-service specific dirs which will be combined with Patterns Dirs []string + // Ignore is set of files which would not be watched Ignore []string + // service is a link to service to restart service *roadrunner.Controllable } @@ -37,29 +44,23 @@ func (c *Config) Hydrate(cfg service.Config) error { if err := cfg.Unmarshal(c); err != nil { return err } + return nil } // InitDefaults sets missing values to their default values. func (c *Config) InitDefaults() error { - return c.Valid() + c.Interval = time.Second + c.Patterns = []string{".php"} + + return nil } // Valid validates the configuration. func (c *Config) Valid() error { - if c.Enabled == true && c.Interval < time.Second { + if c.Interval < time.Second { return errors.New("too short interval") } - if c.Enabled { - if c.Services == nil { - return errors.New("should add at least 1 service") - } - - if len(c.Services) == 0 { - return errors.New("should add initialized config") - } - } - return nil } diff --git a/service/reload/config_test.go b/service/reload/config_test.go index dd9a2797..c9c05a1e 100644 --- a/service/reload/config_test.go +++ b/service/reload/config_test.go @@ -17,7 +17,6 @@ func Test_Config_Valid(t *testing.T) { } cfg := &Config{ - Enabled: true, Interval: time.Second, Patterns: nil, Services: services, @@ -28,7 +27,6 @@ func Test_Config_Valid(t *testing.T) { func Test_Fake_ServiceConfig(t *testing.T) { services := make(map[string]ServiceConfig) cfg := &Config{ - Enabled: true, Interval: time.Second, Patterns: nil, Services: services, @@ -39,7 +37,6 @@ func Test_Fake_ServiceConfig(t *testing.T) { func Test_Interval(t *testing.T) { services := make(map[string]ServiceConfig) cfg := &Config{ - Enabled: true, Interval: time.Millisecond, Patterns: nil, Services: services, @@ -50,10 +47,9 @@ func Test_Interval(t *testing.T) { func Test_NoServiceConfig(t *testing.T) { services := make(map[string]ServiceConfig) cfg := &Config{ - Enabled: true, Interval: time.Millisecond, Patterns: nil, Services: services, } assert.Error(t, cfg.Valid()) -} \ No newline at end of file +} diff --git a/service/reload/service.go b/service/reload/service.go index ab249c41..359b3331 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -3,6 +3,7 @@ package reload import ( "errors" "fmt" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" "os" @@ -14,17 +15,20 @@ import ( const ID = "reload" type Service struct { - reloadConfig *Config - watcher *Watcher + cfg *Config + log *logrus.Logger + watcher *Watcher } // Init controller service -func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { - s.reloadConfig = cfg - if !s.reloadConfig.Enabled { +func (s *Service) Init(cfg *Config, log *logrus.Logger, c service.Container) (bool, error) { + if cfg == nil || len(cfg.Services) == 0 { return false, nil } + s.cfg = cfg + s.log = log + var configs []WatcherConfig // mount Services to designated services @@ -37,7 +41,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } } - for serviceName, config := range s.reloadConfig.Services { + for serviceName, config := range s.cfg.Services { if cfg.Services[serviceName].service == nil { continue } @@ -73,25 +77,22 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { } func (s *Service) Serve() error { - if !s.reloadConfig.Enabled { - return nil - } - - if s.reloadConfig.Interval < time.Second { + if s.cfg.Interval < time.Second { return errors.New("reload interval is too fast") } go func() { for e := range s.watcher.Event { + println(fmt.Sprintf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name())) - srv := s.reloadConfig.Services[e.service] + srv := s.cfg.Services[e.service] if srv.service != nil { - s := *srv.service - err := s.Server().Reset() + sv := *srv.service + err := sv.Server().Reset() if err != nil { - fmt.Println(err) + s.log.Error(err) } } else { s.watcher.mu.Lock() @@ -101,7 +102,24 @@ func (s *Service) Serve() error { } }() - err := s.watcher.StartPolling(s.reloadConfig.Interval) + //go func() { + // for { + // select { + // case <-update: + // updated = append(updated, update) + // case <-time.Ticker: + // updated = updated[0:0] + // err := sv.Server().Reset() + // s.log.Debugf( + // "reload %s, found file changes in %s", + // strings.Join(updated, ","), + // ) + // case <-exit: + // } + // } + //}() + + err := s.watcher.StartPolling(s.cfg.Interval) if err != nil { return err } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 612964c5..6c0ba86c 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -28,16 +28,22 @@ type Event struct { type WatcherConfig struct { // service name serviceName string + // recursive or just add by singe directory recursive bool + // directories used per-service directories []string + // simple hook, just CONTAINS filterHooks func(filename string, pattern []string) error + // path to file with files files map[string]os.FileInfo + // ignored directories, used map for O(1) amortized get ignored map[string]struct{} + // filePatterns to ignore filePatterns []string } @@ -194,7 +200,6 @@ outer: } filesList[pathToFile] = fileInfoList[i] - } return filesList, nil -- cgit v1.2.3 From d4d83ac0043cb15d19ae4fa49095434b1c594451 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 14:53:16 +0300 Subject: Fix CR issues --- service/reload/service.go | 83 ++++++++++++++++++++++++++++++----------------- service/reload/watcher.go | 2 ++ 2 files changed, 56 insertions(+), 29 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index 359b3331..832ac185 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -2,11 +2,11 @@ package reload import ( "errors" - "fmt" "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" "os" + "runtime" "strings" "time" ) @@ -18,6 +18,7 @@ type Service struct { cfg *Config log *logrus.Logger watcher *Watcher + stopc chan struct{} } // Init controller service @@ -28,6 +29,7 @@ func (s *Service) Init(cfg *Config, log *logrus.Logger, c service.Container) (bo s.cfg = cfg s.log = log + s.stopc = make(chan struct{}) var configs []WatcherConfig @@ -81,44 +83,67 @@ func (s *Service) Serve() error { return errors.New("reload interval is too fast") } + // make a map with unique services + // so, if we would have a 100 events from http service + // in map we would see only 1 key and it's config + treshholdc := make(chan struct { + serviceConfig ServiceConfig + service string + }, 100) + + // drain channel in case of leaved messages + defer func() { + go func() { + for range treshholdc { + + } + }() + }() + go func() { for e := range s.watcher.Event { + s.log.Debugf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name()) + + treshholdc <- struct { + serviceConfig ServiceConfig + service string + }{serviceConfig: s.cfg.Services[e.service], service: e.service} - println(fmt.Sprintf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name())) + } + }() + + // use the same interval + ticker := time.NewTicker(s.cfg.Interval) - srv := s.cfg.Services[e.service] + // map with configs by services + updated := make(map[string]ServiceConfig, 100) - if srv.service != nil { - sv := *srv.service - err := sv.Server().Reset() - if err != nil { - s.log.Error(err) + go func() { + for { + select { + case config := <-treshholdc: + // replace previous value in map by more recent without adding new one + updated[config.service] = config.serviceConfig + case <-ticker.C: + if len(updated) > 0 { + for k, v := range updated { + sv := *v.service + err := sv.Server().Reset() + if err != nil { + s.log.Error(err) + } + s.log.Debugf("found file changes in %s service, reloading", k) + } + // zero map + updated = make(map[string]ServiceConfig, 100) } - } else { - s.watcher.mu.Lock() - delete(s.watcher.watcherConfigs, e.service) - s.watcher.mu.Unlock() + case <-s.stopc: + ticker.Stop() + runtime.Goexit() } } }() - //go func() { - // for { - // select { - // case <-update: - // updated = append(updated, update) - // case <-time.Ticker: - // updated = updated[0:0] - // err := sv.Server().Reset() - // s.log.Debugf( - // "reload %s, found file changes in %s", - // strings.Join(updated, ","), - // ) - // case <-exit: - // } - // } - //}() - err := s.watcher.StartPolling(s.cfg.Interval) if err != nil { return err diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 6c0ba86c..3596ae73 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -382,6 +382,7 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { //Send all the remaining create and remove events. for pth, info := range creates { + w.watcherConfigs[serviceName].files[pth] = info w.Event <- Event{ path: pth, info: info, @@ -389,6 +390,7 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { } } for pth, info := range removes { + delete(w.watcherConfigs[serviceName].files, pth) w.Event <- Event{ path: pth, info: info, -- cgit v1.2.3 From 2b70159897b52e99f4040d1fe6571622c1e7459a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 15:15:02 +0300 Subject: Add FW to removes and deletes --- service/reload/watcher.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'service') diff --git a/service/reload/watcher.go b/service/reload/watcher.go index 3596ae73..d5211d19 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -372,8 +372,11 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { service: serviceName, } - delete(removes, path1) - delete(creates, path2) + // remove initial path + delete(w.watcherConfigs[serviceName].files, path1) + // update with new + w.watcherConfigs[serviceName].files[path2] = info2 + w.Event <- e } -- cgit v1.2.3 From ee15f84a9b545ba99c8d678f14367e216e9980f2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 15:31:45 +0300 Subject: Fix broken imports --- service/reload/service.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index 832ac185..a8e862cb 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -91,6 +91,9 @@ func (s *Service) Serve() error { service string }, 100) + // use the same interval + ticker := time.NewTicker(s.cfg.Interval) + // drain channel in case of leaved messages defer func() { go func() { @@ -109,12 +112,11 @@ func (s *Service) Serve() error { service string }{serviceConfig: s.cfg.Services[e.service], service: e.service} + ticker.Stop() + ticker = time.NewTicker(s.cfg.Interval) } }() - // use the same interval - ticker := time.NewTicker(s.cfg.Interval) - // map with configs by services updated := make(map[string]ServiceConfig, 100) -- cgit v1.2.3 From 88c13f9709bbb0033dddca21e181a5d62f7317e7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 16:13:54 +0300 Subject: Remove data race Smart restart --- service/reload/service.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index a8e862cb..d5b6a024 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -111,9 +111,6 @@ func (s *Service) Serve() error { serviceConfig ServiceConfig service string }{serviceConfig: s.cfg.Services[e.service], service: e.service} - - ticker.Stop() - ticker = time.NewTicker(s.cfg.Interval) } }() @@ -126,6 +123,14 @@ func (s *Service) Serve() error { case config := <-treshholdc: // replace previous value in map by more recent without adding new one updated[config.service] = config.serviceConfig + // stop ticker + ticker.Stop() + // restart + // logic is following: + // if we getting a lot of events, we should't restart particular service on each of it (user doing bug move or very fast typing) + // instead, we are resetting the ticker and wait for Interval time + // If there is no more events, we restart service only once + ticker = time.NewTicker(s.cfg.Interval) case <-ticker.C: if len(updated) > 0 { for k, v := range updated { -- cgit v1.2.3 From 8602c72a17f8ecabef2355d1d24ad569239f7bfa Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 16:24:45 +0300 Subject: Send message to the service stop channel --- service/reload/service.go | 1 + service/reload/watcher.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index d5b6a024..7bfb0f28 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -161,4 +161,5 @@ func (s *Service) Serve() error { func (s *Service) Stop() { s.watcher.Stop() + s.stopc <- struct{}{} } diff --git a/service/reload/watcher.go b/service/reload/watcher.go index d5211d19..a3b8fe1f 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "sync" "time" ) @@ -225,7 +226,9 @@ func (w *Watcher) waitEvent(d time.Duration) error { select { case <-w.close: ticker.Stop() - return nil + // just exit + // no matter for the pollEvents + runtime.Goexit() case <-ticker.C: // this is not very effective way // because we have to wait on Lock -- cgit v1.2.3 From 3eb34dbfabdb1dd8e158a5fbb3960c12f51c921b Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 23 Feb 2020 16:30:20 +0300 Subject: - cs --- service/reload/service.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index a8e862cb..2aaed8ad 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -105,8 +105,6 @@ func (s *Service) Serve() error { go func() { for e := range s.watcher.Event { - s.log.Debugf("[UPDATE] Service: %s, path to file: %s, filename: %s", e.service, e.path, e.info.Name()) - treshholdc <- struct { serviceConfig ServiceConfig service string @@ -134,7 +132,7 @@ func (s *Service) Serve() error { if err != nil { s.log.Error(err) } - s.log.Debugf("found file changes in %s service, reloading", k) + s.log.Debugf("[%s] found %v file(s) changes, reloading", k, len(updated)) } // zero map updated = make(map[string]ServiceConfig, 100) -- cgit v1.2.3 From cd494bf847713a984811c5b5d972c47fc6a58168 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 23 Feb 2020 16:40:46 +0300 Subject: Fix GoExit issue --- service/reload/service.go | 3 +-- service/reload/watcher.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'service') diff --git a/service/reload/service.go b/service/reload/service.go index 7bfb0f28..f267375a 100644 --- a/service/reload/service.go +++ b/service/reload/service.go @@ -6,7 +6,6 @@ import ( "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service" "os" - "runtime" "strings" "time" ) @@ -146,7 +145,7 @@ func (s *Service) Serve() error { } case <-s.stopc: ticker.Stop() - runtime.Goexit() + return } } }() diff --git a/service/reload/watcher.go b/service/reload/watcher.go index a3b8fe1f..027d2d0d 100644 --- a/service/reload/watcher.go +++ b/service/reload/watcher.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "sync" "time" ) @@ -228,7 +227,7 @@ func (w *Watcher) waitEvent(d time.Duration) error { ticker.Stop() // just exit // no matter for the pollEvents - runtime.Goexit() + return nil case <-ticker.C: // this is not very effective way // because we have to wait on Lock -- cgit v1.2.3