summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-02-20 01:02:12 +0300
committerValery Piashchynski <[email protected]>2020-02-20 01:02:12 +0300
commit54fb0dc2baa8d874f0ff090b49c80361bb6a9557 (patch)
treeeb3b00d4aca057206977426cc02abd4f1f35fd17
parent9cff24e4d515d684cefdf46624da90d22224aeaa (diff)
Update wathcer implementation. Need to rethink structure and optimize
algorithms
-rw-r--r--service/reload/config.go8
-rw-r--r--service/reload/service.go91
-rw-r--r--service/reload/watcher.go458
3 files changed, 263 insertions, 294 deletions
diff --git a/service/reload/config.go b/service/reload/config.go
index 338c6eba..fb704015 100644
--- a/service/reload/config.go
+++ b/service/reload/config.go
@@ -15,9 +15,11 @@ type Config struct {
}
type ServiceConfig struct {
- // Watch is per-service specific files to watch
- Watch []string
- // Dirs is per-service specific dirs which will be combined with Watch
+ // Recursive is options to use nested files from root folder
+ Recursive bool
+ // Patterns is per-service specific files to watch
+ Patterns []string
+ // Dirs is per-service specific dirs which will be combined with Patterns
Dirs []string
// Ignore is set of files which would not be watched
Ignore []string
diff --git a/service/reload/service.go b/service/reload/service.go
index 78f147b7..f50d6626 100644
--- a/service/reload/service.go
+++ b/service/reload/service.go
@@ -3,6 +3,7 @@ package reload
import (
"github.com/spiral/roadrunner/service"
"os"
+ "strings"
"time"
)
@@ -11,7 +12,8 @@ const ID = "reload"
type Service struct {
reloadConfig *Config
- container service.Container
+ container service.Container
+ watcher *Watcher
}
// Init controller service
@@ -23,61 +25,70 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
}
func (s *Service) Serve() error {
- w, err := NewWatcher(SetMaxFileEvents(100))
- if err != nil {
- return err
- }
-
- name , err := os.Getwd()
- if err != nil {
- return err
+ if !s.reloadConfig.Enabled {
+ return nil
}
- err = w.AddSingle(name)
+ var err error
+ s.watcher, err = NewWatcher([]WatcherConfig{WatcherConfig{
+ serviceName: "test",
+ recursive: false,
+ directories: []string{"/service"},
+ filterHooks: func(filename, pattern string) error {
+ if strings.Contains(filename, pattern) {
+ return ErrorSkip
+ }
+ return nil
+ },
+ files: make(map[string]os.FileInfo),
+ //ignored: []string{".php"},
+ }})
if err != nil {
return err
}
- go func() {
- err = w.StartPolling(time.Second)
- if err != nil {
- }
- }()
+ s.watcher.AddSingle("test", "/service")
- // read events and restart corresponding services
-
-
- for {
- select {
- case e := <- w.Event:
- println(e.Name())
+ go func() {
+ for {
+ select {
+ case e := <-s.watcher.Event:
+ println(e.Name())
+ }
}
+ //for e = range w.Event {
+ //
+ // println("event")
+ // // todo use status
+ // //svc, _ := s.container.Get("http")
+ // //if svc != nil {
+ // // if srv, ok := svc.(service.Service); ok {
+ // // srv.Stop()
+ // // err = srv.Serve()
+ // // if err != nil {
+ // // return err
+ // // }
+ // // }
+ // //}
+ //
+ // //println("event skipped due to service is nil")
+ //}
+ }()
+
+ err = s.watcher.StartPolling(time.Second)
+ if err != nil {
+ return err
}
- //for e = range w.Event {
- //
- // println("event")
- // // todo use status
- // //svc, _ := s.container.Get("http")
- // //if svc != nil {
- // // if srv, ok := svc.(service.Service); ok {
- // // srv.Stop()
- // // err = srv.Serve()
- // // if err != nil {
- // // return err
- // // }
- // // }
- // //}
- //
- // //println("event skipped due to service is nil")
- //}
+ // read events and restart corresponding services
return nil
}
func (s *Service) Stop() {
+ //s.watcher.Stop()
-} \ No newline at end of file
+}
diff --git a/service/reload/watcher.go b/service/reload/watcher.go
index 8d344b4c..5e57af32 100644
--- a/service/reload/watcher.go
+++ b/service/reload/watcher.go
@@ -4,34 +4,12 @@ import (
"errors"
"io/ioutil"
"os"
+ "path"
"path/filepath"
- "regexp"
- "strings"
"sync"
"time"
)
-// Config is a Reload configuration point.
-//type Config struct {
-// Enable or disable Reload extension, default disable.
-//Enabled bool
-//
-// Watch is general pattern of files to watch. It will be applied to every directory in project
-//Watch []string
-//
-// Services is set of services which would be reloaded in case of FS changes
-//Services map[string]ServiceConfig
-//}
-//
-//type ServiceConfig struct {
-// Watch is per-service specific files to watch
-//Watch []string
-// Dirs is per-service specific dirs which will be combined with Watch
-//Dirs []string
-// Ignore is set of files which would not be watched
-//Ignore []string
-//}
-
// An Op is a type that is used to describe what type
// of event has occurred during the watching process.
type Op uint32
@@ -56,37 +34,10 @@ var ops = map[Op]string{
}
var ErrorSkip = errors.New("file is skipped")
+var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true")
-// FilterFileHookFunc is a function that is called to filter files during listings.
-// If a file is ok to be listed, nil is returned otherwise ErrSkip is returned.
-type FilterFileHookFunc func(info os.FileInfo, fullPath string) error
-
-// RegexFilterHook is a function that accepts or rejects a file
-// for listing based on whether it's filename or full path matches
-// a regular expression.
-func RegexFilterHook(r *regexp.Regexp, useFullPath bool) FilterFileHookFunc {
- return func(info os.FileInfo, fullPath string) error {
- str := info.Name()
-
- if useFullPath {
- str = fullPath
- }
-
- // Match
- if r.MatchString(str) {
- return nil
- }
-
- // No match.
- return ErrorSkip
- }
-}
-
-func SetFileHooks(fileHook ...FilterFileHookFunc) Options {
- return func(watcher *Watcher) {
- watcher.filterHooks = fileHook
- }
-}
+// SimpleHook is used to filter by simple criteria, CONTAINS
+type SimpleHook func(filename, pattern string) error
// An Event describes an event that is received when files or directory
// changes occur. It includes the os.FileInfo of the changed file or
@@ -101,30 +52,48 @@ type Event struct {
Type string // type of event, http, grpc, etc...
}
+type WatcherConfig struct {
+ // service name
+ serviceName string
+ // recursive or just add by singe directory
+ recursive bool
+ // directories used per-service
+ directories []string
+ // simple hook, just CONTAINS
+ filterHooks SimpleHook
+ // path to file with files
+ files map[string]os.FileInfo
+ // //ignored files or directories, used map for O(1) amortized get
+ ignored map[string]string
+}
+
type Watcher struct {
- Event chan Event
+ // main event channel
+ Event chan Event
+
errors chan error
close chan struct{}
Closed chan struct{}
-
+ //=============================
mu *sync.Mutex
- //wg *sync.WaitGroup
+ wg *sync.WaitGroup
- filterHooks []FilterFileHookFunc
+ // indicates is walker started or not
+ started bool
+ // working directory, same for all
+ workingDir string
- started bool // indicates is walker started or not
+ // operation type
+ operations map[Op]struct{} // Op filtering.
- workingDir string
- maxFileWatchEvents int
- operations map[Op]struct{} // Op filtering.
- files map[string]os.FileInfo //files by service, http, grpc, etc..
- ignored map[string]string //ignored files or directories
+ // config for each service
+ watcherConfigs map[string]WatcherConfig
}
// Options is used to set Watcher Options
type Options func(*Watcher)
-func NewWatcher(options ...Options) (*Watcher, error) {
+func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) {
dir, err := os.Getwd()
if err != nil {
return nil, err
@@ -133,34 +102,47 @@ func NewWatcher(options ...Options) (*Watcher, error) {
w := &Watcher{
Event: make(chan Event),
mu: &sync.Mutex{},
- //wg: &sync.WaitGroup{},
- Closed: make(chan struct{}),
- close: make(chan struct{}),
- workingDir: dir,
- operations: make(map[Op]struct{}),
- files: make(map[string]os.FileInfo),
- ignored: make(map[string]string),
+ wg: &sync.WaitGroup{},
+
+ Closed: make(chan struct{}),
+ close: make(chan struct{}),
+
+ workingDir: dir,
+ operations: make(map[Op]struct{}),
+ watcherConfigs: make(map[string]WatcherConfig),
+ }
+
+ // add watcherConfigs by service names
+ for _, v := range configs {
+ w.watcherConfigs[v.serviceName] = v
}
for _, option := range options {
option(w)
}
- // dir --> /home/valery/Projects/opensource/roadrunner
+ if w.watcherConfigs == nil {
+ return nil, NoWalkerConfig
+ }
+
return w, nil
}
+func (w *Watcher) AddWatcherConfig(config WatcherConfig) {
+ w.watcherConfigs[config.serviceName] = config
+}
+
// https://en.wikipedia.org/wiki/Inotify
// SetMaxFileEvents sets max file notify events for Watcher
// In case of file watch errors, this value can be increased system-wide
// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf)
// Add apply: sudo sysctl -p --system
-func SetMaxFileEvents(events int) Options {
- return func(watcher *Watcher) {
- watcher.maxFileWatchEvents = events
- }
-
-}
+//func SetMaxFileEvents(events int) Options {
+// return func(watcher *Watcher) {
+// watcher.maxFileWatchEvents = events
+// }
+//
+//}
// SetDefaultRootPath is used to set own root path for adding files
func SetDefaultRootPath(path string) Options {
@@ -171,63 +153,81 @@ func SetDefaultRootPath(path string) Options {
// Add
// name will be current working dir
-func (w *Watcher) AddSingle(name string) error {
- name, err := filepath.Abs(name)
+func (w *Watcher) AddSingle(serviceName, relPath string) error {
+ absPath, err := filepath.Abs(w.workingDir)
if err != nil {
-
+ return err
}
+ // full path is workdir/relative_path
+ fullPath := path.Join(absPath, relPath)
+
// Ignored files
// map is to have O(1) when search for file
- _, ignored := w.ignored[name]
+ _, ignored := w.watcherConfigs[serviceName].ignored[fullPath]
if ignored {
return nil
}
// small optimization for smallvector
//fileList := make(map[string]os.FileInfo, 10)
- fileList, err := w.retrieveSingleDirectoryContent(name)
+ fileList, err := w.retrieveFilesSingle(serviceName, fullPath)
if err != nil {
return err
}
- for k, v := range fileList {
- w.files[k] = v
+ for fullPth, file := range fileList {
+ w.watcherConfigs[serviceName].files[fullPth] = file
}
return nil
-
}
-func (w *Watcher) AddRecursive(name string) error {
- name, err := filepath.Abs(name)
+func (w *Watcher) AddRecursive(serviceName string, relPath string) error {
+ workDirAbs, err := filepath.Abs(w.workingDir)
if err != nil {
return err
}
- filesList := make(map[string]os.FileInfo, 10)
+ fullPath := path.Join(workDirAbs, relPath)
+ filesList := make(map[string]os.FileInfo, 100)
- err = w.retrieveFilesRecursive(name, filesList)
+ err = w.retrieveFilesRecursive(serviceName, fullPath, filesList)
if err != nil {
return err
}
- for k, v := range filesList {
- w.files[k] = v
+ for pathToFile, file := range filesList {
+ w.watcherConfigs[serviceName].files[pathToFile] = file
+ }
+
+ return nil
+}
+
+func (w *Watcher) AddIgnored(serviceName string, directories []string) error {
+ workDirAbs, err := filepath.Abs(w.workingDir)
+ if err != nil {
+ return err
+ }
+
+ // concat wd with relative paths from config
+ // todo check path for existance
+ for _, v := range directories {
+ fullPath := path.Join(workDirAbs, v)
+ w.watcherConfigs[serviceName].ignored[fullPath] = fullPath
}
return nil
}
// pass map from outside
-func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.FileInfo, error) {
+func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) {
stat, err := os.Stat(path)
if err != nil {
return nil, err
}
filesList := make(map[string]os.FileInfo, 10)
-
filesList[path] = stat
// if it's not a dir, return
@@ -235,20 +235,6 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil
return filesList, nil
}
- //err = filepath.Walk(name, func(path string, info os.FileInfo, err error) error {
- // if info.IsDir() {
- // return nil
- // }
- //
- // fileList[path] = info
- //
- // return nil
- //})
- //
- //if err != nil {
- // return err
- //}
-
fileInfoList, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
@@ -259,29 +245,27 @@ func (w *Watcher) retrieveSingleDirectoryContent(path string) (map[string]os.Fil
outer:
for i := 0; i < len(fileInfoList); i++ {
- var path string
+ var pathToFile string
// BCE check elimination
// https://go101.org/article/bounds-check-elimination.html
if len(fileInfoList) != 0 && len(fileInfoList) >= i {
- path = filepath.Join(path, fileInfoList[i].Name())
+ pathToFile = filepath.Join(pathToFile, fileInfoList[i].Name())
} else {
return nil, errors.New("file info list len")
}
// if file in ignored --> continue
- if _, ignored := w.ignored[path]; ignored {
+ if _, ignored := w.watcherConfigs[serviceName].ignored[path]; ignored {
continue
}
- for _, fh := range w.filterHooks {
- err := fh(fileInfoList[i], path)
- if err != nil {
- // if err is not nil, move to the start of the cycle since the path not match the hook
- continue outer
- }
+ err := w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), pathToFile)
+ if err != nil {
+ // if err is not nil, move to the start of the cycle since the pathToFile not match the hook
+ continue outer
}
- filesList[path] = fileInfoList[i]
+ filesList[pathToFile] = fileInfoList[i]
}
@@ -307,6 +291,22 @@ func (w *Watcher) StartPolling(duration time.Duration) error {
return w.waitEvent(duration)
}
+//func (w *Watcher) updatedFileListForConfig(config WatcherConfig) (map[string]os.FileInfo, error) {
+// if config.recursive {
+// return nil, nil
+// }
+//
+// for _, v := range config.directories {
+// files, err := w.retrieveFilesSingle(path.Join(w.workingDir, v))
+// if err != nil {
+// return nil, err
+// }
+//
+// }
+//
+// return nil, nil
+//}
+
// this is blocking operation
func (w *Watcher) waitEvent(d time.Duration) error {
for {
@@ -343,100 +343,40 @@ func (w *Watcher) waitEvent(d time.Duration) error {
case <-ticker.C:
//fileList := make(map[string]os.FileInfo, 100)
//w.mu.Lock()
- fileList, _ := w.retrieveFileList(w.workingDir, false)
- w.pollEvents(fileList, cancel)
+
+ for serviceName, config := range w.watcherConfigs {
+ fileList, _ := w.retrieveFileList(serviceName, config)
+ w.pollEvents(config.serviceName, fileList, cancel)
+ }
+
//w.mu.Unlock()
default:
}
}
-
- ticker.Stop()
- //inner:
- // for {
- // select {
- // case <-w.close:
- // close(cancel)
- // close(w.Closed)
- // return nil
- // case event := <-evt:
- // //if len(w.operations) > 0 { // Filter Ops.
- // // _, found := w.operations[event.Op]
- // // if !found {
- // // continue
- // // }
- // //}
- // //numEvents++
- // //if w.maxFileWatchEvents > 0 && numEvents > w.maxFileWatchEvents {
- // // close(cancel)
- // // break inner
- // //}
- // w.Event <- event
- // case <-done: // Current cycle is finished.
- // break inner
- // }
- // }
-
- //// Update the file's list.
- //w.mu.Lock()
- //w.files = fileList
- //w.mu.Unlock()
-
- //time.Sleep(d)
- //sleepLoop:
- // for {
- // select {
- // case <-w.close:
- // close(cancel)
- // return nil
- // case <-time.After(d):
- // break sleepLoop
- // }
- // } //end Sleep for
}
}
-func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.FileInfo, error) {
-
- //fileList := make(map[string]os.FileInfo)
-
- //list := make(map[string]os.FileInfo, 100)
- //var err error
-
- if recursive {
- //fileList, err := w.retrieveFilesRecursive(path)
- //if err != nil {
- //if os.IsNotExist(err) {
- // w.mu.Unlock()
- // // todo path error
- // _, ok := err.(*os.PathError)
- // if ok {
- // w.RemoveRecursive(path)
- // }
- // w.mu.Lock()
- //} else {
- // w.errors <- err
- //}
- //}
-
- //for k, v := range fileList {
- // fileList[k] = v
- //}
- //return fileList, nil
- return nil, nil
- } else {
- fileList, err := w.retrieveSingleDirectoryContent(path)
+func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) {
+ if config.recursive {
+ fileList := make(map[string]os.FileInfo)
+ err := w.retrieveFilesRecursive(serviceName, w.workingDir, fileList)
if err != nil {
- //if os.IsNotExist(err) {
- // w.mu.Unlock()
- // _, ok := err.(*os.PathError)
- // if ok {
- // w.RemoveRecursive(path)
- // }
- // w.mu.Lock()
- //} else {
- // w.errors <- err
- //}
+ if os.IsNotExist(err) {
+ w.mu.Unlock()
+ // todo path error
+ _, ok := err.(*os.PathError)
+ if ok {
+ // todo
+ //err = w.RemoveRecursive(path)
+ if err != nil {
+ return nil, err
+ }
+ }
+ w.mu.Lock()
+ } else {
+ w.errors <- err
+ }
}
for k, v := range fileList {
@@ -444,61 +384,81 @@ func (w *Watcher) retrieveFileList(path string, recursive bool) (map[string]os.F
}
return fileList, nil
}
- // Add the file's to the file list.
- //return nil
-}
-
-// RemoveRecursive removes either a single file or a directory recursively from
-// the file's list.
-func (w *Watcher) RemoveRecursive(name string) (err error) {
- w.mu.Lock()
- defer w.mu.Unlock()
+ fileList := make(map[string]os.FileInfo)
+ for _, dir := range config.directories {
+ absPath, err := filepath.Abs(w.workingDir)
+ if err != nil {
+ return nil, err
+ }
- name, err = filepath.Abs(name)
- if err != nil {
- return err
- }
+ // full path is workdir/relative_path
+ fullPath := path.Join(absPath, dir)
- // If name is a single file, remove it and return.
- info, found := w.files[name]
- if !found {
- return nil // Doesn't exist, just return.
- }
- if !info.IsDir() {
- delete(w.files, name)
- return nil
- }
+ // list is pathToFiles with files
+ list, err := w.retrieveFilesSingle(serviceName, fullPath)
- // If it's a directory, delete all of it's contents recursively
- // from w.files.
- for path := range w.files {
- if strings.HasPrefix(path, name) {
- delete(w.files, path)
+ for pathToFile, file := range list {
+ fileList[pathToFile] = file
}
}
- return nil
+
+ return fileList, nil
+
+ // Add the file's to the file list.
+
+ //return nil
}
-func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.FileInfo) error {
- return filepath.Walk(name, func(path string, info os.FileInfo, err error) error {
+// RemoveRecursive removes either a single file or a directory recursively from
+// the file's list.
+//func (w *Watcher) RemoveRecursive(name string) (err error) {
+// w.mu.Lock()
+// defer w.mu.Unlock()
+//
+// name, err = filepath.Abs(name)
+// if err != nil {
+// return err
+// }
+//
+// // If name is a single file, remove it and return.
+// info, found := w.files[name]
+// if !found {
+// return nil // Doesn't exist, just return.
+// }
+// if !info.IsDir() {
+// delete(w.files, name)
+// return nil
+// }
+//
+// // If it's a directory, delete all of it's contents recursively
+// // from w.files.
+// for path := range w.files {
+// if strings.HasPrefix(path, name) {
+// delete(w.files, path)
+// }
+// }
+// return nil
+//}
+
+func (w *Watcher) retrieveFilesRecursive(serviceName, root string, fileList map[string]os.FileInfo) error {
+ return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
- for _, f := range w.filterHooks {
- err := f(info, path)
- if err == ErrorSkip {
- return nil
- }
- if err != nil {
- return err
- }
+ // filename, pattern
+ err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path)
+ if err == ErrorSkip {
+ return nil
+ }
+ if err != nil {
+ return err
}
// If path is ignored and it's a directory, skip the directory. If it's
// ignored and it's a single file, skip the file.
- _, ignored := w.ignored[path]
+ _, ignored := w.watcherConfigs[serviceName].ignored[path]
if ignored {
if info.IsDir() {
@@ -512,12 +472,7 @@ func (w *Watcher) retrieveFilesRecursive(name string, fileList map[string]os.Fil
})
}
-// Wait blocks until the watcher is started.
-//func (w *Watcher) Wait() {
-// w.wg.Wait()
-//}
-
-func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{}) {
+func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, cancel chan struct{}) {
w.mu.Lock()
defer w.mu.Unlock()
@@ -526,7 +481,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{})
removes := make(map[string]os.FileInfo)
// Check for removed files.
- for path, info := range w.files {
+ for path, info := range w.watcherConfigs[serviceName].files {
if _, found := files[path]; !found {
removes[path] = info
}
@@ -534,14 +489,14 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{})
// Check for created files, writes and chmods.
for path, info := range files {
- oldInfo, found := w.files[path]
+ oldInfo, found := w.watcherConfigs[serviceName].files[path]
if !found {
// A file was created.
creates[path] = info
continue
}
if oldInfo.ModTime() != info.ModTime() {
- w.files[path] = info
+ w.watcherConfigs[serviceName].files[path] = info
select {
case <-cancel:
return
@@ -549,6 +504,7 @@ func (w *Watcher) pollEvents(files map[string]os.FileInfo, cancel chan struct{})
}
}
if oldInfo.Mode() != info.Mode() {
+ w.watcherConfigs[serviceName].files[path] = info
select {
case <-cancel:
return