diff options
Diffstat (limited to 'tests/plugins/jobs/helpers.go')
-rw-r--r-- | tests/plugins/jobs/helpers.go | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go new file mode 100644 index 00000000..4c2f2fea --- /dev/null +++ b/tests/plugins/jobs/helpers.go @@ -0,0 +1,185 @@ +package jobs + +import ( + "bytes" + "net" + "net/http" + "net/rpc" + "testing" + + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + push string = "jobs.Push" + pause string = "jobs.Pause" + destroy string = "jobs.Destroy" + resume string = "jobs.Resume" +) + +func resumePipes(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(resume, pipe, er) + assert.NoError(t, err) + } +} + +func pushToDisabledPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: nil, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.Error(t, err) + } +} + +func pushToPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + +func pushToPipeErr(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + require.Error(t, err) + } +} +func pausePipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(pause, pipe, er) + assert.NoError(t, err) + } +} + +func destroyPipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(destroy, pipe, er) + assert.NoError(t, err) + } +} + +func enableProxy(name string, t *testing.T) { + buf := new(bytes.Buffer) + buf.WriteString(`{"enabled":true}`) + + resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} + +func disableProxy(name string, t *testing.T) { + buf := new(bytes.Buffer) + buf.WriteString(`{"enabled":false}`) + + resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} + +func deleteProxy(name string, t *testing.T) { + client := &http.Client{} + + req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:8474/proxies/"+name, nil) //nolint:noctx + require.NoError(t, err) + + resp, err := client.Do(req) + require.NoError(t, err) + + require.NoError(t, err) + require.Equal(t, 204, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} |