author     Anton Titov <[email protected]>    2020-02-23 16:46:15 +0300
committer  GitHub <[email protected]>    2020-02-23 16:46:15 +0300
commit     954fa52704af5e9d7745de759743e16b3b7b563c
tree       f80a4d4cf8d7725491aa8aa3939fdbd4d17c13e3
parent     99b8de5cef068446b58899d76fa02cd286837c49
parent     74f35de85648d36b328d01656c3857c11217d3b0
Merge pull request #253 from spiral/file_watcher_module
File watcher module
-rw-r--r--   .rr.yaml                              4
-rw-r--r--   Makefile                              1
-rwxr-xr-x   build.sh                              2
-rw-r--r--   cmd/rr/main.go                        2
-rw-r--r--   controller.go                         6
-rw-r--r--   go.mod                                2
-rw-r--r--   server.go                             6
-rw-r--r--   service/gzip/service_test.go         71
-rw-r--r--   service/limit/service.go              8
-rw-r--r--   service/reload/config.go             66
-rw-r--r--   service/reload/config_test.go        55
-rw-r--r--   service/reload/samefile.go            9
-rw-r--r--   service/reload/samefile_windows.go   12
-rw-r--r--   service/reload/service.go            162
-rw-r--r--   service/reload/service_test.go        1
-rw-r--r--   service/reload/watcher.go            409
-rw-r--r--   service/reload/watcher_test.go       437
17 files changed, 1172 insertions, 81 deletions
diff --git a/.rr.yaml b/.rr.yaml
index ed9336af..fc05112c 100644
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -160,7 +160,7 @@ static:
health:
# http host to serve health requests.
address: localhost:2113
-
+
# reload can reset rr servers when files change
reload:
# refresh interval (default 1s)
@@ -176,4 +176,4 @@ reload:
dirs: [""]
# include sub directories
- recursive: true
+ recursive: true \ No newline at end of file
diff --git a/Makefile b/Makefile
index bcededc9..d7e85f90 100644
--- a/Makefile
+++ b/Makefile
@@ -22,6 +22,7 @@ test:
go test -v -race -cover ./service/metrics
go test -v -race -cover ./service/health
go test -v -race -cover ./service/gzip
+ go test -v -race -cover ./service/reload
lint:
go fmt ./...
golint ./... \ No newline at end of file
diff --git a/build.sh b/build.sh
index ba0a1716..ff704c9a 100755
--- a/build.sh
+++ b/build.sh
@@ -12,7 +12,7 @@ LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.BuildTime=$(date +%
LDFLAGS="$LDFLAGS -s"
build(){
- echo Packaging $1 Build
+ echo Packaging "$1" Build
bdir=roadrunner-${RR_VERSION}-$2-$3
rm -rf builds/"$bdir" && mkdir -p builds/"$bdir"
GOOS=$2 GOARCH=$3 ./build.sh
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index ef393426..54a1f060 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -33,6 +33,7 @@ import (
"github.com/spiral/roadrunner/service/http"
"github.com/spiral/roadrunner/service/limit"
"github.com/spiral/roadrunner/service/metrics"
+ "github.com/spiral/roadrunner/service/reload"
"github.com/spiral/roadrunner/service/rpc"
"github.com/spiral/roadrunner/service/static"
@@ -51,6 +52,7 @@ func main() {
rr.Container.Register(limit.ID, &limit.Service{})
rr.Container.Register(health.ID, &health.Service{})
rr.Container.Register(gzip.ID, &gzip.Service{})
+ rr.Container.Register(reload.ID, &reload.Service{})
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/controller.go b/controller.go
index bda7ad6b..020ea4dd 100644
--- a/controller.go
+++ b/controller.go
@@ -8,3 +8,9 @@ type Controller interface {
// Detach pool watching.
Detach()
}
+
+// Attacher defines the ability to attach an rr controller.
+type Attacher interface {
+ // Attach attaches the controller to the service.
+ Attach(c Controller)
+} \ No newline at end of file
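The new Attacher interface generalizes what the limit service previously declared privately (see the service/limit hunk below). A minimal, hypothetical sketch of a service satisfying it, assuming only the roadrunner interfaces shown in this diff; names other than Attacher and Controller are illustrative:

// Hypothetical service accepting an rr controller via roadrunner.Attacher.
package example

import "github.com/spiral/roadrunner"

type myService struct {
	controller roadrunner.Controller
}

// Attach satisfies roadrunner.Attacher; the limit service calls it during
// Init for every configured service it can type-assert to Attacher.
func (s *myService) Attach(c roadrunner.Controller) {
	s.controller = c
}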
diff --git a/go.mod b/go.mod
index 57ffd209..8513085e 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,8 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/olekukonko/tablewriter v0.0.4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.4.1
diff --git a/server.go b/server.go
index 8f8e9382..406bc0a0 100644
--- a/server.go
+++ b/server.go
@@ -23,6 +23,12 @@ const (
EventPoolDestruct
)
+// Controllable defines the ability to expose the underlying rr server.
+type Controllable interface {
+ // Server returns the underlying RR server.
+ Server() *Server
+}
+
// Server manages pool creation and swapping.
type Server struct {
// configures server, pool, cmd creation and factory.
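The reload service introduced below type-asserts to this Controllable interface when wiring itself to other services, then calls Server().Reset() on file changes. A minimal, hypothetical sketch of a service exposing its server this way (the service name and fields are illustrative):

// Hypothetical service exposing its server through roadrunner.Controllable.
package example

import "github.com/spiral/roadrunner"

type httpLikeService struct {
	srv *roadrunner.Server
}

// Server satisfies roadrunner.Controllable, letting the reload service
// reach Reset() on the underlying server.
func (s *httpLikeService) Server() *roadrunner.Server {
	return s.srv
}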
diff --git a/service/gzip/service_test.go b/service/gzip/service_test.go
index 9801860f..858dbe56 100644
--- a/service/gzip/service_test.go
+++ b/service/gzip/service_test.go
@@ -31,24 +31,6 @@ func (cfg *testCfg) Unmarshal(out interface{}) error {
return json.Unmarshal([]byte(cfg.target), out)
}
-//func get(url string) (string, *http.Response, error) {
-// r, err := http.Get(url)
-// if err != nil {
-// return "", nil, err
-// }
-//
-// b, err := ioutil.ReadAll(r.Body)
-// if err != nil {
-// return "", nil, err
-// }
-//
-// err = r.Body.Close()
-// if err != nil {
-// return "", nil, err
-// }
-//
-// return string(b), r, err
-//}
func Test_Disabled(t *testing.T) {
logger, _ := test.NewNullLogger()
@@ -65,56 +47,3 @@ func Test_Disabled(t *testing.T) {
assert.NotNil(t, s)
assert.Equal(t, service.StatusInactive, st)
}
-
-// func Test_Files(t *testing.T) {
-// logger, _ := test.NewNullLogger()
-// logger.SetLevel(logrus.DebugLevel)
-
-// c := service.NewContainer(logger)
-// c.Register(rrhttp.ID, &rrhttp.Service{})
-// c.Register(ID, &Service{})
-
-// assert.NoError(t, c.Init(&testCfg{
-// gzip: `{"enable":true}`,
-// static: `{"enable":true, "dir":"../../tests", "forbid":[]}`,
-// httpCfg: `{
-// "enable": true,
-// "address": ":6029",
-// "maxRequestSize": 1024,
-// "uploads": {
-// "dir": ` + tmpDir() + `,
-// "forbid": []
-// },
-// "workers":{
-// "command": "php ../../tests/http/client.php pid pipes",
-// "relay": "pipes",
-// "pool": {
-// "numWorkers": 1,
-// "allocateTimeout": 10000000,
-// "destroyTimeout": 10000000
-// }
-// }
-// }`}))
-
-// go func() {
-// err := c.Serve()
-// if err != nil {
-// t.Errorf("serve error: %v", err)
-// }
-// }()
-// time.Sleep(time.Millisecond * 1000)
-// defer c.Stop()
-
-// b, _, _ := get("http://localhost:6029/sample.txt")
-// assert.Equal(t, "sample", b)
-// //header should not contain content-encoding:gzip because content-length < gziphandler.DefaultMinSize
-// // b, _, _ := get("http://localhost:6029/gzip-large-file.txt")
-// //header should contain content-encoding:gzip because content-length > gziphandler.DefaultMinSize
-// }
-
-//func tmpDir() string {
-// p := os.TempDir()
-// r, _ := json.Marshal(p)
-//
-// return string(r)
-//}
diff --git a/service/limit/service.go b/service/limit/service.go
index 6af571e2..c0b4139c 100644
--- a/service/limit/service.go
+++ b/service/limit/service.go
@@ -8,12 +8,6 @@ import (
// ID defines controller service name.
const ID = "limit"
-// controllable defines the ability to attach rr controller.
-type controllable interface {
- // Attach attaches controller to the service.
- Attach(c roadrunner.Controller)
-}
-
// Service to control the state of rr service inside other services.
type Service struct {
lsns []func(event int, ctx interface{})
@@ -24,7 +18,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
// mount Services to designated services
for id, watcher := range cfg.Controllers(s.throw) {
svc, _ := c.Get(id)
- if ctrl, ok := svc.(controllable); ok {
+ if ctrl, ok := svc.(roadrunner.Attacher); ok {
ctrl.Attach(watcher)
}
}
diff --git a/service/reload/config.go b/service/reload/config.go
new file mode 100644
index 00000000..f33b5081
--- /dev/null
+++ b/service/reload/config.go
@@ -0,0 +1,66 @@
+package reload
+
+import (
+ "errors"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Config is a Reload configuration point.
+type Config struct {
+ // Interval is a global refresh interval
+ Interval time.Duration
+
+ // Patterns is a set of global file patterns to watch. They are applied to every directory in the project
+ Patterns []string
+
+ // Services is a set of services which will be reloaded on FS changes
+ Services map[string]ServiceConfig
+}
+
+type ServiceConfig struct {
+ // Enabled indicates that the service must be watched; not required when any other option is specified
+ Enabled bool
+
+ // Recursive enables watching nested directories under the root folder
+ Recursive bool
+
+ // Patterns is a per-service set of file patterns to watch
+ Patterns []string
+
+ // Dirs is a per-service set of directories, combined with Patterns
+ Dirs []string
+
+ // Ignore is a set of paths which will not be watched
+ Ignore []string
+
+ // service is a link to the service to restart
+ service *roadrunner.Controllable
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// InitDefaults sets missing values to their default values.
+func (c *Config) InitDefaults() error {
+ c.Interval = time.Second
+ c.Patterns = []string{".php"}
+
+ return nil
+}
+
+// Valid validates the configuration.
+func (c *Config) Valid() error {
+ if c.Interval < time.Second {
+ return errors.New("too short interval")
+ }
+
+ return nil
+}
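As a hedged, standalone illustration of the shape this configuration takes when built in code rather than hydrated from .rr.yaml (the service name, directories and interval below are made up; in roadrunner the service container populates Config from the config file):

// Hypothetical manual construction of the new reload Config.
package example

import (
	"log"
	"time"

	"github.com/spiral/roadrunner/service/reload"
)

func buildReloadConfig() *reload.Config {
	cfg := &reload.Config{
		Interval: 2 * time.Second,  // Valid() rejects anything below one second
		Patterns: []string{".php"}, // global patterns, applied to every service
		Services: map[string]reload.ServiceConfig{
			"http": {
				Recursive: true,
				Dirs:      []string{"."},
				Ignore:    []string{"vendor"},
			},
		},
	}
	if err := cfg.Valid(); err != nil {
		log.Fatal(err)
	}
	return cfg
}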
diff --git a/service/reload/config_test.go b/service/reload/config_test.go
new file mode 100644
index 00000000..c9c05a1e
--- /dev/null
+++ b/service/reload/config_test.go
@@ -0,0 +1,55 @@
+package reload
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func Test_Config_Valid(t *testing.T) {
+ services := make(map[string]ServiceConfig)
+ services["test"] = ServiceConfig{
+ Recursive: false,
+ Patterns: nil,
+ Dirs: nil,
+ Ignore: nil,
+ service: nil,
+ }
+
+ cfg := &Config{
+ Interval: time.Second,
+ Patterns: nil,
+ Services: services,
+ }
+ assert.NoError(t, cfg.Valid())
+}
+
+func Test_Fake_ServiceConfig(t *testing.T) {
+ services := make(map[string]ServiceConfig)
+ cfg := &Config{
+ Interval: time.Second,
+ Patterns: nil,
+ Services: services,
+ }
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Interval(t *testing.T) {
+ services := make(map[string]ServiceConfig)
+ cfg := &Config{
+ Interval: time.Millisecond,
+ Patterns: nil,
+ Services: services,
+ }
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_NoServiceConfig(t *testing.T) {
+ services := make(map[string]ServiceConfig)
+ cfg := &Config{
+ Interval: time.Millisecond,
+ Patterns: nil,
+ Services: services,
+ }
+ assert.Error(t, cfg.Valid())
+}
diff --git a/service/reload/samefile.go b/service/reload/samefile.go
new file mode 100644
index 00000000..80df0431
--- /dev/null
+++ b/service/reload/samefile.go
@@ -0,0 +1,9 @@
+// +build !windows
+
+package reload
+
+import "os"
+
+func sameFile(fi1, fi2 os.FileInfo) bool {
+ return os.SameFile(fi1, fi2)
+}
diff --git a/service/reload/samefile_windows.go b/service/reload/samefile_windows.go
new file mode 100644
index 00000000..5f70d327
--- /dev/null
+++ b/service/reload/samefile_windows.go
@@ -0,0 +1,12 @@
+// +build windows
+
+package reload
+
+import "os"
+
+func sameFile(fi1, fi2 os.FileInfo) bool {
+ return fi1.ModTime() == fi2.ModTime() &&
+ fi1.Size() == fi2.Size() &&
+ fi1.Mode() == fi2.Mode() &&
+ fi1.IsDir() == fi2.IsDir()
+}
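These two build-tagged helpers give the watcher a portable file-identity check: on Unix it is just os.SameFile, while on Windows (where polling can return distinct FileInfo values for the same file) it falls back to comparing metadata. A small, hypothetical illustration of what pollEvents() in watcher.go below uses it for, pairing a removed path with a created path to report a rename:

// Hypothetical sketch of rename detection on a non-Windows platform,
// where sameFile(a, b) is equivalent to os.SameFile(a, b).
package example

import "os"

// looksLikeRename reports whether a file that disappeared from one path
// and appeared at another is actually the same file that was moved.
func looksLikeRename(removed, created os.FileInfo) bool {
	return os.SameFile(removed, created)
}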
diff --git a/service/reload/service.go b/service/reload/service.go
new file mode 100644
index 00000000..9c615e0b
--- /dev/null
+++ b/service/reload/service.go
@@ -0,0 +1,162 @@
+package reload
+
+import (
+ "errors"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "os"
+ "strings"
+ "time"
+)
+
+// ID contains default service name.
+const ID = "reload"
+
+type Service struct {
+ cfg *Config
+ log *logrus.Logger
+ watcher *Watcher
+ stopc chan struct{}
+}
+
+// Init initializes the reload service
+func (s *Service) Init(cfg *Config, log *logrus.Logger, c service.Container) (bool, error) {
+ if cfg == nil || len(cfg.Services) == 0 {
+ return false, nil
+ }
+
+ s.cfg = cfg
+ s.log = log
+ s.stopc = make(chan struct{})
+
+ var configs []WatcherConfig
+
+ // link watched services to their reload configs
+ for serviceName := range cfg.Services {
+ svc, _ := c.Get(serviceName)
+ if ctrl, ok := svc.(roadrunner.Controllable); ok {
+ tmp := cfg.Services[serviceName]
+ tmp.service = &ctrl
+ cfg.Services[serviceName] = tmp
+ }
+ }
+
+ for serviceName, config := range s.cfg.Services {
+ if cfg.Services[serviceName].service == nil {
+ continue
+ }
+ ignored, err := ConvertIgnored(config.Ignore)
+ if err != nil {
+ return false, err
+ }
+ configs = append(configs, WatcherConfig{
+ serviceName: serviceName,
+ recursive: config.Recursive,
+ directories: config.Dirs,
+ filterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return ErrorSkip
+ },
+ files: make(map[string]os.FileInfo),
+ ignored: ignored,
+ filePatterns: append(config.Patterns, cfg.Patterns...),
+ })
+ }
+
+ var err error
+ s.watcher, err = NewWatcher(configs)
+ if err != nil {
+ return false, err
+ }
+
+ return true, nil
+}
+
+func (s *Service) Serve() error {
+ if s.cfg.Interval < time.Second {
+ return errors.New("reload interval is too fast")
+ }
+
+ // make a map with unique services:
+ // even if we get 100 events from the http service,
+ // the map will hold only one key with its config
+ treshholdc := make(chan struct {
+ serviceConfig ServiceConfig
+ service string
+ }, 100)
+
+ // use the same interval
+ ticker := time.NewTicker(s.cfg.Interval)
+
+ // drain the channel in case of leftover messages
+ defer func() {
+ go func() {
+ for range treshholdc {
+
+ }
+ }()
+ }()
+
+ go func() {
+ for e := range s.watcher.Event {
+ treshholdc <- struct {
+ serviceConfig ServiceConfig
+ service string
+ }{serviceConfig: s.cfg.Services[e.service], service: e.service}
+ }
+ }()
+
+ // map with configs by services
+ updated := make(map[string]ServiceConfig, 100)
+
+ go func() {
+ for {
+ select {
+ case config := <-treshholdc:
+ // replace the previous value for this service with the more recent one
+ updated[config.service] = config.serviceConfig
+ // stop ticker
+ ticker.Stop()
+ // restart the ticker
+ // the logic is the following:
+ // if we are getting a lot of events, we shouldn't restart the service on each of them (e.g. the user moving many files or typing very fast)
+ // instead, we reset the ticker and wait for Interval to pass
+ // if no more events arrive, the service is restarted only once
+ ticker = time.NewTicker(s.cfg.Interval)
+ case <-ticker.C:
+ if len(updated) > 0 {
+ for k, v := range updated {
+ sv := *v.service
+ err := sv.Server().Reset()
+ if err != nil {
+ s.log.Error(err)
+ }
+ s.log.Debugf("[%s] found %v file(s) changes, reloading", k, len(updated))
+ }
+ // zero map
+ updated = make(map[string]ServiceConfig, 100)
+ }
+ case <-s.stopc:
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+
+ err := s.watcher.StartPolling(s.cfg.Interval)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *Service) Stop() {
+ s.watcher.Stop()
+ s.stopc <- struct{}{}
+}
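The restart loop above is effectively a debounce: every incoming event resets the ticker, and the accumulated services are reloaded only once the configured Interval passes with no further events. A standalone, hedged sketch of the same pattern using only the standard library (function and channel names are illustrative, not part of roadrunner):

// Minimal debounce sketch mirroring the ticker-reset logic in Serve():
// many events arriving within `interval` collapse into one flush call.
package example

import "time"

func debounce(events <-chan string, interval time.Duration, flush func(map[string]struct{})) {
	pending := make(map[string]struct{})
	ticker := time.NewTicker(interval)
	defer func() { ticker.Stop() }()

	for {
		select {
		case name, ok := <-events:
			if !ok {
				return
			}
			pending[name] = struct{}{}
			// restart the countdown on every event, as Serve() does
			ticker.Stop()
			ticker = time.NewTicker(interval)
		case <-ticker.C:
			if len(pending) > 0 {
				flush(pending)
				pending = make(map[string]struct{})
			}
		}
	}
}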
diff --git a/service/reload/service_test.go b/service/reload/service_test.go
new file mode 100644
index 00000000..7cad4a5d
--- /dev/null
+++ b/service/reload/service_test.go
@@ -0,0 +1 @@
+package reload
diff --git a/service/reload/watcher.go b/service/reload/watcher.go
new file mode 100644
index 00000000..027d2d0d
--- /dev/null
+++ b/service/reload/watcher.go
@@ -0,0 +1,409 @@
+package reload
+
+import (
+ "errors"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
+var ErrorSkip = errors.New("file is skipped")
+var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true")
+
+// SimpleHook is used to filter by a simple criterion: CONTAINS
+type SimpleHook func(filename string, pattern []string) error
+
+// An Event describes an event that is received when a file or directory
+// change occurs. It includes the os.FileInfo of the changed file or
+// directory, its full path and the name of the originating service.
+type Event struct {
+ path string
+ info os.FileInfo
+
+ service string // type of service, http, grpc, etc...
+}
+
+type WatcherConfig struct {
+ // service name
+ serviceName string
+
+ // recursive, or watch only the listed directories
+ recursive bool
+
+ // directories used per-service
+ directories []string
+
+ // simple hook, just CONTAINS
+ filterHooks func(filename string, pattern []string) error
+
+ // watched files keyed by path
+ files map[string]os.FileInfo
+
+ // ignored directories; a map is used for O(1) amortized lookup
+ ignored map[string]struct{}
+
+ // file patterns to watch; everything else is skipped
+ filePatterns []string
+}
+
+type Watcher struct {
+ // main event channel
+ Event chan Event
+ close chan struct{}
+
+ //=============================
+ mu *sync.Mutex
+
+ // indicates whether the watcher has been started
+ started bool
+
+ // config for each service;
+ // values are re-assigned into the map to update their file lists
+ watcherConfigs map[string]WatcherConfig
+}
+
+// Options is used to set Watcher Options
+type Options func(*Watcher)
+
+// NewWatcher returns new instance of File Watcher
+func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) {
+ w := &Watcher{
+ Event: make(chan Event),
+ mu: &sync.Mutex{},
+
+ close: make(chan struct{}),
+
+ //workingDir: workDir,
+ watcherConfigs: make(map[string]WatcherConfig),
+ }
+
+ // add watcherConfigs by service names
+ for _, v := range configs {
+ w.watcherConfigs[v.serviceName] = v
+ }
+
+ // apply options
+ for _, option := range options {
+ option(w)
+ }
+ err := w.initFs()
+ if err != nil {
+ return nil, err
+ }
+
+ return w, nil
+}
+
+// initFs builds the initial map of watched files
+func (w *Watcher) initFs() error {
+ for srvName, config := range w.watcherConfigs {
+ fileList, err := w.retrieveFileList(srvName, config)
+ if err != nil {
+ return err
+ }
+ // workaround: in Go you can't assign to a struct field inside a map value
+ tmp := w.watcherConfigs[srvName]
+ tmp.files = fileList
+ w.watcherConfigs[srvName] = tmp
+ }
+ return nil
+}
+
+// ConvertIgnored converts a slice of ignored paths to a map of absolute paths
+func ConvertIgnored(ignored []string) (map[string]struct{}, error) {
+ if len(ignored) == 0 {
+ return nil, nil
+ }
+
+ ign := make(map[string]struct{}, len(ignored))
+ for i := 0; i < len(ignored); i++ {
+ abs, err := filepath.Abs(ignored[i])
+ if err != nil {
+ return nil, err
+ }
+ ign[abs] = struct{}{}
+ }
+
+ return ign, nil
+
+}
+
+// GetAllFiles returns all files initialized for a particular service
+func (w *Watcher) GetAllFiles(serviceName string) []os.FileInfo {
+ var ret []os.FileInfo
+
+ for _, v := range w.watcherConfigs[serviceName].files {
+ ret = append(ret, v)
+ }
+
+ return ret
+}
+
+// https://en.wikipedia.org/wiki/Inotify
+// SetMaxFileEvents sets max file notify events for Watcher
+// In case of file watch errors, this value can be increased system-wide
+// For linux: set --> fs.inotify.max_user_watches = 600000 (under /etc/<choose_name_here>.conf)
+// And apply: sudo sysctl -p --system
+//func SetMaxFileEvents(events int) Options {
+// return func(watcher *Watcher) {
+// watcher.maxFileWatchEvents = events
+// }
+//
+//}
+
+// retrieveFilesSingle returns the file list for a single (non-recursive) directory
+func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.FileInfo, error) {
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+
+ filesList := make(map[string]os.FileInfo, 10)
+ filesList[path] = stat
+
+ // if it's not a dir, return
+ if !stat.IsDir() {
+ return filesList, nil
+ }
+
+ fileInfoList, err := ioutil.ReadDir(path)
+ if err != nil {
+ return nil, err
+ }
+
+ // recursive calls are slow compared to a flat loop,
+ // so files are added using a labeled continue
+outer:
+ for i := 0; i < len(fileInfoList); i++ {
+ var pathToFile string
+ // BCE check elimination
+ // https://go101.org/article/bounds-check-elimination.html
+ if len(fileInfoList) != 0 && len(fileInfoList) >= i {
+ pathToFile = filepath.Join(pathToFile, fileInfoList[i].Name())
+ } else {
+ return nil, errors.New("file info list len")
+ }
+
+ // if file in ignored --> continue
+ if _, ignored := w.watcherConfigs[serviceName].ignored[path]; ignored {
+ continue
+ }
+
+ // if filename does not contain pattern --> ignore that file
+ if w.watcherConfigs[serviceName].filePatterns != nil && w.watcherConfigs[serviceName].filterHooks != nil {
+ err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns)
+ if err == ErrorSkip {
+ continue outer
+ }
+ }
+
+ filesList[pathToFile] = fileInfoList[i]
+ }
+
+ return filesList, nil
+}
+
+func (w *Watcher) StartPolling(duration time.Duration) error {
+ w.mu.Lock()
+ if w.started {
+ w.mu.Unlock()
+ return errors.New("already started")
+ }
+
+ w.started = true
+ w.mu.Unlock()
+
+ return w.waitEvent(duration)
+}
+
+// this is a blocking operation
+func (w *Watcher) waitEvent(d time.Duration) error {
+ ticker := time.NewTicker(d)
+ for {
+ select {
+ case <-w.close:
+ ticker.Stop()
+ // just exit
+ // no matter for the pollEvents
+ return nil
+ case <-ticker.C:
+ // this is not a very effective way,
+ // because we have to wait on the Lock;
+ // better would be to poll the services in parallel, but since this is meant for development... TODO
+ for serviceName, config := range w.watcherConfigs {
+ go func(sn string, c WatcherConfig) {
+ fileList, _ := w.retrieveFileList(sn, c)
+ w.pollEvents(c.serviceName, fileList)
+ }(serviceName, config)
+ }
+ }
+ }
+
+}
+
+// retrieveFileList gets the file list for a service
+func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ fileList := make(map[string]os.FileInfo)
+ if config.recursive {
+ // walk through directories recursively
+ for _, dir := range config.directories {
+ // full path is workdir/relative_path
+ fullPath, err := filepath.Abs(dir)
+ if err != nil {
+ return nil, err
+ }
+ list, err := w.retrieveFilesRecursive(serviceName, fullPath)
+ if err != nil {
+ return nil, err
+ }
+
+ for k, v := range list {
+ fileList[k] = v
+ }
+ }
+ return fileList, nil
+ }
+
+ for _, dir := range config.directories {
+ // full path is workdir/relative_path
+ fullPath, err := filepath.Abs(dir)
+ if err != nil {
+ return nil, err
+ }
+
+ // list is pathToFiles with files
+ list, err := w.retrieveFilesSingle(serviceName, fullPath)
+ if err != nil {
+ return nil, err
+ }
+
+ for pathToFile, file := range list {
+ fileList[pathToFile] = file
+ }
+ }
+
+ return fileList, nil
+}
+
+func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) {
+ fileList := make(map[string]os.FileInfo)
+
+ return fileList, filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+
+ // If path is ignored and it's a directory, skip the directory. If it's
+ // ignored and it's a single file, skip the file.
+ _, ignored := w.watcherConfigs[serviceName].ignored[path]
+ if ignored {
+ if info.IsDir() {
+ // if it's dir, ignore whole
+ return filepath.SkipDir
+ }
+ return nil
+ }
+
+ // if filename does not contain pattern --> ignore that file
+ err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns)
+ if err == ErrorSkip {
+ return nil
+ }
+
+ // Add the path and its info to the file list.
+ fileList[path] = info
+ return nil
+ })
+}
+
+func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // Store create and remove events, used later to detect renames.
+ creates := make(map[string]os.FileInfo)
+ removes := make(map[string]os.FileInfo)
+
+ // Check for removed files.
+ for pth, info := range w.watcherConfigs[serviceName].files {
+ if _, found := files[pth]; !found {
+ removes[pth] = info
+ }
+ }
+
+ // Check for created files, writes and chmods.
+ for pth, info := range files {
+ if info.IsDir() {
+ continue
+ }
+ oldInfo, found := w.watcherConfigs[serviceName].files[pth]
+ if !found {
+ // A file was created.
+ creates[pth] = info
+ continue
+ }
+ if oldInfo.ModTime() != info.ModTime() {
+ w.watcherConfigs[serviceName].files[pth] = info
+ w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }
+ }
+ if oldInfo.Mode() != info.Mode() {
+ w.watcherConfigs[serviceName].files[pth] = info
+ w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }
+ }
+ }
+
+ // Check for renames and moves.
+ for path1, info1 := range removes {
+ for path2, info2 := range creates {
+ if sameFile(info1, info2) {
+ e := Event{
+ path: path2,
+ info: info2,
+ service: serviceName,
+ }
+
+ // remove initial path
+ delete(w.watcherConfigs[serviceName].files, path1)
+ // update with new
+ w.watcherConfigs[serviceName].files[path2] = info2
+
+
+ w.Event <- e
+ }
+ }
+ }
+
+ // Send all the remaining create and remove events.
+ for pth, info := range creates {
+ w.watcherConfigs[serviceName].files[pth] = info
+ w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }
+ }
+ for pth, info := range removes {
+ delete(w.watcherConfigs[serviceName].files, pth)
+ w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }
+ }
+}
+
+func (w *Watcher) Stop() {
+ w.close <- struct{}{}
+}
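Taken on its own, the polling Watcher can be driven directly. A hedged usage sketch that would have to live inside service/reload itself (the WatcherConfig fields are unexported); the directory, patterns and service name are illustrative:

// Hypothetical helper inside package reload: watch one directory tree for
// .php changes and log every event until Stop() is called.
package reload

import (
	"log"
	"os"
	"strings"
	"time"
)

func watchPHPDir(dir string) error {
	wc := WatcherConfig{
		serviceName: "example",
		recursive:   true,
		directories: []string{dir},
		filterHooks: func(filename string, patterns []string) error {
			for i := 0; i < len(patterns); i++ {
				if strings.Contains(filename, patterns[i]) {
					return nil
				}
			}
			return ErrorSkip
		},
		files:        make(map[string]os.FileInfo),
		filePatterns: []string{".php"},
	}

	w, err := NewWatcher([]WatcherConfig{wc})
	if err != nil {
		return err
	}

	go func() {
		for e := range w.Event {
			log.Printf("[%s] changed: %s", e.service, e.path)
		}
	}()

	// StartPolling blocks until Stop() is called.
	return w.StartPolling(time.Second)
}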
diff --git a/service/reload/watcher_test.go b/service/reload/watcher_test.go
new file mode 100644
index 00000000..449a21df
--- /dev/null
+++ b/service/reload/watcher_test.go
@@ -0,0 +1,437 @@
+package reload
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+)
+
+var testServiceName = "test"
+
+// scenario
+// Create walker instance, init with default config, check that Watcher found all files from config
+func Test_Correct_Watcher_Init(t *testing.T) {
+ tempDir, err := ioutil.TempDir(".", "")
+ defer func() {
+ err = freeResources(tempDir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: false,
+ directories: []string{tempDir},
+ filterHooks: nil,
+ files: make(map[string]os.FileInfo),
+ ignored: nil,
+ filePatterns: nil,
+ }
+
+ w, err := NewWatcher([]WatcherConfig{wc})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if len(w.GetAllFiles(testServiceName)) != 2 {
+ t.Fatal("incorrect directories len")
+ }
+}
+
+// scenario
+// create 3 files, create walker instance
+// Start poll events
+// change a file and check that the event reaches the handler
+func Test_Get_FileEvent(t *testing.T) {
+ tempDir, err := ioutil.TempDir(".", "")
+ defer func() {
+ err = freeResources(tempDir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file1.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: false,
+ directories: []string{tempDir},
+ filterHooks: nil,
+ files: make(map[string]os.FileInfo),
+ ignored: nil,
+ filePatterns: nil,
+ }
+
+ w, err := NewWatcher([]WatcherConfig{wc})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // should be 3 files and directory
+ if len(w.GetAllFiles(testServiceName)) != 4 {
+ t.Fatal("incorrect directories len")
+ }
+
+ go func() {
+ // time.Sleep is used here because StartPolling is a blocking operation
+ time.Sleep(time.Second * 5)
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file2.txt"),
+ []byte{1, 1, 1}, 0755)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for e := range w.Event {
+ if e.path != "file2.txt" {
+ panic("didn't handle event when write file2")
+ }
+ w.Stop()
+ }
+ }()
+ }()
+
+ err = w.StartPolling(time.Second)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// scenario
+// create 3 files with different extensions, create walker instance
+// Start poll events
+// change the file with the txt extension and check that no event reaches the handler, because it was filtered
+func Test_FileExtensionFilter(t *testing.T) {
+ tempDir, err := ioutil.TempDir(".", "")
+ defer func() {
+ err = freeResources(tempDir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: false,
+ directories: []string{tempDir},
+ filterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return ErrorSkip
+ },
+ files: make(map[string]os.FileInfo),
+ ignored: nil,
+ filePatterns: []string{"aaa", "bbb"},
+ }
+
+ w, err := NewWatcher([]WatcherConfig{wc})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ dirLen := len(w.GetAllFiles(testServiceName))
+ // should be 2 files (one filtered) and directory
+ if dirLen != 3 {
+ t.Fatalf("incorrect directories len, len is: %d", dirLen)
+ }
+
+ go func() {
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file3.txt"),
+ []byte{1, 1, 1}, 0755)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for e := range w.Event {
+ fmt.Println(e.info.Name())
+ panic("handled event from filtered file")
+ }
+ }()
+
+ // time.Sleep is used here because StartPolling is a blocking operation
+ time.Sleep(time.Second * 5)
+ w.Stop()
+ }()
+
+ err = w.StartPolling(time.Second)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// nested
+// scenario
+// create dir and nested dir
+// make files with aaa, bbb and txt extensions, filter txt
+// change not filtered file, handle event
+func Test_Recursive_Support(t *testing.T) {
+ tempDir, err := ioutil.TempDir(".", "")
+ defer func() {
+ err = freeResources(tempDir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ nestedDir, err := ioutil.TempDir(tempDir, "/nested")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: true,
+ directories: []string{tempDir},
+ filterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return ErrorSkip
+ },
+ files: make(map[string]os.FileInfo),
+ ignored: nil,
+ filePatterns: []string{"aaa", "bbb"},
+ }
+
+ w, err := NewWatcher([]WatcherConfig{wc})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ dirLen := len(w.GetAllFiles(testServiceName))
+ // should be 3 files (2 from root dir, and 1 from nested), filtered txt
+ if dirLen != 3 {
+ t.Fatalf("incorrect directories len, len is: %d", dirLen)
+ }
+
+ go func() {
+ // time.Sleep is used here because StartPolling is a blocking operation
+ time.Sleep(time.Second * 5)
+ // change file in nested directory
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"),
+ []byte{1, 1, 1}, 0755)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for e := range w.Event {
+ if e.info.Name() != "file4.aaa" {
+ panic("wrong handled event from watcher in nested dir")
+ }
+ w.Stop()
+ }
+ }()
+ }()
+
+ err = w.StartPolling(time.Second)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Wrong_Dir(t *testing.T) {
+ // no such file or directory
+ wrongDir := "askdjfhaksdlfksdf"
+
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: true,
+ directories: []string{wrongDir},
+ filterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return ErrorSkip
+ },
+ files: make(map[string]os.FileInfo),
+ ignored: nil,
+ filePatterns: []string{"aaa", "bbb"},
+ }
+
+ _, err := NewWatcher([]WatcherConfig{wc})
+ if err == nil {
+ t.Fatal("expected an error for a non-existing directory")
+ }
+}
+
+func Test_Filter_Directory(t *testing.T) {
+ tempDir, err := ioutil.TempDir(".", "")
+ defer func() {
+ err = freeResources(tempDir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ nestedDir, err := ioutil.TempDir(tempDir, "/nested")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file1.aaa"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(tempDir, "file2.bbb"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file3.txt"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"),
+ []byte{}, 0755)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ignored, err := ConvertIgnored([]string{nestedDir})
+ if err != nil {
+ t.Fatal(err)
+ }
+ wc := WatcherConfig{
+ serviceName: testServiceName,
+ recursive: true,
+ directories: []string{tempDir},
+ filterHooks: func(filename string, patterns []string) error {
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
+ }
+ return ErrorSkip
+ },
+ files: make(map[string]os.FileInfo),
+ ignored: ignored,
+ filePatterns: []string{"aaa", "bbb", "txt"},
+ }
+
+ w, err := NewWatcher([]WatcherConfig{wc})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ dirLen := len(w.GetAllFiles(testServiceName))
+ // should be 2 files (from the root dir); the ignored nested dir is filtered out
+ if dirLen != 2 {
+ t.Fatalf("incorrect directories len, len is: %d", dirLen)
+ }
+
+ go func() {
+ // time.Sleep is used here because StartPolling is a blocking operation
+ time.Sleep(time.Second * 5)
+ // change file in nested directory
+ err = ioutil.WriteFile(filepath.Join(nestedDir, "file4.aaa"),
+ []byte{1, 1, 1}, 0755)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for range w.Event {
+ panic("handled event from watcher in nested dir")
+ }
+ }()
+
+ // time.Sleep is used here because StartPolling is a blocking operation
+ time.Sleep(time.Second * 5)
+ w.Stop()
+
+ }()
+
+ err = w.StartPolling(time.Second)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func freeResources(path string) error {
+ return os.RemoveAll(path)
+}