summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-02-21 10:32:51 +0300
committerValery Piashchynski <[email protected]>2020-02-21 10:32:51 +0300
commite32bc78ec1fc32b81c0029bbfee14bb570057554 (patch)
tree3e6db28a75d285d23968edf24f32142612fe88a3 /service
parent2efcfeb89861ba981f980bb4503c31ca6c7a92e0 (diff)
Update service to support file patterns
Update watcher Gracefull stop
Diffstat (limited to 'service')
-rw-r--r--service/reload/config.go10
-rw-r--r--service/reload/service.go68
-rw-r--r--service/reload/watcher.go285
3 files changed, 136 insertions, 227 deletions
diff --git a/service/reload/config.go b/service/reload/config.go
index f684a227..af1c79eb 100644
--- a/service/reload/config.go
+++ b/service/reload/config.go
@@ -3,16 +3,17 @@ package reload
import (
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service"
+ "time"
)
// Config is a Reload configuration point.
type Config struct {
// Enable or disable Reload extension, default disable.
Enabled bool
-
- // Watch is general pattern of files to watch. It will be applied to every directory in project
- Watch []string
-
+ // Interval is a global refresh interval
+ Interval time.Duration
+ // Patterns is a global file patterns to watch. It will be applied to every directory in project
+ Patterns []string
// Services is set of services which would be reloaded in case of FS changes
Services map[string]ServiceConfig
}
@@ -26,7 +27,6 @@ type ServiceConfig struct {
Dirs []string
// Ignore is set of files which would not be watched
Ignore []string
-
// service is a link to service to restart
service *roadrunner.Controllable
}
diff --git a/service/reload/service.go b/service/reload/service.go
index 9b3ac2f0..16648007 100644
--- a/service/reload/service.go
+++ b/service/reload/service.go
@@ -1,6 +1,7 @@
package reload
import (
+ "errors"
"fmt"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service"
@@ -15,14 +16,19 @@ const ID = "reload"
type Service struct {
reloadConfig *Config
watcher *Watcher
-
}
// Init controller service
func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
s.reloadConfig = cfg
+ if !s.reloadConfig.Enabled {
+ return false, nil
+ }
+ wd, err := os.Getwd()
+ if err != nil {
+ return false, err
+ }
- var err error
var configs []WatcherConfig
// mount Services to designated services
@@ -35,8 +41,6 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
}
}
-
-
for serviceName, config := range s.reloadConfig.Services {
if cfg.Services[serviceName].service == nil {
continue
@@ -45,44 +49,26 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
serviceName: serviceName,
recursive: config.Recursive,
directories: config.Dirs,
- filterHooks: func(filename, pattern string) error {
- if strings.Contains(filename, pattern) {
- return ErrorSkip
+ filterHooks: func(filename string, patterns []string) error {
+
+ for i := 0; i < len(patterns); i++ {
+ if strings.Contains(filename, patterns[i]) {
+ return nil
+ }
}
- return nil
+ return ErrorSkip
},
- files: make(map[string]os.FileInfo),
- //ignored:
+ files: make(map[string]os.FileInfo),
+ ignored: ConvertIgnored(wd, config.Ignore),
+ filePatterns: config.Patterns,
})
}
- s.watcher, err = NewWatcher(configs)
+ s.watcher, err = NewWatcher(wd, configs)
if err != nil {
return false, err
}
- for serviceName, config := range s.reloadConfig.Services {
- svc, _ := c.Get(serviceName)
- if ctrl, ok := svc.(*roadrunner.Controllable); ok {
- (*ctrl).Server().Reset()
- }
-
- configs = append(configs, WatcherConfig{
- serviceName: serviceName,
- recursive: config.Recursive,
- directories: config.Dirs,
- filterHooks: func(filename, pattern string) error {
- if strings.Contains(filename, pattern) {
- return ErrorSkip
- }
- return nil
- },
- files: make(map[string]os.FileInfo),
- //ignored:
-
- })
- }
-
return true, nil
}
@@ -90,14 +76,13 @@ func (s *Service) Serve() error {
if !s.reloadConfig.Enabled {
return nil
}
-
go func() {
for {
select {
case e := <-s.watcher.Event:
- println(fmt.Sprintf("type is:%s, oldPath:%s, path:%s, name:%s", e.Type, e.OldPath, e.Path, e.FileInfo.Name()))
+ println(fmt.Sprintf("Service is:%s, path:%s, name:%s", e.service, e.path, e.info.Name()))
- srv := s.reloadConfig.Services[e.Type]
+ srv := s.reloadConfig.Services[e.service]
if srv.service != nil {
s := *srv.service
@@ -107,14 +92,18 @@ func (s *Service) Serve() error {
}
} else {
s.watcher.mu.Lock()
- delete(s.watcher.watcherConfigs, e.Type)
+ delete(s.watcher.watcherConfigs, e.service)
s.watcher.mu.Unlock()
}
}
}
}()
- err := s.watcher.StartPolling(time.Second * 2)
+ if s.reloadConfig.Interval < time.Second {
+ return errors.New("too fast")
+ }
+
+ err := s.watcher.StartPolling(s.reloadConfig.Interval)
if err != nil {
return err
}
@@ -123,6 +112,5 @@ func (s *Service) Serve() error {
}
func (s *Service) Stop() {
- //s.watcher.Stop()
-
+ s.watcher.Stop()
}
diff --git a/service/reload/watcher.go b/service/reload/watcher.go
index 8b1ee7b2..da8007a3 100644
--- a/service/reload/watcher.go
+++ b/service/reload/watcher.go
@@ -10,46 +10,20 @@ import (
"time"
)
-// An Op is a type that is used to describe what type
-// of event has occurred during the watching process.
-type Op uint32
-
-// Ops
-const (
- Create Op = iota
- Write
- Remove
- Rename
- Chmod
- Move
-)
-
-var ops = map[Op]string{
- Create: "CREATE",
- Write: "WRITE",
- Remove: "REMOVE",
- Rename: "RENAME",
- Chmod: "CHMOD",
- Move: "MOVE",
-}
-
var ErrorSkip = errors.New("file is skipped")
var NoWalkerConfig = errors.New("should add at least one walker config, when reload is set to true")
// SimpleHook is used to filter by simple criteria, CONTAINS
-type SimpleHook func(filename, pattern string) error
+type SimpleHook func(filename string, pattern []string) error
// An Event describes an event that is received when files or directory
// changes occur. It includes the os.FileInfo of the changed file or
// directory and the type of event that's occurred and the full path of the file.
type Event struct {
- Op
+ path string
+ info os.FileInfo
- Path string
- OldPath string
- os.FileInfo
-
- Type string // type of event, http, grpc, etc...
+ service string // type of service, http, grpc, etc...
}
type WatcherConfig struct {
@@ -60,20 +34,20 @@ type WatcherConfig struct {
// directories used per-service
directories []string
// simple hook, just CONTAINS
- filterHooks SimpleHook
+ filterHooks func(filename string, pattern []string) error
// path to file with files
files map[string]os.FileInfo
- // //ignored files or directories, used map for O(1) amortized get
+ // ignored directories, used map for O(1) amortized get
ignored map[string]string
+ // filePatterns to ignore
+ filePatterns []string
}
type Watcher struct {
// main event channel
Event chan Event
+ close chan struct{}
- errors chan error
- close chan struct{}
- Closed chan struct{}
//=============================
mu *sync.Mutex
wg *sync.WaitGroup
@@ -83,9 +57,6 @@ type Watcher struct {
// working directory, same for all
workingDir string
- // operation type
- operations map[Op]struct{} // Op filtering.
-
// config for each service
// need pointer here to assign files
watcherConfigs map[string]WatcherConfig
@@ -94,22 +65,15 @@ type Watcher struct {
// Options is used to set Watcher Options
type Options func(*Watcher)
-func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) {
- dir, err := os.Getwd()
- if err != nil {
- return nil, err
- }
-
+func NewWatcher(workDir string, configs []WatcherConfig, options ...Options) (*Watcher, error) {
w := &Watcher{
Event: make(chan Event),
mu: &sync.Mutex{},
wg: &sync.WaitGroup{},
- Closed: make(chan struct{}),
- close: make(chan struct{}),
+ close: make(chan struct{}),
- workingDir: dir,
- operations: make(map[Op]struct{}),
+ workingDir: workDir,
watcherConfigs: make(map[string]WatcherConfig),
}
@@ -126,7 +90,7 @@ func NewWatcher(configs []WatcherConfig, options ...Options) (*Watcher, error) {
return nil, NoWalkerConfig
}
- err = w.initFs()
+ err := w.initFs()
if err != nil {
return nil, err
}
@@ -140,6 +104,7 @@ func (w *Watcher) initFs() error {
if err != nil {
return err
}
+ // workaround. in golang you can't assign to map in struct field
tmp := w.watcherConfigs[srvName]
tmp.files = fileList
w.watcherConfigs[srvName] = tmp
@@ -147,6 +112,21 @@ func (w *Watcher) initFs() error {
return nil
}
+func ConvertIgnored(workdir string, ignored []string) map[string]string {
+ abs, _ := filepath.Abs(workdir)
+ if len(ignored) == 0 {
+ return nil
+ }
+
+ ign := make(map[string]string, len(ignored))
+ for i := 0; i < len(ignored); i++ {
+ ign[filepath.Join(abs, ignored[i])] = filepath.Join(abs, ignored[i])
+ }
+
+ return ign
+
+}
+
func (w *Watcher) AddWatcherConfig(config WatcherConfig) {
w.watcherConfigs[config.serviceName] = config
}
@@ -260,7 +240,6 @@ func (w *Watcher) retrieveFilesSingle(serviceName, path string) (map[string]os.F
// recursive calls are slow in compare to goto
// so, we will add files with goto pattern
-
outer:
for i := 0; i < len(fileInfoList); i++ {
var pathToFile string
@@ -277,9 +256,9 @@ outer:
continue
}
- err := w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), pathToFile)
- if err != nil {
- // if err is not nil, move to the start of the cycle since the pathToFile not match the hook
+ // if filename does not contain pattern --> ignore that file
+ err = w.watcherConfigs[serviceName].filterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].filePatterns)
+ if err == ErrorSkip {
continue outer
}
@@ -309,48 +288,24 @@ func (w *Watcher) StartPolling(duration time.Duration) error {
return w.waitEvent(duration)
}
-//func (w *Watcher) updatedFileListForConfig(config WatcherConfig) (map[string]os.FileInfo, error) {
-// if config.recursive {
-// return nil, nil
-// }
-//
-// for _, v := range config.directories {
-// files, err := w.retrieveFilesSingle(path.Join(w.workingDir, v))
-// if err != nil {
-// return nil, err
-// }
-//
-// }
-//
-// return nil, nil
-//}
-
// this is blocking operation
func (w *Watcher) waitEvent(d time.Duration) error {
+ ticker := time.NewTicker(d)
for {
- cancel := make(chan struct{})
- ticker := time.NewTicker(d)
- for {
- select {
- case <-w.close:
- close(cancel)
- close(w.Closed)
- return nil
- case <-ticker.C:
- //fileList := make(map[string]os.FileInfo, 100)
- //w.mu.Lock()
-
- for serviceName, config := range w.watcherConfigs {
- go func(sn string, c WatcherConfig) {
- fileList, _ := w.retrieveFileList(sn, c)
- w.pollEvents(c.serviceName, fileList, cancel)
- }(serviceName, config)
- }
- default:
-
+ select {
+ case <-w.close:
+ ticker.Stop()
+ return nil
+ case <-ticker.C:
+ for serviceName, config := range w.watcherConfigs {
+ go func(sn string, c WatcherConfig) {
+ fileList, _ := w.retrieveFileList(sn, c)
+ w.pollEvents(c.serviceName, fileList)
+ }(serviceName, config)
}
}
}
+
}
func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (map[string]os.FileInfo, error) {
@@ -392,43 +347,8 @@ func (w *Watcher) retrieveFileList(serviceName string, config WatcherConfig) (ma
}
return fileList, nil
-
- // Add the file's to the file list.
-
- //return nil
}
-// RemoveRecursive removes either a single file or a directory recursively from
-// the file's list.
-//func (w *Watcher) RemoveRecursive(name string) (err error) {
-// w.mu.Lock()
-// defer w.mu.Unlock()
-//
-// name, err = filepath.Abs(name)
-// if err != nil {
-// return err
-// }
-//
-// // If name is a single file, remove it and return.
-// info, found := w.files[name]
-// if !found {
-// return nil // Doesn't exist, just return.
-// }
-// if !info.IsDir() {
-// delete(w.files, name)
-// return nil
-// }
-//
-// // If it's a directory, delete all of it's contents recursively
-// // from w.files.
-// for path := range w.files {
-// if strings.HasPrefix(path, name) {
-// delete(w.files, path)
-// }
-// }
-// return nil
-//}
-
func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]os.FileInfo, error) {
fileList := make(map[string]os.FileInfo)
@@ -437,14 +357,11 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o
return err
}
- // filename, pattern TODO
- //err = w.watcherConfigs[serviceName].filterHooks(info.Name(), path)
- //if err == ErrorSkip {
- // return nil
- //}
- //if err != nil {
- // return err
- //}
+ // if filename does not contain pattern --> ignore that file
+ err = w.watcherConfigs[serviceName].filterHooks(info.Name(), w.watcherConfigs[serviceName].filePatterns)
+ if err == ErrorSkip {
+ return nil
+ }
// If path is ignored and it's a directory, skip the directory. If it's
// ignored and it's a single file, skip the file.
@@ -462,7 +379,7 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o
})
}
-func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, cancel chan struct{}) {
+func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) {
w.mu.Lock()
defer w.mu.Unlock()
@@ -491,62 +408,66 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo, c
if oldInfo.ModTime() != info.ModTime() {
w.watcherConfigs[serviceName].files[pth] = info
select {
- case <-cancel:
- return
- case w.Event <- Event{Write, pth, pth, info, serviceName}:
+ case w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }:
}
}
if oldInfo.Mode() != info.Mode() {
w.watcherConfigs[serviceName].files[pth] = info
select {
- case <-cancel:
- return
- case w.Event <- Event{Chmod, pth, pth, info, serviceName}:
+ case w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }:
}
}
}
- // Check for renames and moves.
- //for path1, info1 := range removes {
- // for path2, info2 := range creates {
- // if sameFile(info1, info2) {
- // e := Event{
- // Op: Move,
- // Path: path2,
- // OldPath: path1,
- // FileInfo: info1,
- // }
- // // If they are from the same directory, it's a rename
- // // instead of a move event.
- // if filepath.Dir(path1) == filepath.Dir(path2) {
- // e.Op = Rename
- // }
- //
- // delete(removes, path1)
- // delete(creates, path2)
- //
- // select {
- // case <-cancel:
- // return
- // case w.Event <- e:
- // }
- // }
- // }
- //}
- //
- ////Send all the remaining create and remove events.
- //for pth, info := range creates {
- // select {
- // case <-cancel:
- // return
- // case w.Event <- Event{Create, pth, pth, info, serviceName}:
- // }
- //}
- //for pth, info := range removes {
- // select {
- // case <-cancel:
- // return
- // case w.Event <- Event{Remove, pth, pth, info, serviceName}:
- // }
- //}
+ //Check for renames and moves.
+ for path1, info1 := range removes {
+ for path2, info2 := range creates {
+ if sameFile(info1, info2) {
+ e := Event{
+ path: path2,
+ info: info2,
+ service: serviceName,
+ }
+
+ delete(removes, path1)
+ delete(creates, path2)
+
+ select {
+ case w.Event <- e:
+ }
+ }
+ }
+ }
+
+ //Send all the remaining create and remove events.
+ for pth, info := range creates {
+ select {
+ case w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }:
+ }
+ }
+ for pth, info := range removes {
+ select {
+ case w.Event <- Event{
+ path: pth,
+ info: info,
+ service: serviceName,
+ }:
+ }
+ }
+}
+
+func (w *Watcher) Stop() {
+ w.close <- struct{}{}
}