summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/metrics/plugin.go48
-rw-r--r--plugins/metrics/tests/.rr-test.yaml12
-rw-r--r--plugins/metrics/tests/metrics_test.go45
-rw-r--r--plugins/metrics/tests/plugin1.go49
-rw-r--r--plugins/metrics/tests/plugin2.go54
-rwxr-xr-xplugins/rpc/plugin.go11
-rw-r--r--plugins/rpc/tests/plugin1.go4
7 files changed, 184 insertions, 39 deletions
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index 3795386b..fad8ca80 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -101,16 +101,14 @@ func (m *Plugin) Serve() chan error {
m.collectors.Range(func(key, value interface{}) bool {
// key - name
// value - collector
- c := value.(prometheus.Collector)
- if err := m.registry.Register(c); err != nil {
+ c := value.(statsProvider)
+ if err := m.registry.Register(c.collector); err != nil {
errCh <- err
return false
}
return true
})
- m.mu.Lock()
-
var topCipherSuites []uint16
var defaultCipherSuitesTLS13 []uint16
@@ -179,32 +177,34 @@ func (m *Plugin) Serve() chan error {
PreferServerCipherSuites: true,
},
}
- m.mu.Unlock()
- err := m.http.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- errCh <- err
- return errCh
- }
+ go func() {
+ err := m.http.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ errCh <- err
+ return
+ }
+ }()
return errCh
}
// Stop prometheus metrics service.
-func (m *Plugin) Stop() {
+func (m *Plugin) Stop() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.http != nil {
- // gracefully stop server
- go func() {
- err := m.http.Shutdown(context.Background())
- if err != nil {
- // Function should be Stop() error
- m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
- }
- }()
+ // timeout is 10 seconds
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ err := m.http.Shutdown(ctx)
+ if err != nil {
+ // Function should be Stop() error
+ m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ }
}
+ return nil
}
func (m *Plugin) Collects() []interface{} {
@@ -215,9 +215,17 @@ func (m *Plugin) Collects() []interface{} {
// Collector returns application specific collector by name or nil if collector not found.
func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error {
- m.collectors.Store(name, statsProvider{
+ m.collectors.Store(name.Name(), statsProvider{
collector: stat.MetricsCollector(),
name: name.Name(),
})
return nil
}
+
+func (m *Plugin) Name() string {
+ return ServiceName
+}
+
+func (m *Plugin) RPC() interface{} {
+ return &rpcServer{svc: m}
+}
diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml
index cc4771d4..79343e3c 100644
--- a/plugins/metrics/tests/.rr-test.yaml
+++ b/plugins/metrics/tests/.rr-test.yaml
@@ -1,9 +1,13 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
metrics:
# prometheus client address (path /metrics added automatically)
address: localhost:2112
collect:
app_metric:
- type: histogram
- help: "Custom application metric"
- labels: ["type"]
- buckets: [0.1, 0.2, 0.3, 1.0] \ No newline at end of file
+ type: histogram
+ help: "Custom application metric"
+ labels: [ "type" ]
+ buckets: [ 0.1, 0.2, 0.3, 1.0 ] \ No newline at end of file
diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go
index d04c75d3..f21016d4 100644
--- a/plugins/metrics/tests/metrics_test.go
+++ b/plugins/metrics/tests/metrics_test.go
@@ -1,13 +1,18 @@
package tests
import (
- "fmt"
+ "os"
+ "os/signal"
+ "syscall"
"testing"
+ "time"
"github.com/spiral/endure"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/metrics"
+ "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/stretchr/testify/assert"
)
func TestMetricsInit(t *testing.T) {
@@ -30,22 +35,54 @@ func TestMetricsInit(t *testing.T) {
t.Fatal(err)
}
+ err = cont.Register(&rpc.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
err = cont.Register(&logger.ZapLogger{})
if err != nil {
t.Fatal(err)
}
+ err = cont.Register(&Plugin1{})
+ if err != nil {
+ t.Fatal(err)
+ }
err = cont.Init()
if err != nil {
t.Fatal(err)
}
- errCh, err := cont.Serve()
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Second * 5)
for {
select {
- case e := <-errCh:
- fmt.Println(e)
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
}
}
}
diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go
new file mode 100644
index 00000000..fdf10e54
--- /dev/null
+++ b/plugins/metrics/tests/plugin1.go
@@ -0,0 +1,49 @@
+package tests
+
+import (
+ "fmt"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Plugin1 struct {
+ config config.Configurer
+}
+
+func (p1 *Plugin1) Init(cfg config.Configurer) error {
+ p1.config = cfg
+ return nil
+}
+
+func (p1 *Plugin1) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+func (p1 *Plugin1) Stop() error {
+ return nil
+}
+
+func (p1 *Plugin1) Name() string {
+ return "metrics_test.plugin1"
+}
+
+func (p1 *Plugin1) MetricsCollector() prometheus.Collector {
+ var (
+ cpuTemp = prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "cpu_temperature_celsius",
+ Help: "Current temperature of the CPU.",
+ })
+ )
+ return cpuTemp
+}
+
+type PluginRpc struct {
+ srv *Plugin1
+}
+
+func (r *PluginRpc) Hello(in string, out *string) error {
+ *out = fmt.Sprintf("Hello, username: %s", in)
+ return nil
+}
diff --git a/plugins/metrics/tests/plugin2.go b/plugins/metrics/tests/plugin2.go
new file mode 100644
index 00000000..4156db6d
--- /dev/null
+++ b/plugins/metrics/tests/plugin2.go
@@ -0,0 +1,54 @@
+package tests
+
+import (
+ "net"
+ "net/rpc"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v2"
+)
+
+// plugin2 makes a call to the plugin1 via RPC
+// this is just a simulation of external call FOR TEST
+// you don't need to do such things :)
+type Plugin2 struct {
+}
+
+func (p2 *Plugin2) Init() error {
+ return nil
+}
+
+func (p2 *Plugin2) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ go func() {
+ time.Sleep(time.Second * 3)
+
+ conn, err := net.Dial("tcp", "127.0.0.1:7001")
+ if err != nil {
+ errCh <- errors.E(errors.Serve, err)
+ return
+ }
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret string
+ err = client.Call("metrics_test.plugin1.Hello", "Valery", &ret)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ if ret != "Hello, username: Valery" {
+ errCh <- errors.E("wrong response")
+ return
+ }
+ // to stop exec
+ errCh <- errors.E(errors.Disabled)
+ return
+ }()
+
+ return errCh
+}
+
+func (p2 *Plugin2) Stop() error {
+ return nil
+}
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index d7f91742..82b30563 100755
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -67,13 +67,7 @@ func (s *Plugin) Serve() chan error {
// Attach all services
for i := 0; i < len(s.services); i++ {
- svc, err := s.services[i].service.RPC()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- err = s.Register(s.services[i].name, svc)
+ err := s.Register(s.services[i].name, s.services[i].service.RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -137,12 +131,11 @@ func (s *Plugin) Collects() []interface{} {
}
// RegisterPlugin registers RPC service plugin.
-func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) error {
+func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) {
s.services = append(s.services, pluggable{
service: p,
name: name.Name(),
})
- return nil
}
// Register publishes in the server the set of methods of the
diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go
index 98373a12..a8d5c216 100644
--- a/plugins/rpc/tests/plugin1.go
+++ b/plugins/rpc/tests/plugin1.go
@@ -28,8 +28,8 @@ func (p1 *Plugin1) Name() string {
return "rpc_test.plugin1"
}
-func (p1 *Plugin1) RPC() (interface{}, error) {
- return &PluginRpc{srv: p1}, nil
+func (p1 *Plugin1) RPC() interface{} {
+ return &PluginRpc{srv: p1}
}
type PluginRpc struct {