diff options
Diffstat (limited to 'tests/plugins/jobs/helpers.go')
-rw-r--r-- | tests/plugins/jobs/helpers.go | 234 |
1 files changed, 0 insertions, 234 deletions
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go deleted file mode 100644 index 6c2d05ca..00000000 --- a/tests/plugins/jobs/helpers.go +++ /dev/null @@ -1,234 +0,0 @@ -package jobs - -import ( - "bytes" - "net" - "net/http" - "net/rpc" - "testing" - - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - push string = "jobs.Push" - pause string = "jobs.Pause" - destroy string = "jobs.Destroy" - resume string = "jobs.Resume" - stat string = "jobs.Stat" -) - -func resumePipes(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(resume, pipe, er) - assert.NoError(t, err) - } -} - -func pushToDisabledPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: nil, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: 0, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "2", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: delay, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipeErr(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - require.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: 0, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - require.Error(t, err) - } -} -func pausePipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(pause, pipe, er) - assert.NoError(t, err) - } -} - -func destroyPipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(destroy, pipe, er) - assert.NoError(t, err) - } -} - -func enableProxy(name string, t *testing.T) { - buf := new(bytes.Buffer) - buf.WriteString(`{"enabled":true}`) - - resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func disableProxy(name string, t *testing.T) { - buf := new(bytes.Buffer) - buf.WriteString(`{"enabled":false}`) - - resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func deleteProxy(name string, t *testing.T) { - client := &http.Client{} - - req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:8474/proxies/"+name, nil) //nolint:noctx - require.NoError(t, err) - - resp, err := client.Do(req) - require.NoError(t, err) - - require.NoError(t, err) - require.Equal(t, 204, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func stats(state *jobState.State) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - st := &jobsv1beta.Stats{} - er := &jobsv1beta.Empty{} - - err = client.Call(stat, er, st) - require.NoError(t, err) - require.NotNil(t, st) - - state.Queue = st.Stats[0].Queue - state.Pipeline = st.Stats[0].Pipeline - state.Driver = st.Stats[0].Driver - state.Active = st.Stats[0].Active - state.Delayed = st.Stats[0].Delayed - state.Reserved = st.Stats[0].Reserved - state.Ready = st.Stats[0].Ready - } -} |