summaryrefslogtreecommitdiff
path: root/plugins/informer
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-25 00:55:15 +0300
committerValery Piashchynski <[email protected]>2020-12-25 00:55:15 +0300
commitf52156e76cf02e0b3de993fa6d9ba0d2f4e52001 (patch)
tree025c6c60930844bd10ac776e90266d679e3c1606 /plugins/informer
parent1bc3db2ea9b95edd0101676d7bfd75df3782c3bd (diff)
Initial commit of experiment
Diffstat (limited to 'plugins/informer')
-rw-r--r--plugins/informer/interface.go8
-rw-r--r--plugins/informer/plugin.go55
-rw-r--r--plugins/informer/rpc.go54
3 files changed, 117 insertions, 0 deletions
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
new file mode 100644
index 00000000..27139ae1
--- /dev/null
+++ b/plugins/informer/interface.go
@@ -0,0 +1,8 @@
+package informer
+
+import "github.com/spiral/roadrunner/v2/interfaces/worker"
+
+// Informer used to get workers from particular plugin or set of plugins
+type Informer interface {
+ Workers() []worker.BaseProcess
+}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
new file mode 100644
index 00000000..e2da7d86
--- /dev/null
+++ b/plugins/informer/plugin.go
@@ -0,0 +1,55 @@
+package informer
+
+import (
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner-plugins/logger"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+)
+
+const PluginName = "informer"
+
+type Plugin struct {
+ registry map[string]Informer
+ log logger.Logger
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.registry = make(map[string]Informer)
+ p.log = log
+ return nil
+}
+
+// Workers provides BaseProcess slice with workers for the requested plugin
+func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
+ const op = errors.Op("get workers")
+ svc, ok := p.registry[name]
+ if !ok {
+ return nil, errors.E(op, errors.Errorf("no such service: %s", name))
+ }
+
+ return svc.Workers(), nil
+}
+
+// CollectTarget resettable service.
+func (p *Plugin) CollectTarget(name endure.Named, r Informer) error {
+ p.registry[name.Name()] = r
+ return nil
+}
+
+// Collects declares services to be collected.
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectTarget,
+ }
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// RPCService returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, log: p.log}
+}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
new file mode 100644
index 00000000..d32d4e3a
--- /dev/null
+++ b/plugins/informer/rpc.go
@@ -0,0 +1,54 @@
+package informer
+
+import (
+ "github.com/spiral/roadrunner-plugins/logger"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/tools"
+)
+
+type rpc struct {
+ srv *Plugin
+ log logger.Logger
+}
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []tools.ProcessState `json:"workers"`
+}
+
+// List all resettable services.
+func (rpc *rpc) List(_ bool, list *[]string) error {
+ rpc.log.Debug("Started List method")
+ *list = make([]string, 0, len(rpc.srv.registry))
+
+ for name := range rpc.srv.registry {
+ *list = append(*list, name)
+ }
+ rpc.log.Debug("list of services", "list", *list)
+
+ rpc.log.Debug("successfully finished List method")
+ return nil
+}
+
+// Workers state of a given service.
+func (rpc *rpc) Workers(service string, list *WorkerList) error {
+ rpc.log.Debug("started Workers method", "service", service)
+ workers, err := rpc.srv.Workers(service)
+ if err != nil {
+ return err
+ }
+
+ list.Workers = make([]tools.ProcessState, 0)
+ for _, w := range workers {
+ ps, err := tools.WorkerProcessState(w.(worker.BaseProcess))
+ if err != nil {
+ continue
+ }
+
+ list.Workers = append(list.Workers, ps)
+ }
+ rpc.log.Debug("list of workers", "workers", list.Workers)
+ rpc.log.Debug("successfully finished Workers method")
+ return nil
+}