summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-16 14:13:07 +0300
committerValery Piashchynski <[email protected]>2020-12-16 14:13:07 +0300
commitaa55cf55d80d2dd21bf6063feb901a34051a1fcd (patch)
tree5af3fe84cb94d8bf4189f018883a83ce82937c1c /plugins
parent9e692dd9996f4d100ff3d9c5a358aa6894a748a3 (diff)
Reduce RANGE allocations, parallel checks for removed and created files
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/plugin.go5
-rw-r--r--plugins/reload/plugin.go22
-rw-r--r--plugins/reload/tests/configs/.rr-reload.yaml8
-rw-r--r--plugins/reload/tests/reload_plugin_test.go259
-rw-r--r--plugins/reload/tests/watcher_test.go698
-rw-r--r--plugins/reload/watcher.go107
6 files changed, 333 insertions, 766 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index a6399489..13299da1 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -296,7 +296,7 @@ func (s *Plugin) Reset() error {
s.Lock()
defer s.Unlock()
const op = errors.Op("http reset")
- s.log.Info("Resetting http plugin")
+ s.log.Info("HTTP plugin got restart request. Restarting...")
s.pool.Destroy(context.Background())
// re-read the config
@@ -317,6 +317,7 @@ func (s *Plugin) Reset() error {
return errors.E(op, err)
}
+ s.log.Info("HTTP workers Pool successfully restarted")
s.handler, err = NewHandler(
s.cfg.MaxRequestSize,
*s.cfg.Uploads,
@@ -329,7 +330,9 @@ func (s *Plugin) Reset() error {
// restore original listeners
s.pool.AddListener(s.listener)
+ s.log.Info("HTTP listeners successfully re-added")
+ s.log.Info("HTTP plugin successfully restarted")
return nil
}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index 386c5e39..555ddb82 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -13,6 +13,7 @@ import (
// PluginName contains default plugin name.
const PluginName string = "reload"
+const thresholdChanBuffer uint = 1000
type Plugin struct {
cfg *Config
@@ -86,10 +87,10 @@ func (s *Plugin) Serve() chan error {
treshholdc := make(chan struct {
serviceConfig ServiceConfig
service string
- }, 100)
+ }, thresholdChanBuffer)
// use the same interval
- ticker := time.NewTicker(s.cfg.Interval)
+ timer := time.NewTimer(s.cfg.Interval)
go func() {
for e := range s.watcher.Event {
@@ -101,21 +102,22 @@ func (s *Plugin) Serve() chan error {
}()
// map with configs by services
- updated := make(map[string]ServiceConfig, 100)
+ updated := make(map[string]ServiceConfig, len(s.cfg.Services))
go func() {
for {
select {
case cfg := <-treshholdc:
+ // logic is following:
+ // restart
+ timer.Stop()
// replace previous value in map by more recent without adding new one
updated[cfg.service] = cfg.serviceConfig
- // restart
- // logic is following:
// if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing)
- // instead, we are resetting the ticker and wait for Interval time
+ // instead, we are resetting the timer and wait for s.cfg.Interval time
// If there is no more events, we restart service only once
- ticker.Reset(s.cfg.Interval)
- case <-ticker.C:
+ timer.Reset(s.cfg.Interval)
+ case <-timer.C:
if len(updated) > 0 {
for name := range updated {
err := s.res.ResetByName(name)
@@ -125,10 +127,10 @@ func (s *Plugin) Serve() chan error {
}
}
// zero map
- updated = make(map[string]ServiceConfig, 100)
+ updated = make(map[string]ServiceConfig, len(s.cfg.Services))
}
case <-s.stopc:
- ticker.Stop()
+ timer.Stop()
return
}
}
diff --git a/plugins/reload/tests/configs/.rr-reload.yaml b/plugins/reload/tests/configs/.rr-reload.yaml
index e1149cd2..52511aa1 100644
--- a/plugins/reload/tests/configs/.rr-reload.yaml
+++ b/plugins/reload/tests/configs/.rr-reload.yaml
@@ -1,5 +1,5 @@
server:
- command: php ../../../tests/http/client.php echo pipes
+ command: php ../../../tests/psr-worker-bench.php
user: ''
group: ''
env:
@@ -8,7 +8,7 @@ server:
relayTimeout: 20s
http:
debug: true
- address: '127.0.0.1:15395'
+ address: '127.0.0.1:22388'
maxRequestSize: 1024
middleware:
- ''
@@ -33,9 +33,9 @@ http:
reload:
interval: 1s
patterns:
- - .go
+ - .txt
services:
http:
dirs:
- - ''
+ - './unit_tests'
recursive: true
diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go
index 3d50030f..c49609f1 100644
--- a/plugins/reload/tests/reload_plugin_test.go
+++ b/plugins/reload/tests/reload_plugin_test.go
@@ -1,14 +1,23 @@
package tests
import (
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "net/http"
"os"
"os/signal"
+ "path/filepath"
+ "strconv"
"sync"
"syscall"
"testing"
"time"
+ "github.com/golang/mock/gomock"
"github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -18,6 +27,9 @@ import (
"github.com/stretchr/testify/assert"
)
+const testDir string = "unit_tests"
+const hugeNumberOfFiles uint = 5000
+
func TestReloadInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
assert.NoError(t, err)
@@ -27,9 +39,21 @@ func TestReloadInit(t *testing.T) {
Prefix: "rr",
}
+ err = os.Mkdir(testDir, 0755)
+ assert.NoError(t, err)
+ defer func() {
+ assert.NoError(t, freeResources(testDir))
+ }()
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("Resetting http plugin").Times(1)
+
err = cont.RegisterAll(
cfg,
- &logger.ZapLogger{},
+ mockLogger,
&server.Plugin{},
&httpPlugin.Plugin{},
&reload.Plugin{},
@@ -80,5 +104,238 @@ func TestReloadInit(t *testing.T) {
}
}()
+ t.Run("ReloadTestInit", reloadTestInit)
+
+ wg.Wait()
+}
+
+func reloadTestInit(t *testing.T) {
+ err := ioutil.WriteFile(filepath.Join(testDir, "file.txt"), //nolint:gosec
+ []byte{}, 0755)
+ assert.NoError(t, err)
+}
+
+func TestReloadHugeNumberOfFiles(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-reload.yaml",
+ Prefix: "rr",
+ }
+
+ // try to remove, skip error
+ assert.NoError(t, freeResources(testDir))
+ err = os.Mkdir(testDir, 0755)
+ assert.NoError(t, err)
+
+ defer func() {
+ assert.NoError(t, freeResources(testDir))
+ }()
+
+ // controller := gomock.NewController(t)
+ // mockLogger := mocks.NewMockLogger(controller)
+ //
+ // mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2)
+ // mockLogger.EXPECT().Info("Resetting http plugin").Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ // mockLogger,
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ &reload.Plugin{},
+ &resetter.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ tt := time.NewTimer(time.Second * 160)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ t.Run("ReloadTestHugeNumberOfFiles", reloadHugeNumberOfFiles)
+ ttt := time.Now()
+ t.Run("ReloadRandomlyChangeFile", randomlyChangeFile)
+ if time.Since(ttt).Seconds() > 140 {
+ t.Fatal("spend too much time on reloading")
+ }
+ t.Run("ReloadHTTPLiveAfterReset", reloadHTTPLiveAfterReset)
+
wg.Wait()
}
+
+func reloadHTTPLiveAfterReset(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://localhost:22388", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "hello world", string(b))
+
+ err = r.Body.Close()
+ assert.NoError(t, err)
+}
+
+func randomlyChangeFile(t *testing.T) {
+ // we know, that directory contains 5000 files (0-4999)
+ // let's try to randomly change it
+ for i := 0; i < 100; i++ {
+ // rand sleep
+ rSleep := rand.Int63n(1000) // nolint:gosec
+ time.Sleep(time.Millisecond * time.Duration(rSleep))
+ rNum := rand.Int63n(int64(hugeNumberOfFiles)) // nolint:gosec
+ err := ioutil.WriteFile(filepath.Join(testDir, "file_"+strconv.Itoa(int(rNum))+".txt"), []byte("Hello, Gophers!"), 0755) // nolint:gosec
+ assert.NoError(t, err)
+ }
+}
+
+func reloadHugeNumberOfFiles(t *testing.T) {
+ for i := uint(0); i < hugeNumberOfFiles; i++ {
+ assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt"))
+ }
+}
+
+func freeResources(path string) error {
+ return os.RemoveAll(path)
+}
+
+func makeFile(filename string) error {
+ return ioutil.WriteFile(filepath.Join(testDir, filename), []byte{}, 0755) //nolint:gosec
+}
+
+func copyDir(src string, dst string) error {
+ src = filepath.Clean(src)
+ dst = filepath.Clean(dst)
+
+ si, err := os.Stat(src)
+ if err != nil {
+ return err
+ }
+ if !si.IsDir() {
+ return errors.E(errors.Str("source is not a directory"))
+ }
+
+ _, err = os.Stat(dst)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+ if err == nil {
+ return errors.E(errors.Str("destination already exists"))
+ }
+
+ err = os.MkdirAll(dst, si.Mode())
+ if err != nil {
+ return err
+ }
+
+ entries, err := ioutil.ReadDir(src)
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range entries {
+ srcPath := filepath.Join(src, entry.Name())
+ dstPath := filepath.Join(dst, entry.Name())
+
+ if entry.IsDir() {
+ err = copyDir(srcPath, dstPath)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Skip symlinks.
+ if entry.Mode()&os.ModeSymlink != 0 {
+ continue
+ }
+
+ err = copyFile(srcPath, dstPath)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func copyFile(src, dst string) error {
+ in, err := os.Open(src)
+ if err != nil {
+ return errors.E(err)
+ }
+ defer func() {
+ _ = in.Close()
+ }()
+
+ out, err := os.Create(dst)
+ if err != nil {
+ return errors.E(err)
+ }
+ defer func() {
+ _ = out.Close()
+ }()
+
+ _, err = io.Copy(out, in)
+ if err != nil {
+ return errors.E(err)
+ }
+
+ err = out.Sync()
+ if err != nil {
+ return errors.E(err)
+ }
+
+ si, err := os.Stat(src)
+ if err != nil {
+ return errors.E(err)
+ }
+ err = os.Chmod(dst, si.Mode())
+ if err != nil {
+ return errors.E(err)
+ }
+ return nil
+}
diff --git a/plugins/reload/tests/watcher_test.go b/plugins/reload/tests/watcher_test.go
deleted file mode 100644
index f1e0d737..00000000
--- a/plugins/reload/tests/watcher_test.go
+++ /dev/null
@@ -1,698 +0,0 @@
-package tests
-
-import (
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "path/filepath"
- "runtime"
- "strings"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- logPlugin "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/reload"
- "go.uber.org/zap"
-)
-
-var testServiceName = "test"
-
-// scenario
-// Create walker instance, init with default config, check that Watcher found all files from config
-func Test_Correct_Watcher_Init(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- defer func() {
- err = freeResources(tempDir)
- if err != nil {
- t.Fatal(err)
- }
- }()
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(tempDir, "file.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: false,
- Directories: []string{tempDir},
- FilterHooks: nil,
- Files: make(map[string]os.FileInfo),
- Ignored: nil,
- FilePatterns: nil,
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- if len(w.GetAllFiles(testServiceName)) != 2 {
- t.Fatal("incorrect directories len")
- }
-}
-
-// scenario
-// create 3 files, create walker instance
-// Start poll events
-// change file and see, if event had come to handler
-func Test_Get_FileEvent(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- c := make(chan struct{})
- defer func(name string) {
- err = freeResources(name)
- if err != nil {
- c <- struct{}{}
- t.Fatal(err)
- }
- c <- struct{}{}
- }(tempDir)
-
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(tempDir, "file1.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: false,
- Directories: []string{tempDir},
- FilterHooks: nil,
- Files: make(map[string]os.FileInfo),
- Ignored: nil,
- FilePatterns: []string{"aaa", "txt"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- // should be 3 files and directory
- if len(w.GetAllFiles(testServiceName)) != 4 {
- t.Fatal("incorrect directories len")
- }
-
- go limitTime(time.Second*10, t.Name(), c)
-
- go func() {
- go func() {
- time.Sleep(time.Second)
- err2 := ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"), //nolint:gosec
- []byte{1, 1, 1}, 0755)
- if err2 != nil {
- panic(err2)
- }
- runtime.Goexit()
- }()
-
- go func() {
- for e := range w.Event {
- if e.Path != "file2.txt" {
- panic("didn't handle event when write file2")
- }
- w.Stop()
- }
- }()
- }()
-
- err = w.StartPolling(time.Second)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-// scenario
-// create 3 files with different extensions, create walker instance
-// Start poll events
-// change file with txt extension, and see, if event had not come to handler because it was filtered
-func Test_FileExtensionFilter(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- c := make(chan struct{})
- defer func(name string) {
- err = freeResources(name)
- if err != nil {
- c <- struct{}{}
- t.Fatal(err)
- }
- c <- struct{}{}
- }(tempDir)
-
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: false,
- Directories: []string{tempDir},
- FilterHooks: func(filename string, patterns []string) error {
- for i := 0; i < len(patterns); i++ {
- if strings.Contains(filename, patterns[i]) {
- return nil
- }
- }
- return errors.E(errors.Skip)
- },
- Files: make(map[string]os.FileInfo),
- Ignored: nil,
- FilePatterns: []string{"aaa", "bbb"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- dirLen := len(w.GetAllFiles(testServiceName))
- // should be 2 files (one filtered) and directory
- if dirLen != 3 {
- t.Fatalf("incorrect directories len, len is: %d", dirLen)
- }
-
- go limitTime(time.Second*5, t.Name(), c)
-
- go func() {
- go func() {
- err2 := ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"), //nolint:gosec
- []byte{1, 1, 1}, 0755)
- if err2 != nil {
- panic(err2)
- }
-
- runtime.Goexit()
- }()
-
- go func() {
- for e := range w.Event {
- fmt.Println(e.Info.Name())
- panic("handled event from filtered file")
- }
- }()
- w.Stop()
- runtime.Goexit()
- }()
-
- err = w.StartPolling(time.Second)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-// nested
-// scenario
-// create dir and nested dir
-// make files with aaa, bbb and txt extensions, filter txt
-// change not filtered file, handle event
-func Test_Recursive_Support(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- defer func() {
- err = freeResources(tempDir)
- if err != nil {
- t.Fatal(err)
- }
- }()
-
- nestedDir, err := ioutil.TempDir(tempDir, "nested")
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: true,
- Directories: []string{tempDir},
- FilterHooks: func(filename string, patterns []string) error {
- for i := 0; i < len(patterns); i++ {
- if strings.Contains(filename, patterns[i]) {
- return nil
- }
- }
- return errors.E(errors.Skip)
- },
- Files: make(map[string]os.FileInfo),
- Ignored: nil,
- FilePatterns: []string{"aaa", "bbb"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- dirLen := len(w.GetAllFiles(testServiceName))
- // should be 3 files (2 from root dir, and 1 from nested), filtered txt
- if dirLen != 3 {
- t.Fatalf("incorrect directories len, len is: %d", dirLen)
- }
-
- go func() {
- // time sleep is used here because StartPolling is blocking operation
- time.Sleep(time.Second * 5)
- // change file in nested directory
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec
- []byte{1, 1, 1}, 0755)
- if err != nil {
- panic(err)
- }
- go func() {
- for e := range w.Event {
- if e.Info.Name() != "file4.aaa" {
- panic("wrong handled event from watcher in nested dir")
- }
- w.Stop()
- }
- }()
- }()
-
- err = w.StartPolling(time.Second)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Wrong_Dir(t *testing.T) {
- // no such file or directory
- wrongDir := "askdjfhaksdlfksdf"
-
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: true,
- Directories: []string{wrongDir},
- FilterHooks: func(filename string, patterns []string) error {
- for i := 0; i < len(patterns); i++ {
- if strings.Contains(filename, patterns[i]) {
- return nil
- }
- }
- return errors.E(errors.Skip)
- },
- Files: make(map[string]os.FileInfo),
- Ignored: nil,
- FilePatterns: []string{"aaa", "bbb"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- _, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err == nil {
- t.Fatal(err)
- }
-}
-
-func Test_Filter_Directory(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- c := make(chan struct{})
- defer func(name string) {
- err = freeResources(name)
- if err != nil {
- c <- struct{}{}
- t.Fatal(err)
- }
- c <- struct{}{}
- }(tempDir)
-
- go limitTime(time.Second*10, t.Name(), c)
-
- nestedDir, err := ioutil.TempDir(tempDir, "nested")
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- ignored, err := reload.ConvertIgnored([]string{nestedDir})
- if err != nil {
- t.Fatal(err)
- }
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: true,
- Directories: []string{tempDir},
- FilterHooks: func(filename string, patterns []string) error {
- for i := 0; i < len(patterns); i++ {
- if strings.Contains(filename, patterns[i]) {
- return nil
- }
- }
- return errors.E(errors.Skip)
- },
- Files: make(map[string]os.FileInfo),
- Ignored: ignored,
- FilePatterns: []string{"aaa", "bbb", "txt"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- dirLen := len(w.GetAllFiles(testServiceName))
- // should be 2 files (2 from root dir), filtered other
- if dirLen != 2 {
- t.Fatalf("incorrect directories len, len is: %d", dirLen)
- }
-
- go func() {
- go func() {
- err2 := ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec
- []byte{1, 1, 1}, 0755)
- if err2 != nil {
- panic(err2)
- }
- }()
-
- go func() {
- for e := range w.Event {
- fmt.Println("file: " + e.Info.Name())
- panic("handled event from watcher in nested dir")
- }
- }()
-
- // time sleep is used here because StartPolling is blocking operation
- time.Sleep(time.Second * 5)
- w.Stop()
- }()
-
- err = w.StartPolling(time.Second)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-// copy files from nested dir to not ignored
-// should fire an event
-func Test_Copy_Directory(t *testing.T) {
- tempDir, err := ioutil.TempDir(".", "")
- c := make(chan struct{})
- defer func() {
- err = freeResources(tempDir)
- if err != nil {
- c <- struct{}{}
- t.Fatal(err)
- }
- c <- struct{}{}
- }()
-
- nestedDir, err := ioutil.TempDir(tempDir, "nested")
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
- err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"), //nolint:gosec
- []byte{}, 0755)
- if err != nil {
- t.Fatal(err)
- }
-
- ignored, err := reload.ConvertIgnored([]string{nestedDir})
- if err != nil {
- t.Fatal(err)
- }
-
- wc := reload.WatcherConfig{
- ServiceName: testServiceName,
- Recursive: true,
- Directories: []string{tempDir},
- FilterHooks: func(filename string, patterns []string) error {
- for i := 0; i < len(patterns); i++ {
- if strings.Contains(filename, patterns[i]) {
- return nil
- }
- }
- return errors.E(errors.Skip)
- },
- Files: make(map[string]os.FileInfo),
- Ignored: ignored,
- FilePatterns: []string{"aaa", "bbb", "txt"},
- }
-
- logger, _ := zap.NewDevelopment()
- lg := logPlugin.NewZapAdapter(logger)
-
- w, err := reload.NewWatcher([]reload.WatcherConfig{wc}, lg)
- if err != nil {
- t.Fatal(err)
- }
-
- dirLen := len(w.GetAllFiles(testServiceName))
- // should be 2 files (2 from root dir), filtered other
- if dirLen != 2 {
- t.Fatalf("incorrect directories len, len is: %d", dirLen)
- }
-
- go limitTime(time.Second*10, t.Name(), c)
-
- go func() {
- go func() {
- err2 := copyDir(nestedDir, filepath.Join(tempDir, "copyTo"))
- if err2 != nil {
- panic(err2)
- }
-
- // exit from current goroutine
- runtime.Goexit()
- }()
-
- go func() {
- for range w.Event {
- // here should be event, otherwise we won't stop
- w.Stop()
- }
- }()
- }()
-
- err = w.StartPolling(time.Second)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func limitTime(d time.Duration, name string, free chan struct{}) {
- go func() {
- ticket := time.NewTicker(d)
- for {
- select {
- case <-ticket.C:
- ticket.Stop()
- panic("timeout exceed, test: " + name)
- case <-free:
- ticket.Stop()
- return
- }
- }
- }()
-}
-
-func copyFile(src, dst string) error {
- in, err := os.Open(src)
- if err != nil {
- return err
- }
- defer func() {
- _ = in.Close()
- }()
-
- out, err := os.Create(dst)
- if err != nil {
- return err
- }
- defer func() {
- if e := out.Close(); e != nil {
- err = e
- }
- }()
-
- _, err = io.Copy(out, in)
- if err != nil {
- return err
- }
-
- err = out.Sync()
- if err != nil {
- return err
- }
-
- si, err := os.Stat(src)
- if err != nil {
- return err
- }
- err = os.Chmod(dst, si.Mode())
- if err != nil {
- return err
- }
- return nil
-}
-
-func copyDir(src string, dst string) error {
- src = filepath.Clean(src)
- dst = filepath.Clean(dst)
-
- si, err := os.Stat(src)
- if err != nil {
- return err
- }
- if !si.IsDir() {
- return fmt.Errorf("source is not a directory")
- }
-
- _, err = os.Stat(dst)
- if err != nil && !os.IsNotExist(err) {
- return err
- }
- if err == nil {
- return fmt.Errorf("destination already exists")
- }
-
- err = os.MkdirAll(dst, si.Mode())
- if err != nil {
- return err
- }
-
- entries, err := ioutil.ReadDir(src)
- if err != nil {
- return err
- }
-
- for _, entry := range entries {
- srcPath := filepath.Join(src, entry.Name())
- dstPath := filepath.Join(dst, entry.Name())
-
- if entry.IsDir() {
- err = copyDir(srcPath, dstPath)
- if err != nil {
- return err
- }
- } else {
- // Skip symlinks.
- if entry.Mode()&os.ModeSymlink != 0 {
- continue
- }
-
- err = copyFile(srcPath, dstPath)
- if err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-func freeResources(path string) error {
- return os.RemoveAll(path)
-}
diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go
index 1829839b..cf1b840c 100644
--- a/plugins/reload/watcher.go
+++ b/plugins/reload/watcher.go
@@ -227,11 +227,11 @@ func (w *Watcher) waitEvent(d time.Duration) error {
// this is not very effective way
// because we have to wait on Lock
// better is to listen files in parallel, but, since that would be used in debug... TODO
- for serviceName, config := range w.watcherConfigs {
+ for serviceName := range w.watcherConfigs {
go func(sn string, c WatcherConfig) {
fileList, _ := w.retrieveFileList(sn, c)
w.pollEvents(c.ServiceName, fileList)
- }(serviceName, config)
+ }(serviceName, w.watcherConfigs[serviceName])
}
}
}
@@ -244,9 +244,9 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma
fileList := make(map[string]os.FileInfo)
if config.Recursive {
// walk through directories recursively
- for _, dir := range config.Directories {
+ for i := 0; i < len(config.Directories); i++ {
// full path is workdir/relative_path
- fullPath, err := filepath.Abs(dir)
+ fullPath, err := filepath.Abs(config.Directories[i])
if err != nil {
return nil, err
}
@@ -255,16 +255,16 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma
return nil, err
}
- for k, v := range list {
- fileList[k] = v
+ for k := range list {
+ fileList[k] = list[k]
}
}
return fileList, nil
}
- for _, dir := range config.Directories {
+ for i := 0; i < len(config.Directories); i++ {
// full path is workdir/relative_path
- fullPath, err := filepath.Abs(dir)
+ fullPath, err := filepath.Abs(config.Directories[i])
if err != nil {
return nil, err
}
@@ -323,87 +323,90 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) {
removes := make(map[string]os.FileInfo)
// Check for removed files.
- for pth, info := range w.watcherConfigs[serviceName].Files {
+ for pth := range w.watcherConfigs[serviceName].Files {
if _, found := files[pth]; !found {
- removes[pth] = info
- w.log.Debug("file was removed", "path", pth, "name", info.Name(), "size", info.Size())
+ removes[pth] = w.watcherConfigs[serviceName].Files[pth]
+ w.log.Debug("file was removed", "path", pth, "name", w.watcherConfigs[serviceName].Files[pth].Name(), "size", w.watcherConfigs[serviceName].Files[pth].Size())
}
}
// Check for created files, writes and chmods.
- for pth, info := range files {
- if info.IsDir() {
+ for pth := range files {
+ if files[pth].IsDir() {
continue
}
oldInfo, found := w.watcherConfigs[serviceName].Files[pth]
if !found {
// A file was created.
- creates[pth] = info
- w.log.Debug("file was created", "path", pth, "name", info.Name(), "size", info.Size())
+ creates[pth] = files[pth]
+ w.log.Debug("file was created", "path", pth, "name", files[pth].Name(), "size", files[pth].Size())
continue
}
- if oldInfo.ModTime() != info.ModTime() {
- w.watcherConfigs[serviceName].Files[pth] = info
- w.log.Debug("file was updated", "path", pth, "name", info.Name(), "size", info.Size())
- w.Event <- Event{
- Path: pth,
- Info: info,
- service: serviceName,
- }
- }
- if oldInfo.Mode() != info.Mode() {
- w.watcherConfigs[serviceName].Files[pth] = info
- w.log.Debug("file was updated", "path", pth, "name", info.Name(), "size", info.Size())
+
+ if oldInfo.ModTime() != files[pth].ModTime() || oldInfo.Mode() != files[pth].Mode() {
+ w.watcherConfigs[serviceName].Files[pth] = files[pth]
+ w.log.Debug("file was updated", "path", pth, "name", files[pth].Name(), "size", files[pth].Size())
w.Event <- Event{
Path: pth,
- Info: info,
+ Info: files[pth],
service: serviceName,
}
}
}
// Check for renames and moves.
- for path1, info1 := range removes {
- for path2, info2 := range creates {
- if sameFile(info1, info2) {
+ for path1 := range removes {
+ for path2 := range creates {
+ if sameFile(removes[path1], creates[path2]) {
e := Event{
Path: path2,
- Info: info2,
+ Info: creates[path2],
service: serviceName,
}
// remove initial path
delete(w.watcherConfigs[serviceName].Files, path1)
// update with new
- w.watcherConfigs[serviceName].Files[path2] = info2
+ w.watcherConfigs[serviceName].Files[path2] = creates[path2]
- w.log.Debug("file was renamed/moved", "old path", path1, "new path", path2, "name", info2.Name(), "size", info2.Size())
+ w.log.Debug("file was renamed/moved", "old path", path1, "new path", path2, "name", creates[path2].Name(), "size", creates[path2].Size())
w.Event <- e
}
}
}
- // Send all the remaining create and remove events.
- for pth, info := range creates {
- w.watcherConfigs[serviceName].Files[pth] = info
- w.log.Debug("file was created", "path", pth, "name", info.Name(), "size", info.Size())
+ wg := &sync.WaitGroup{}
+ wg.Add(2)
+ go func() {
+ defer wg.Done()
+ // Send all the remaining create and remove events.
+ for pth := range creates {
+ w.watcherConfigs[serviceName].Files[pth] = creates[pth]
+ w.log.Debug("file was created", "path", pth, "name", creates[pth].Name(), "size", creates[pth].Size())
- w.Event <- Event{
- Path: pth,
- Info: info,
- service: serviceName,
+ w.Event <- Event{
+ Path: pth,
+ Info: creates[pth],
+ service: serviceName,
+ }
}
- }
- for pth, info := range removes {
- delete(w.watcherConfigs[serviceName].Files, pth)
- w.log.Debug("file was removed", "path", pth, "name", info.Name(), "size", info.Size())
-
- w.Event <- Event{
- Path: pth,
- Info: info,
- service: serviceName,
+ }()
+
+ go func() {
+ defer wg.Done()
+ for pth := range removes {
+ delete(w.watcherConfigs[serviceName].Files, pth)
+ w.log.Debug("file was removed", "path", pth, "name", removes[pth].Name(), "size", removes[pth].Size())
+
+ w.Event <- Event{
+ Path: pth,
+ Info: removes[pth],
+ service: serviceName,
+ }
}
- }
+ }()
+
+ wg.Wait()
}
func (w *Watcher) Stop() {