diff options
author | Valery Piashchynski <[email protected]> | 2020-12-16 14:13:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-16 14:13:07 +0300 |
commit | aa55cf55d80d2dd21bf6063feb901a34051a1fcd (patch) | |
tree | 5af3fe84cb94d8bf4189f018883a83ce82937c1c /plugins/reload | |
parent | 9e692dd9996f4d100ff3d9c5a358aa6894a748a3 (diff) |
Reduce RANGE allocations, parallel checks for removed and created files
Diffstat (limited to 'plugins/reload')
-rw-r--r-- | plugins/reload/plugin.go | 22 | ||||
-rw-r--r-- | plugins/reload/tests/configs/.rr-reload.yaml | 8 | ||||
-rw-r--r-- | plugins/reload/tests/reload_plugin_test.go | 259 | ||||
-rw-r--r-- | plugins/reload/tests/watcher_test.go | 698 | ||||
-rw-r--r-- | plugins/reload/watcher.go | 107 |
5 files changed, 329 insertions, 765 deletions
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go index 386c5e39..555ddb82 100644 --- a/plugins/reload/plugin.go +++ b/plugins/reload/plugin.go @@ -13,6 +13,7 @@ import ( // PluginName contains default plugin name. const PluginName string = "reload" +const thresholdChanBuffer uint = 1000 type Plugin struct { cfg *Config @@ -86,10 +87,10 @@ func (s *Plugin) Serve() chan error { treshholdc := make(chan struct { serviceConfig ServiceConfig service string - }, 100) + }, thresholdChanBuffer) // use the same interval - ticker := time.NewTicker(s.cfg.Interval) + timer := time.NewTimer(s.cfg.Interval) go func() { for e := range s.watcher.Event { @@ -101,21 +102,22 @@ func (s *Plugin) Serve() chan error { }() // map with configs by services - updated := make(map[string]ServiceConfig, 100) + updated := make(map[string]ServiceConfig, len(s.cfg.Services)) go func() { for { select { case cfg := <-treshholdc: + // logic is following: + // restart + timer.Stop() // replace previous value in map by more recent without adding new one updated[cfg.service] = cfg.serviceConfig - // restart - // logic is following: // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) - // instead, we are resetting the ticker and wait for Interval time + // instead, we are resetting the timer and wait for s.cfg.Interval time // If there is no more events, we restart service only once - ticker.Reset(s.cfg.Interval) - case <-ticker.C: + timer.Reset(s.cfg.Interval) + case <-timer.C: if len(updated) > 0 { for name := range updated { err := s.res.ResetByName(name) @@ -125,10 +127,10 @@ func (s *Plugin) Serve() chan error { } } // zero map - updated = make(map[string]ServiceConfig, 100) + updated = make(map[string]ServiceConfig, len(s.cfg.Services)) } case <-s.stopc: - ticker.Stop() + timer.Stop() return } } diff --git a/plugins/reload/tests/configs/.rr-reload.yaml b/plugins/reload/tests/configs/.rr-reload.yaml index e1149cd2..52511aa1 100644 --- a/plugins/reload/tests/configs/.rr-reload.yaml +++ b/plugins/reload/tests/configs/.rr-reload.yaml @@ -1,5 +1,5 @@ server: - command: php ../../../tests/http/client.php echo pipes + command: php ../../../tests/psr-worker-bench.php user: '' group: '' env: @@ -8,7 +8,7 @@ server: relayTimeout: 20s http: debug: true - address: '127.0.0.1:15395' + address: '127.0.0.1:22388' maxRequestSize: 1024 middleware: - '' @@ -33,9 +33,9 @@ http: reload: interval: 1s patterns: - - .go + - .txt services: http: dirs: - - '' + - './unit_tests' recursive: true diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go index 3d50030f..c49609f1 100644 --- a/plugins/reload/tests/reload_plugin_test.go +++ b/plugins/reload/tests/reload_plugin_test.go @@ -1,14 +1,23 @@ package tests import ( + "io" + "io/ioutil" + "math/rand" + "net/http" "os" "os/signal" + "path/filepath" + "strconv" "sync" "syscall" "testing" "time" + "github.com/golang/mock/gomock" "github.com/spiral/endure" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -18,6 +27,9 @@ import ( "github.com/stretchr/testify/assert" ) +const testDir string = "unit_tests" +const hugeNumberOfFiles uint = 5000 + func TestReloadInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) assert.NoError(t, err) @@ -27,9 +39,21 @@ func TestReloadInit(t *testing.T) { Prefix: "rr", } + err = os.Mkdir(testDir, 0755) + assert.NoError(t, err) + defer func() { + assert.NoError(t, freeResources(testDir)) + }() + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("Resetting http plugin").Times(1) + err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -80,5 +104,238 @@ func TestReloadInit(t *testing.T) { } }() + t.Run("ReloadTestInit", reloadTestInit) + + wg.Wait() +} + +func reloadTestInit(t *testing.T) { + err := ioutil.WriteFile(filepath.Join(testDir, "file.txt"), //nolint:gosec + []byte{}, 0755) + assert.NoError(t, err) +} + +func TestReloadHugeNumberOfFiles(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-reload.yaml", + Prefix: "rr", + } + + // try to remove, skip error + assert.NoError(t, freeResources(testDir)) + err = os.Mkdir(testDir, 0755) + assert.NoError(t, err) + + defer func() { + assert.NoError(t, freeResources(testDir)) + }() + + // controller := gomock.NewController(t) + // mockLogger := mocks.NewMockLogger(controller) + // + // mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + // mockLogger.EXPECT().Info("Resetting http plugin").Times(1) + + err = cont.RegisterAll( + cfg, + // mockLogger, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + &reload.Plugin{}, + &resetter.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + tt := time.NewTimer(time.Second * 160) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + t.Run("ReloadTestHugeNumberOfFiles", reloadHugeNumberOfFiles) + ttt := time.Now() + t.Run("ReloadRandomlyChangeFile", randomlyChangeFile) + if time.Since(ttt).Seconds() > 140 { + t.Fatal("spend too much time on reloading") + } + t.Run("ReloadHTTPLiveAfterReset", reloadHTTPLiveAfterReset) + wg.Wait() } + +func reloadHTTPLiveAfterReset(t *testing.T) { + req, err := http.NewRequest("GET", "http://localhost:22388", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "hello world", string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + +func randomlyChangeFile(t *testing.T) { + // we know, that directory contains 5000 files (0-4999) + // let's try to randomly change it + for i := 0; i < 100; i++ { + // rand sleep + rSleep := rand.Int63n(1000) // nolint:gosec + time.Sleep(time.Millisecond * time.Duration(rSleep)) + rNum := rand.Int63n(int64(hugeNumberOfFiles)) // nolint:gosec + err := ioutil.WriteFile(filepath.Join(testDir, "file_"+strconv.Itoa(int(rNum))+".txt"), []byte("Hello, Gophers!"), 0755) // nolint:gosec + assert.NoError(t, err) + } +} + +func reloadHugeNumberOfFiles(t *testing.T) { + for i := uint(0); i < hugeNumberOfFiles; i++ { + assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) + } +} + +func freeResources(path string) error { + return os.RemoveAll(path) +} + +func makeFile(filename string) error { + return ioutil.WriteFile(filepath.Join(testDir, filename), []byte{}, 0755) //nolint:gosec +} + +func copyDir(src string, dst string) error { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return errors.E(errors.Str("source is not a directory")) + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return err + } + if err == nil { + return errors.E(errors.Str("destination already exists")) + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return err + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return err + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = copyDir(srcPath, dstPath) + if err != nil { + return err + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = copyFile(srcPath, dstPath) + if err != nil { + return err + } + } + } + return nil +} + +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return errors.E(err) + } + defer func() { + _ = in.Close() + }() + + out, err := os.Create(dst) + if err != nil { + return errors.E(err) + } + defer func() { + _ = out.Close() + }() + + _, err = io.Copy(out, in) + if err != nil { + return errors.E(err) + } + + err = out.Sync() + if err != nil { + return errors.E(err) + } + + si, err := os.Stat(src) + if err != nil { + return errors.E(err) + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return errors.E(err) + } + return nil +} diff --git a/plugins/reload/tests/watcher_test.go b/plugins/reload/tests/watcher_test.go deleted file mode 100644 index f1e0d737..00000000 --- a/plugins/reload/tests/watcher_test.go +++ /dev/null @@ -1,698 +0,0 @@ -package tests - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "runtime" - "strings" - "testing" - "time" - - "github.com/spiral/errors" - logPlugin "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/reload" - "go.uber.org/zap" -) - -var testServiceName = "test" - -// scenario -// Create walker instance, init with default config, check that Watcher found all files from config -func Test_Correct_Watcher_Init(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - defer func() { - err = freeResources(tempDir) - if err != nil { - t.Fatal(err) - } - }() - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(tempDir, "file.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: false, - Directories: []string{tempDir}, - FilterHooks: nil, - Files: make(map[string]os.FileInfo), - Ignored: nil, - FilePatterns: nil, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - if len(w.GetAllFiles(testServiceName)) != 2 { - t.Fatal("incorrect directories len") - } -} - -// scenario -// create 3 files, create walker instance -// Start poll events -// change file and see, if event had come to handler -func Test_Get_FileEvent(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - c := make(chan struct{}) - defer func(name string) { - err = freeResources(name) - if err != nil { - c <- struct{}{} - t.Fatal(err) - } - c <- struct{}{} - }(tempDir) - - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(tempDir, "file1.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: false, - Directories: []string{tempDir}, - FilterHooks: nil, - Files: make(map[string]os.FileInfo), - Ignored: nil, - FilePatterns: []string{"aaa", "txt"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - // should be 3 files and directory - if len(w.GetAllFiles(testServiceName)) != 4 { - t.Fatal("incorrect directories len") - } - - go limitTime(time.Second*10, t.Name(), c) - - go func() { - go func() { - time.Sleep(time.Second) - err2 := ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), //nolint:gosec - []byte{1, 1, 1}, 0755) - if err2 != nil { - panic(err2) - } - runtime.Goexit() - }() - - go func() { - for e := range w.Event { - if e.Path != "file2.txt" { - panic("didn't handle event when write file2") - } - w.Stop() - } - }() - }() - - err = w.StartPolling(time.Second) - if err != nil { - t.Fatal(err) - } -} - -// scenario -// create 3 files with different extensions, create walker instance -// Start poll events -// change file with txt extension, and see, if event had not come to handler because it was filtered -func Test_FileExtensionFilter(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - c := make(chan struct{}) - defer func(name string) { - err = freeResources(name) - if err != nil { - c <- struct{}{} - t.Fatal(err) - } - c <- struct{}{} - }(tempDir) - - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: false, - Directories: []string{tempDir}, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(errors.Skip) - }, - Files: make(map[string]os.FileInfo), - Ignored: nil, - FilePatterns: []string{"aaa", "bbb"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - dirLen := len(w.GetAllFiles(testServiceName)) - // should be 2 files (one filtered) and directory - if dirLen != 3 { - t.Fatalf("incorrect directories len, len is: %d", dirLen) - } - - go limitTime(time.Second*5, t.Name(), c) - - go func() { - go func() { - err2 := ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec - []byte{1, 1, 1}, 0755) - if err2 != nil { - panic(err2) - } - - runtime.Goexit() - }() - - go func() { - for e := range w.Event { - fmt.Println(e.Info.Name()) - panic("handled event from filtered file") - } - }() - w.Stop() - runtime.Goexit() - }() - - err = w.StartPolling(time.Second) - if err != nil { - t.Fatal(err) - } -} - -// nested -// scenario -// create dir and nested dir -// make files with aaa, bbb and txt extensions, filter txt -// change not filtered file, handle event -func Test_Recursive_Support(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - defer func() { - err = freeResources(tempDir) - if err != nil { - t.Fatal(err) - } - }() - - nestedDir, err := ioutil.TempDir(tempDir, "nested") - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: true, - Directories: []string{tempDir}, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(errors.Skip) - }, - Files: make(map[string]os.FileInfo), - Ignored: nil, - FilePatterns: []string{"aaa", "bbb"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - dirLen := len(w.GetAllFiles(testServiceName)) - // should be 3 files (2 from root dir, and 1 from nested), filtered txt - if dirLen != 3 { - t.Fatalf("incorrect directories len, len is: %d", dirLen) - } - - go func() { - // time sleep is used here because StartPolling is blocking operation - time.Sleep(time.Second * 5) - // change file in nested directory - err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec - []byte{1, 1, 1}, 0755) - if err != nil { - panic(err) - } - go func() { - for e := range w.Event { - if e.Info.Name() != "file4.aaa" { - panic("wrong handled event from watcher in nested dir") - } - w.Stop() - } - }() - }() - - err = w.StartPolling(time.Second) - if err != nil { - t.Fatal(err) - } -} - -func Test_Wrong_Dir(t *testing.T) { - // no such file or directory - wrongDir := "askdjfhaksdlfksdf" - - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: true, - Directories: []string{wrongDir}, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(errors.Skip) - }, - Files: make(map[string]os.FileInfo), - Ignored: nil, - FilePatterns: []string{"aaa", "bbb"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - _, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err == nil { - t.Fatal(err) - } -} - -func Test_Filter_Directory(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - c := make(chan struct{}) - defer func(name string) { - err = freeResources(name) - if err != nil { - c <- struct{}{} - t.Fatal(err) - } - c <- struct{}{} - }(tempDir) - - go limitTime(time.Second*10, t.Name(), c) - - nestedDir, err := ioutil.TempDir(tempDir, "nested") - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - ignored, err := reload.ConvertIgnored([]string{nestedDir}) - if err != nil { - t.Fatal(err) - } - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: true, - Directories: []string{tempDir}, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(errors.Skip) - }, - Files: make(map[string]os.FileInfo), - Ignored: ignored, - FilePatterns: []string{"aaa", "bbb", "txt"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - dirLen := len(w.GetAllFiles(testServiceName)) - // should be 2 files (2 from root dir), filtered other - if dirLen != 2 { - t.Fatalf("incorrect directories len, len is: %d", dirLen) - } - - go func() { - go func() { - err2 := ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec - []byte{1, 1, 1}, 0755) - if err2 != nil { - panic(err2) - } - }() - - go func() { - for e := range w.Event { - fmt.Println("file: " + e.Info.Name()) - panic("handled event from watcher in nested dir") - } - }() - - // time sleep is used here because StartPolling is blocking operation - time.Sleep(time.Second * 5) - w.Stop() - }() - - err = w.StartPolling(time.Second) - if err != nil { - t.Fatal(err) - } -} - -// copy files from nested dir to not ignored -// should fire an event -func Test_Copy_Directory(t *testing.T) { - tempDir, err := ioutil.TempDir(".", "") - c := make(chan struct{}) - defer func() { - err = freeResources(tempDir) - if err != nil { - c <- struct{}{} - t.Fatal(err) - } - c <- struct{}{} - }() - - nestedDir, err := ioutil.TempDir(tempDir, "nested") - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec - []byte{}, 0755) - if err != nil { - t.Fatal(err) - } - - ignored, err := reload.ConvertIgnored([]string{nestedDir}) - if err != nil { - t.Fatal(err) - } - - wc := reload.WatcherConfig{ - ServiceName: testServiceName, - Recursive: true, - Directories: []string{tempDir}, - FilterHooks: func(filename string, patterns []string) error { - for i := 0; i < len(patterns); i++ { - if strings.Contains(filename, patterns[i]) { - return nil - } - } - return errors.E(errors.Skip) - }, - Files: make(map[string]os.FileInfo), - Ignored: ignored, - FilePatterns: []string{"aaa", "bbb", "txt"}, - } - - logger, _ := zap.NewDevelopment() - lg := logPlugin.NewZapAdapter(logger) - - w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg) - if err != nil { - t.Fatal(err) - } - - dirLen := len(w.GetAllFiles(testServiceName)) - // should be 2 files (2 from root dir), filtered other - if dirLen != 2 { - t.Fatalf("incorrect directories len, len is: %d", dirLen) - } - - go limitTime(time.Second*10, t.Name(), c) - - go func() { - go func() { - err2 := copyDir(nestedDir, filepath.Join(tempDir, "copyTo")) - if err2 != nil { - panic(err2) - } - - // exit from current goroutine - runtime.Goexit() - }() - - go func() { - for range w.Event { - // here should be event, otherwise we won't stop - w.Stop() - } - }() - }() - - err = w.StartPolling(time.Second) - if err != nil { - t.Fatal(err) - } -} - -func limitTime(d time.Duration, name string, free chan struct{}) { - go func() { - ticket := time.NewTicker(d) - for { - select { - case <-ticket.C: - ticket.Stop() - panic("timeout exceed, test: " + name) - case <-free: - ticket.Stop() - return - } - } - }() -} - -func copyFile(src, dst string) error { - in, err := os.Open(src) - if err != nil { - return err - } - defer func() { - _ = in.Close() - }() - - out, err := os.Create(dst) - if err != nil { - return err - } - defer func() { - if e := out.Close(); e != nil { - err = e - } - }() - - _, err = io.Copy(out, in) - if err != nil { - return err - } - - err = out.Sync() - if err != nil { - return err - } - - si, err := os.Stat(src) - if err != nil { - return err - } - err = os.Chmod(dst, si.Mode()) - if err != nil { - return err - } - return nil -} - -func copyDir(src string, dst string) error { - src = filepath.Clean(src) - dst = filepath.Clean(dst) - - si, err := os.Stat(src) - if err != nil { - return err - } - if !si.IsDir() { - return fmt.Errorf("source is not a directory") - } - - _, err = os.Stat(dst) - if err != nil && !os.IsNotExist(err) { - return err - } - if err == nil { - return fmt.Errorf("destination already exists") - } - - err = os.MkdirAll(dst, si.Mode()) - if err != nil { - return err - } - - entries, err := ioutil.ReadDir(src) - if err != nil { - return err - } - - for _, entry := range entries { - srcPath := filepath.Join(src, entry.Name()) - dstPath := filepath.Join(dst, entry.Name()) - - if entry.IsDir() { - err = copyDir(srcPath, dstPath) - if err != nil { - return err - } - } else { - // Skip symlinks. - if entry.Mode()&os.ModeSymlink != 0 { - continue - } - - err = copyFile(srcPath, dstPath) - if err != nil { - return err - } - } - } - return nil -} - -func freeResources(path string) error { - return os.RemoveAll(path) -} diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go index 1829839b..cf1b840c 100644 --- a/plugins/reload/watcher.go +++ b/plugins/reload/watcher.go @@ -227,11 +227,11 @@ func (w *Watcher) waitEvent(d time.Duration) error { // this is not very effective way // because we have to wait on Lock // better is to listen files in parallel, but, since that would be used in debug... TODO - for serviceName, config := range w.watcherConfigs { + for serviceName := range w.watcherConfigs { go func(sn string, c WatcherConfig) { fileList, _ := w.retrieveFileList(sn, c) w.pollEvents(c.ServiceName, fileList) - }(serviceName, config) + }(serviceName, w.watcherConfigs[serviceName]) } } } @@ -244,9 +244,9 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma fileList := make(map[string]os.FileInfo) if config.Recursive { // walk through directories recursively - for _, dir := range config.Directories { + for i := 0; i < len(config.Directories); i++ { // full path is workdir/relative_path - fullPath, err := filepath.Abs(dir) + fullPath, err := filepath.Abs(config.Directories[i]) if err != nil { return nil, err } @@ -255,16 +255,16 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma return nil, err } - for k, v := range list { - fileList[k] = v + for k := range list { + fileList[k] = list[k] } } return fileList, nil } - for _, dir := range config.Directories { + for i := 0; i < len(config.Directories); i++ { // full path is workdir/relative_path - fullPath, err := filepath.Abs(dir) + fullPath, err := filepath.Abs(config.Directories[i]) if err != nil { return nil, err } @@ -323,87 +323,90 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { removes := make(map[string]os.FileInfo) // Check for removed files. - for pth, info := range w.watcherConfigs[serviceName].Files { + for pth := range w.watcherConfigs[serviceName].Files { if _, found := files[pth]; !found { - removes[pth] = info - w.log.Debug("file was removed", "path", pth, "name", info.Name(), "size", info.Size()) + removes[pth] = w.watcherConfigs[serviceName].Files[pth] + w.log.Debug("file was removed", "path", pth, "name", w.watcherConfigs[serviceName].Files[pth].Name(), "size", w.watcherConfigs[serviceName].Files[pth].Size()) } } // Check for created files, writes and chmods. - for pth, info := range files { - if info.IsDir() { + for pth := range files { + if files[pth].IsDir() { continue } oldInfo, found := w.watcherConfigs[serviceName].Files[pth] if !found { // A file was created. - creates[pth] = info - w.log.Debug("file was created", "path", pth, "name", info.Name(), "size", info.Size()) + creates[pth] = files[pth] + w.log.Debug("file was created", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) continue } - if oldInfo.ModTime() != info.ModTime() { - w.watcherConfigs[serviceName].Files[pth] = info - w.log.Debug("file was updated", "path", pth, "name", info.Name(), "size", info.Size()) - w.Event <- Event{ - Path: pth, - Info: info, - service: serviceName, - } - } - if oldInfo.Mode() != info.Mode() { - w.watcherConfigs[serviceName].Files[pth] = info - w.log.Debug("file was updated", "path", pth, "name", info.Name(), "size", info.Size()) + + if oldInfo.ModTime() != files[pth].ModTime() || oldInfo.Mode() != files[pth].Mode() { + w.watcherConfigs[serviceName].Files[pth] = files[pth] + w.log.Debug("file was updated", "path", pth, "name", files[pth].Name(), "size", files[pth].Size()) w.Event <- Event{ Path: pth, - Info: info, + Info: files[pth], service: serviceName, } } } // Check for renames and moves. - for path1, info1 := range removes { - for path2, info2 := range creates { - if sameFile(info1, info2) { + for path1 := range removes { + for path2 := range creates { + if sameFile(removes[path1], creates[path2]) { e := Event{ Path: path2, - Info: info2, + Info: creates[path2], service: serviceName, } // remove initial path delete(w.watcherConfigs[serviceName].Files, path1) // update with new - w.watcherConfigs[serviceName].Files[path2] = info2 + w.watcherConfigs[serviceName].Files[path2] = creates[path2] - w.log.Debug("file was renamed/moved", "old path", path1, "new path", path2, "name", info2.Name(), "size", info2.Size()) + w.log.Debug("file was renamed/moved", "old path", path1, "new path", path2, "name", creates[path2].Name(), "size", creates[path2].Size()) w.Event <- e } } } - // Send all the remaining create and remove events. - for pth, info := range creates { - w.watcherConfigs[serviceName].Files[pth] = info - w.log.Debug("file was created", "path", pth, "name", info.Name(), "size", info.Size()) + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + // Send all the remaining create and remove events. + for pth := range creates { + w.watcherConfigs[serviceName].Files[pth] = creates[pth] + w.log.Debug("file was created", "path", pth, "name", creates[pth].Name(), "size", creates[pth].Size()) - w.Event <- Event{ - Path: pth, - Info: info, - service: serviceName, + w.Event <- Event{ + Path: pth, + Info: creates[pth], + service: serviceName, + } } - } - for pth, info := range removes { - delete(w.watcherConfigs[serviceName].Files, pth) - w.log.Debug("file was removed", "path", pth, "name", info.Name(), "size", info.Size()) - - w.Event <- Event{ - Path: pth, - Info: info, - service: serviceName, + }() + + go func() { + defer wg.Done() + for pth := range removes { + delete(w.watcherConfigs[serviceName].Files, pth) + w.log.Debug("file was removed", "path", pth, "name", removes[pth].Name(), "size", removes[pth].Size()) + + w.Event <- Event{ + Path: pth, + Info: removes[pth], + service: serviceName, + } } - } + }() + + wg.Wait() } func (w *Watcher) Stop() { |