summaryrefslogtreecommitdiff
path: root/tests/plugins/jobs/helpers.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/jobs/helpers.go')
-rw-r--r--tests/plugins/jobs/helpers.go51
1 files changed, 50 insertions, 1 deletions
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go
index a268ebb8..5067ef9f 100644
--- a/tests/plugins/jobs/helpers.go
+++ b/tests/plugins/jobs/helpers.go
@@ -8,6 +8,7 @@ import (
"testing"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -18,6 +19,7 @@ const (
pause string = "jobs.Pause"
destroy string = "jobs.Destroy"
resume string = "jobs.Resume"
+ stat string = "jobs.Stat"
)
func resumePipes(pipes ...string) func(t *testing.T) {
@@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) {
er := &jobsv1beta.Empty{}
err = client.Call(push, req, er)
- assert.Error(t, err)
+ assert.NoError(t, err)
}
}
@@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) {
}
}
+func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
+ Job: "some/php/namespace",
+ Id: "1",
+ Payload: `{"hello":"world"}`,
+ Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
+ Options: &jobsv1beta.Options{
+ Priority: 1,
+ Pipeline: pipeline,
+ Delay: delay,
+ },
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call(push, req, er)
+ assert.NoError(t, err)
+ }
+}
+
func pushToPipeErr(pipeline string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
@@ -183,3 +209,26 @@ func deleteProxy(name string, t *testing.T) {
_ = resp.Body.Close()
}
}
+
+func stats(state *jobState.State) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ st := &jobsv1beta.Stats{}
+ er := &jobsv1beta.Empty{}
+
+ err = client.Call(stat, er, st)
+ require.NoError(t, err)
+ require.NotNil(t, st)
+
+ state.Queue = st.Stats[0].Queue
+ state.Pipeline = st.Stats[0].Pipeline
+ state.Driver = st.Stats[0].Driver
+ state.Active = st.Stats[0].Active
+ state.Delayed = st.Stats[0].Delayed
+ state.Reserved = st.Stats[0].Reserved
+ state.Ready = st.Stats[0].Ready
+ }
+}