diff options
author | Anton Titov <[email protected]> | 2020-02-23 16:46:15 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-02-23 16:46:15 +0300 |
commit | 954fa52704af5e9d7745de759743e16b3b7b563c (patch) | |
tree | f80a4d4cf8d7725491aa8aa3939fdbd4d17c13e3 | |
parent | 99b8de5cef068446b58899d76fa02cd286837c49 (diff) | |
parent | 74f35de85648d36b328d01656c3857c11217d3b0 (diff) |
Merge pull request #253 from spiral/file_watcher_module
File watcher module
-rw-r--r-- | .rr.yaml | 4 | ||||
-rw-r--r-- | Makefile | 1 | ||||
-rwxr-xr-x | build.sh | 2 | ||||
-rw-r--r-- | cmd/rr/main.go | 2 | ||||
-rw-r--r-- | controller.go | 6 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | server.go | 6 | ||||
-rw-r--r-- | service/gzip/service_test.go | 71 | ||||
-rw-r--r-- | service/limit/service.go | 8 | ||||
-rw-r--r-- | service/reload/config.go | 66 | ||||
-rw-r--r-- | service/reload/config_test.go | 55 | ||||
-rw-r--r-- | service/reload/samefile.go | 9 | ||||
-rw-r--r-- | service/reload/samefile_windows.go | 12 | ||||
-rw-r--r-- | service/reload/service.go | 162 | ||||
-rw-r--r-- | service/reload/service_test.go | 1 | ||||
-rw-r--r-- | service/reload/watcher.go | 409 | ||||
-rw-r--r-- | service/reload/watcher_test.go | 437 |
17 files changed, 1172 insertions, 81 deletions
@@ -160,7 +160,7 @@ static: health: # http host to serve health requests. address: localhost:2113 - + # reload can reset rr servers when files change reload: # refresh internval (default 1s) @@ -176,4 +176,4 @@ reload: dirs: [""] # include sub directories - recursive: true + recursive: true
\ No newline at end of file @@ -22,6 +22,7 @@ test: go test -v -race -cover ./service/metrics go test -v -race -cover ./service/health go test -v -race -cover ./service/gzip + go test -v -race -cover ./service/reload lint: go fmt ./... golint ./...
\ No newline at end of file @@ -12,7 +12,7 @@ LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.BuildTime=$(date +% LDFLAGS="$LDFLAGS -s" build(){ - echo Packaging $1 Build + echo Packaging "$1" Build bdir=roadrunner-${RR_VERSION}-$2-$3 rm -rf builds/"$bdir" && mkdir -p builds/"$bdir" GOOS=$2 GOARCH=$3 ./build.sh diff --git a/cmd/rr/main.go b/cmd/rr/main.go index ef393426..54a1f060 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -33,6 +33,7 @@ import ( "github.com/spiral/roadrunner/service/http" "github.com/spiral/roadrunner/service/limit" "github.com/spiral/roadrunner/service/metrics" + "github.com/spiral/roadrunner/service/reload" "github.com/spiral/roadrunner/service/rpc" "github.com/spiral/roadrunner/service/static" @@ -51,6 +52,7 @@ func main() { rr.Container.Register(limit.ID, &limit.Service{}) rr.Container.Register(health.ID, &health.Service{}) rr.Container.Register(gzip.ID, &gzip.Service{}) + rr.Container.Register(reload.ID, &reload.Service{}) // you can register additional commands using cmd.CLI rr.Execute() diff --git a/controller.go b/controller.go index bda7ad6b..020ea4dd 100644 --- a/controller.go +++ b/controller.go @@ -8,3 +8,9 @@ type Controller interface { // Detach pool watching. Detach() } + +// Attacher defines the ability to attach rr controller. +type Attacher interface { + // Attach attaches controller to the service. + Attach(c Controller) +}
\ No newline at end of file @@ -10,6 +10,8 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/mattn/go-colorable v0.1.4 // indirect github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.4 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.4.1 @@ -23,6 +23,12 @@ const ( EventPoolDestruct ) +// Controllable defines the ability to attach rr controller. +type Controllable interface { + // Server represents RR server + Server() *Server +} + // Server manages pool creation and swapping. type Server struct { // configures server, pool, cmd creation and factory. diff --git a/service/gzip/service_test.go b/service/gzip/service_test.go index 9801860f..858dbe56 100644 --- a/service/gzip/service_test.go +++ b/service/gzip/service_test.go @@ -31,24 +31,6 @@ func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.target), out) } -//func get(url string) (string, *http.Response, error) { -// r, err := http.Get(url) -// if err != nil { -// return "", nil, err -// } -// -// b, err := ioutil.ReadAll(r.Body) -// if err != nil { -// return "", nil, err -// } -// -// err = r.Body.Close() -// if err != nil { -// return "", nil, err -// } -// -// return string(b), r, err -//} func Test_Disabled(t *testing.T) { logger, _ := test.NewNullLogger() @@ -65,56 +47,3 @@ func Test_Disabled(t *testing.T) { assert.NotNil(t, s) assert.Equal(t, service.StatusInactive, st) } - -// func Test_Files(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(rrhttp.ID, &rrhttp.Service{}) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// gzip: `{"enable":true}`, -// static: `{"enable":true, "dir":"../../tests", "forbid":[]}`, -// httpCfg: `{ -// "enable": true, -// "address": ":6029", -// "maxRequestSize": 1024, -// "uploads": { -// "dir": ` + tmpDir() + `, -// "forbid": [] -// }, -// "workers":{ -// "command": "php ../../tests/http/client.php pid pipes", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10000000, -// "destroyTimeout": 10000000 -// } -// } -// }`})) - -// go func() { -// err := c.Serve() -// if err != nil { -// t.Errorf("serve error: %v", err) -// } -// }() -// time.Sleep(time.Millisecond * 1000) -// defer c.Stop() - -// b, _, _ := get("http://localhost:6029/sample.txt") -// assert.Equal(t, "sample", b) -// //header should not contain content-encoding:gzip because content-length < gziphandler.DefaultMinSize -// // b, _, _ := get("http://localhost:6029/gzip-large-file.txt") -// //header should contain content-encoding:gzip because content-length > gziphandler.DefaultMinSize -// } - -//func tmpDir() string { -// p := os.TempDir() -// r, _ := json.Marshal(p) -// -// return string(r) -//} diff --git a/service/limit/service.go b/service/limit/service.go index 6af571e2..c0b4139c 100644 --- a/service/limit/service.go +++ b/service/limit/service.go @@ -8,12 +8,6 @@ import ( // ID defines controller service name. const ID = "limit" -// controllable defines the ability to attach rr controller. -type controllable interface { - // Attach attaches controller to the service. - Attach(c roadrunner.Controller) -} - // Service to control the state of rr service inside other services. type Service struct { lsns []func(event int, ctx interface{}) @@ -24,7 +18,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { // mount Services to designated services for id, watcher := range cfg.Controllers(s.throw) { svc, _ := c.Get(id) - if ctrl, ok := svc.(controllable); ok { + if ctrl, ok := svc.(roadrunner.Attacher); ok { ctrl.Attach(watcher) } } diff --git a/service/reload/config.go b/service/reload/config.go new file mode 100644 index 00000000..f33b5081 --- /dev/null +++ b/service/reload/config.go @@ -0,0 +1,66 @@ +package reload + +import ( + "errors" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config is a Reload configuration point. +type Config struct { + // Interval is a global refresh interval + Interval time.Duration + + // Patterns is a global file patterns to watch. It will be applied to every directory in project + Patterns []string + + // Services is set of services which would be reloaded in case of FS changes + Services map[string]ServiceConfig +} + +type ServiceConfig struct { + // Enabled indicates that service must be watched, doest not required when any other option specified + Enabled bool + + // Recursive is options to use nested files from root folder + Recursive bool + + // Patterns is per-service specific files to watch + Patterns []string + + // Dirs is per-service specific dirs which will be combined with Patterns + Dirs []string + + // Ignore is set of files which would not be watched + Ignore []string + + // service is a link to service to restart + service *roadrunner.Controllable +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + return nil +} + +// InitDefaults sets missing values to their default values. +func (c *Config) InitDefaults() error { + c.Interval = time.Second + c.Patterns = []string{".php"} + + return nil +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + if c.Interval < time.Second { + return errors.New("too short interval") + } + + return nil +} diff --git a/service/reload/config_test.go b/service/reload/config_test.go new file mode 100644 index 00000000..c9c05a1e --- /dev/null +++ b/service/reload/config_test.go @@ -0,0 +1,55 @@ +package reload + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_Config_Valid(t *testing.T) { + services := make(map[string]ServiceConfig) + services["test"] = ServiceConfig{ + Recursive: false, + Patterns: nil, + Dirs: nil, + Ignore: nil, + service: nil, + } + + cfg := &Config{ + Interval: time.Second, + Patterns: nil, + Services: services, + } + assert.NoError(t, cfg.Valid()) +} + +func Test_Fake_ServiceConfig(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Interval: time.Second, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} + +func Test_Interval(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Interval: time.Millisecond, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} + +func Test_NoServiceConfig(t *testing.T) { + services := make(map[string]ServiceConfig) + cfg := &Config{ + Interval: time.Millisecond, + Patterns: nil, + Services: services, + } + assert.Error(t, cfg.Valid()) +} diff --git a/service/reload/samefile.go b/service/reload/samefile.go new file mode 100644 index 00000000..80df0431 --- /dev/null +++ b/service/reload/samefile.go @@ -0,0 +1,9 @@ +// +build !windows + +package reload + +import "os" + +func sameFile(fi1, fi2 os.FileInfo) bool { + return os.SameFile(fi1, fi2) +} diff --git a/service/reload/samefile_windows.go b/service/reload/samefile_windows.go new file mode 100644 index 00000000..5f70d327 --- /dev/null +++ b/service/reload/samefile_windows.go @@ -0,0 +1,12 @@ +// +build windows + +package reload + +import "os" + +func sameFile(fi1, fi2 os.FileInfo) bool { + return fi1.ModTime() == fi2.ModTime() && + fi1.Size() == fi2.Size() && + fi1.Mode() == fi2.Mode() && + fi1.IsDir() == fi2.IsDir() +} diff --git a/service/reload/service.go b/service/reload/service.go new file mode 100644 index 00000000..9c615e0b --- /dev/null +++ b/service/reload/service.go @@ -0,0 +1,162 @@ +package reload + +import ( + "errors" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "os" + "strings" + "time" +) + +// ID contains default service name. +const ID = "reload" + +type Service struct { + cfg *Config + log *logrus.Logger + watcher *Watcher + stopc chan struct{} +} + +// Init controller service +func (s *Service) Init(cfg *Config, log *logrus.Logger, c service.Container) (bool, error) { + if cfg == nil || len(cfg.Services) == 0 { + return false, nil + } + + s.cfg = cfg + s.log = log + s.stopc = make(chan struct{}) + + var configs []WatcherConfig + + // mount Services to designated services + for serviceName := range cfg.Services { + svc, _ := c.Get(serviceName) + if ctrl, ok := svc.(roadrunner.Controllable); ok { + tmp := cfg.Services[serviceName] + tmp.service = &ctrl + cfg.Services[serviceName] = tmp + } + } + + for serviceName, config := range s.cfg.Services { + if cfg.Services[serviceName].service == nil { + continue + } + ignored, err := ConvertIgnored(config.Ignore) + if err != nil { + return false, err + } + configs = append(configs, WatcherConfig{ + serviceName: serviceName, + recursive: config.Recursive, + directories: config.Dirs, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: ignored, + filePatterns: append(config.Patterns, cfg.Patterns...), + }) + } + + var err error + s.watcher, err = NewWatcher(configs) + if err != nil { + return false, err + } + + return true, nil +} + +func (s *Service) Serve() error { + if s.cfg.Interval < time.Second { + return errors.New("reload interval is too fast") + } + + // make a map with unique services + // so, if we would have a 100 events from http service + // in map we would see only 1 key and it's config + treshholdc := make(chan struct { + serviceConfig ServiceConfig + service string + }, 100) + + // use the same interval + ticker := time.NewTicker(s.cfg.Interval) + + // drain channel in case of leaved messages + defer func() { + go func() { + for range treshholdc { + + } + }() + }() + + go func() { + for e := range s.watcher.Event { + treshholdc <- struct { + serviceConfig ServiceConfig + service string + }{serviceConfig: s.cfg.Services[e.service], service: e.service} + } + }() + + // map with configs by services + updated := make(map[string]ServiceConfig, 100) + + go func() { + for { + select { + case config := <-treshholdc: + // replace previous value in map by more recent without adding new one + updated[config.service] = config.serviceConfig + // stop ticker + ticker.Stop() + // restart + // logic is following: + // if we getting a lot of events, we should't restart particular service on each of it (user doing bug move or very fast typing) + // instead, we are resetting the ticker and wait for Interval time + // If there is no more events, we restart service only once + ticker = time.NewTicker(s.cfg.Interval) + case <-ticker.C: + if len(updated) > 0 { + for k, v := range updated { + sv := *v.service + err := sv.Server().Reset() + if err != nil { + s.log.Error(err) + } + s.log.Debugf("[%s] found %v file(s) changes, reloading", k, len(updated)) + } + // zero map + updated = make(map[string]ServiceConfig, 100) + } + case <-s.stopc: + ticker.Stop() + return + } + } + }() + + err := s.watcher.StartPolling(s.cfg.Interval) + if err != nil { + return err + } + + return nil +} + +func (s *Service) Stop() { + s.watcher.Stop() + s.stopc <- struct{}{} +} diff --git a/service/reload/service_test.go b/service/reload/service_test.go new file mode 100644 index 00000000..7cad4a5d --- /dev/null +++ b/service/reload/service_test.go @@ -0,0 +1 @@ +package reload diff --git a/service/reload/watcher.go b/service/reload/watcher.go new file mode 100644 index 00000000..027d2d0d --- /dev/null +++ b/service/reload/watcher.go @@ -0,0 +1,409 @@ +package reload + +import ( + "errors" + "io/ioutil" + "os" + "path/filepath" + "sync" + "time" +) + +var ErrorSkip = errors.New("file is skipped") +var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true") + +// SimpleHook is used to filter by simple criteria, CONTAINS +type SimpleHook func(filename string, pattern []string) error + +// An Event describes an event that is received when files or directory +// changes occur. It includes the os.FileInfo of the changed file or +// directory and the type of event that's occurred and the full path of the file. +type Event struct { + path string + info os.FileInfo + + service string // type of service, http, grpc, etc... +} + +type WatcherConfig struct { + // service name + serviceName string + + // recursive or just add by singe directory + recursive bool + + // directories used per-service + directories []string + + // simple hook, just CONTAINS + filterHooks func(filename string, pattern []string) error + + // path to file with files + files map[string]os.FileInfo + + // ignored directories, used map for O(1) amortized get + ignored map[string]struct{} + + // filePatterns to ignore + filePatterns []string +} + +type Watcher struct { + // main event channel + Event chan Event + close chan struct{} + + //============================= + mu *sync.Mutex + + // indicates is walker started or not + started bool + + // config for each service + // need pointer here to assign files + watcherConfigs map[string]WatcherConfig +} + +// Options is used to set Watcher Options +type Options func(*Watcher) + +// NewWatcher returns new instance of File Watcher +func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) { + w := &Watcher{ + Event: make(chan Event), + mu: &sync.Mutex{}, + + close: make(chan struct{}), + + //workingDir: workDir, + watcherConfigs: make(map[string]WatcherConfig), + } + + // add watcherConfigs by service names + for _, v := range configs { + w.watcherConfigs[v.serviceName] = v + } + + // apply options + for _, option := range options { + option(w) + } + err := w.initFs() + if err != nil { + return nil, err + } + + return w, nil +} + +// initFs makes initial map with files +func (w *Watcher) initFs() error { + for srvName, config := range w.watcherConfigs { + fileList, err := w.retrieveFileList(srvName, config) + if err != nil { + return err + } + // workaround. in golang you can't assign to map in struct field + tmp := w.watcherConfigs[srvName] + tmp.files = fileList + w.watcherConfigs[srvName] = tmp + } + return nil +} + +// ConvertIgnored is used to convert slice to map with ignored files +func ConvertIgnored(ignored []string) (map[string]struct{}, error) { + if len(ignored) == 0 { + return nil, nil + } + + ign := make(map[string]struct{}, len(ignored)) + for i := 0; i < len(ignored); i++ { + abs, err := filepath.Abs(ignored[i]) + if err != nil { + return nil, err + } + ign[abs] = struct{}{} + } + + return ign, nil + +} + +// GetAllFiles returns all files initialized for particular company +func (w *Watcher) GetAllFiles(serviceName string) []os.FileInfo { + var ret []os.FileInfo + + for _, v := range w.watcherConfigs[serviceName].files { + ret = append(ret, v) + } + + return ret +} + +// https://en.wikipedia.org/wiki/Inotify +// SetMaxFileEvents sets max file notify events for Watcher +// In case of file watch errors, this value can be increased system-wide +// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf) +// Add apply: sudo sysctl -p --system +//func SetMaxFileEvents(events int) Options { +// return func(watcher *Watcher) { +// watcher.maxFileWatchEvents = events +// } +// +//} + +// pass map from outside +func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) { + stat, err := os.Stat(path) + if err != nil { + return nil, err + } + + filesList := make(map[string]os.FileInfo, 10) + filesList[path] = stat + + // if it's not a dir, return + if !stat.IsDir() { + return filesList, nil + } + + fileInfoList, err := ioutil.ReadDir(path) + if err != nil { + return nil, err + } + + // recursive calls are slow in compare to goto + // so, we will add files with goto pattern +outer: + for i := 0; i < len(fileInfoList); i++ { + var pathToFile string + // BCE check elimination + // https://go101.org/article/bounds-check-elimination.html + if len(fileInfoList) != 0 && len(fileInfoList) >= i { + pathToFile = filepath.Join(pathToFile, fileInfoList[i].Name()) + } else { + return nil, errors.New("file info list len") + } + + // if file in ignored --> continue + if _, ignored := w.watcherConfigs[serviceName].ignored[path]; ignored { + continue + } + + // if filename does not contain pattern --> ignore that file + if w.watcherConfigs[serviceName].filePatterns != nil && w.watcherConfigs[serviceName].filterHooks != nil { + err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { + continue outer + } + } + + filesList[pathToFile] = fileInfoList[i] + } + + return filesList, nil +} + +func (w *Watcher) StartPolling(duration time.Duration) error { + w.mu.Lock() + if w.started { + w.mu.Unlock() + return errors.New("already started") + } + + w.started = true + w.mu.Unlock() + + return w.waitEvent(duration) +} + +// this is blocking operation +func (w *Watcher) waitEvent(d time.Duration) error { + ticker := time.NewTicker(d) + for { + select { + case <-w.close: + ticker.Stop() + // just exit + // no matter for the pollEvents + return nil + case <-ticker.C: + // this is not very effective way + // because we have to wait on Lock + // better is to listen files in parallel, but, since that would be used in debug... TODO + for serviceName, config := range w.watcherConfigs { + go func(sn string, c WatcherConfig) { + fileList, _ := w.retrieveFileList(sn, c) + w.pollEvents(c.serviceName, fileList) + }(serviceName, config) + } + } + } + +} + +// retrieveFileList get file list for service +func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) { + w.mu.Lock() + defer w.mu.Unlock() + fileList := make(map[string]os.FileInfo) + if config.recursive { + // walk through directories recursively + for _, dir := range config.directories { + // full path is workdir/relative_path + fullPath, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + list, err := w.retrieveFilesRecursive(serviceName, fullPath) + if err != nil { + return nil, err + } + + for k, v := range list { + fileList[k] = v + } + } + return fileList, nil + } + + for _, dir := range config.directories { + // full path is workdir/relative_path + fullPath, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + + // list is pathToFiles with files + list, err := w.retrieveFilesSingle(serviceName, fullPath) + if err != nil { + return nil, err + } + + for pathToFile, file := range list { + fileList[pathToFile] = file + } + } + + return fileList, nil +} + +func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) { + fileList := make(map[string]os.FileInfo) + + return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // If path is ignored and it's a directory, skip the directory. If it's + // ignored and it's a single file, skip the file. + _, ignored := w.watcherConfigs[serviceName].ignored[path] + if ignored { + if info.IsDir() { + // if it's dir, ignore whole + return filepath.SkipDir + } + return nil + } + + // if filename does not contain pattern --> ignore that file + err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns) + if err == ErrorSkip { + return nil + } + + // Add the path and it's info to the file list. + fileList[path] = info + return nil + }) +} + +func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { + w.mu.Lock() + defer w.mu.Unlock() + + // Store create and remove events for use to check for rename events. + creates := make(map[string]os.FileInfo) + removes := make(map[string]os.FileInfo) + + // Check for removed files. + for pth, info := range w.watcherConfigs[serviceName].files { + if _, found := files[pth]; !found { + removes[pth] = info + } + } + + // Check for created files, writes and chmods. + for pth, info := range files { + if info.IsDir() { + continue + } + oldInfo, found := w.watcherConfigs[serviceName].files[pth] + if !found { + // A file was created. + creates[pth] = info + continue + } + if oldInfo.ModTime() != info.ModTime() { + w.watcherConfigs[serviceName].files[pth] = info + w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + } + } + if oldInfo.Mode() != info.Mode() { + w.watcherConfigs[serviceName].files[pth] = info + w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + } + } + } + + //Check for renames and moves. + for path1, info1 := range removes { + for path2, info2 := range creates { + if sameFile(info1, info2) { + e := Event{ + path: path2, + info: info2, + service: serviceName, + } + + // remove initial path + delete(w.watcherConfigs[serviceName].files, path1) + // update with new + w.watcherConfigs[serviceName].files[path2] = info2 + + + w.Event <- e + } + } + } + + //Send all the remaining create and remove events. + for pth, info := range creates { + w.watcherConfigs[serviceName].files[pth] = info + w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + } + } + for pth, info := range removes { + delete(w.watcherConfigs[serviceName].files, pth) + w.Event <- Event{ + path: pth, + info: info, + service: serviceName, + } + } +} + +func (w *Watcher) Stop() { + w.close <- struct{}{} +} diff --git a/service/reload/watcher_test.go b/service/reload/watcher_test.go new file mode 100644 index 00000000..449a21df --- /dev/null +++ b/service/reload/watcher_test.go @@ -0,0 +1,437 @@ +package reload + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +var testServiceName = "test" + +// scenario +// Create walker instance, init with default config, check that Watcher found all files from config +func Test_Correct_Watcher_Init(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: nil, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: nil, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + if len(w.GetAllFiles(testServiceName)) != 2 { + t.Fatal("incorrect directories len") + } +} + +// scenario +// create 3 files, create walker instance +// Start poll events +// change file and see, if event had come to handler +func Test_Get_FileEvent(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: nil, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: nil, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + // should be 3 files and directory + if len(w.GetAllFiles(testServiceName)) != 4 { + t.Fatal("incorrect directories len") + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + if e.path != "file2.txt" { + panic("didn't handle event when write file2") + } + w.Stop() + } + }() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +// scenario +// create 3 files with different extensions, create walker instance +// Start poll events +// change file with txt extension, and see, if event had not come to handler because it was filtered +func Test_FileExtensionFilter(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: false, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 2 files (one filtered) and directory + if dirLen != 3 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + fmt.Println(e.info.Name()) + panic("handled event from filtered file") + } + }() + + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + w.Stop() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +// nested +// scenario +// create dir and nested dir +// make files with aaa, bbb and txt extensions, filter txt +// change not filtered file, handle event +func Test_Recursive_Support(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + nestedDir, err := ioutil.TempDir(tempDir, "/nested") + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 3 files (2 from root dir, and 1 from nested), filtered txt + if dirLen != 3 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + // change file in nested directory + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for e := range w.Event { + if e.info.Name() != "file4.aaa" { + panic("wrong handled event from watcher in nested dir") + } + w.Stop() + } + }() + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +func Test_Wrong_Dir(t *testing.T) { + // no such file or directory + wrongDir := "askdjfhaksdlfksdf" + + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{wrongDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: nil, + filePatterns: []string{"aaa", "bbb"}, + } + + _, err := NewWatcher([]WatcherConfig{wc}) + if err == nil { + t.Fatal(err) + } +} + +func Test_Filter_Directory(t *testing.T) { + tempDir, err := ioutil.TempDir(".", "") + defer func() { + err = freeResources(tempDir) + if err != nil { + t.Fatal(err) + } + }() + + nestedDir, err := ioutil.TempDir(tempDir, "/nested") + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{}, 0755) + if err != nil { + t.Fatal(err) + } + + ignored, err := ConvertIgnored([]string{nestedDir}) + if err != nil { + t.Fatal(err) + } + wc := WatcherConfig{ + serviceName: testServiceName, + recursive: true, + directories: []string{tempDir}, + filterHooks: func(filename string, patterns []string) error { + for i := 0; i < len(patterns); i++ { + if strings.Contains(filename, patterns[i]) { + return nil + } + } + return ErrorSkip + }, + files: make(map[string]os.FileInfo), + ignored: ignored, + filePatterns: []string{"aaa", "bbb", "txt"}, + } + + w, err := NewWatcher([]WatcherConfig{wc}) + if err != nil { + t.Fatal(err) + } + + dirLen := len(w.GetAllFiles(testServiceName)) + // should be 2 files (2 from root dir), filtered other + if dirLen != 2 { + t.Fatalf("incorrect directories len, len is: %d", dirLen) + } + + go func() { + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + // change file in nested directory + err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), + []byte{1, 1, 1}, 0755) + if err != nil { + panic(err) + } + go func() { + for range w.Event { + panic("handled event from watcher in nested dir") + } + }() + + // time sleep is used here because StartPolling is blocking operation + time.Sleep(time.Second * 5) + w.Stop() + + }() + + err = w.StartPolling(time.Second) + if err != nil { + t.Fatal(err) + } +} + +func freeResources(path string) error { + return os.RemoveAll(path) +} |