diff options
Diffstat (limited to 'tests/plugins/jobs/helpers.go')
-rw-r--r-- | tests/plugins/jobs/helpers.go | 51 |
1 files changed, 50 insertions, 1 deletions
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index a268ebb8..5067ef9f 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -8,6 +8,7 @@ import ( "testing" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +19,7 @@ const ( pause string = "jobs.Pause" destroy string = "jobs.Destroy" resume string = "jobs.Resume" + stat string = "jobs.Stat" ) func resumePipes(pipes ...string) func(t *testing.T) { @@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) { er := &jobsv1beta.Empty{} err = client.Call(push, req, er) - assert.Error(t, err) + assert.NoError(t, err) } } @@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) { } } +func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: delay, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + func pushToPipeErr(pipeline string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") @@ -183,3 +209,26 @@ func deleteProxy(name string, t *testing.T) { _ = resp.Body.Close() } } + +func stats(state *jobState.State) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + st := &jobsv1beta.Stats{} + er := &jobsv1beta.Empty{} + + err = client.Call(stat, er, st) + require.NoError(t, err) + require.NotNil(t, st) + + state.Queue = st.Stats[0].Queue + state.Pipeline = st.Stats[0].Pipeline + state.Driver = st.Stats[0].Driver + state.Active = st.Stats[0].Active + state.Delayed = st.Stats[0].Delayed + state.Reserved = st.Stats[0].Reserved + state.Ready = st.Stats[0].Ready + } +} |