summaryrefslogtreecommitdiff
path: root/tests/plugins/jobs/helpers.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
committerValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
commitf3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch)
tree32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /tests/plugins/jobs/helpers.go
parent5d2cd55ab522d4f1e65a833f91146444465a32ac (diff)
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/jobs/helpers.go')
-rw-r--r--tests/plugins/jobs/helpers.go234
1 files changed, 0 insertions, 234 deletions
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go
deleted file mode 100644
index 6c2d05ca..00000000
--- a/tests/plugins/jobs/helpers.go
+++ /dev/null
@@ -1,234 +0,0 @@
-package jobs
-
-import (
- "bytes"
- "net"
- "net/http"
- "net/rpc"
- "testing"
-
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-)
-
-const (
- push string = "jobs.Push"
- pause string = "jobs.Pause"
- destroy string = "jobs.Destroy"
- resume string = "jobs.Resume"
- stat string = "jobs.Stat"
-)
-
-func resumePipes(pipes ...string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))}
-
- for i := 0; i < len(pipes); i++ {
- pipe.GetPipelines()[i] = pipes[i]
- }
-
- er := &jobsv1beta.Empty{}
- err = client.Call(resume, pipe, er)
- assert.NoError(t, err)
- }
-}
-
-func pushToDisabledPipe(pipeline string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
- Job: "some/php/namespace",
- Id: "1",
- Payload: `{"hello":"world"}`,
- Headers: nil,
- Options: &jobsv1beta.Options{
- Priority: 1,
- Pipeline: pipeline,
- },
- }}
-
- er := &jobsv1beta.Empty{}
- err = client.Call(push, req, er)
- assert.NoError(t, err)
- }
-}
-
-func pushToPipe(pipeline string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
- Job: "some/php/namespace",
- Id: "1",
- Payload: `{"hello":"world"}`,
- Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
- Options: &jobsv1beta.Options{
- Priority: 1,
- Pipeline: pipeline,
- Delay: 0,
- },
- }}
-
- er := &jobsv1beta.Empty{}
- err = client.Call(push, req, er)
- assert.NoError(t, err)
- }
-}
-
-func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
- Job: "some/php/namespace",
- Id: "2",
- Payload: `{"hello":"world"}`,
- Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
- Options: &jobsv1beta.Options{
- Priority: 1,
- Pipeline: pipeline,
- Delay: delay,
- },
- }}
-
- er := &jobsv1beta.Empty{}
- err = client.Call(push, req, er)
- assert.NoError(t, err)
- }
-}
-
-func pushToPipeErr(pipeline string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- require.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
- Job: "some/php/namespace",
- Id: "1",
- Payload: `{"hello":"world"}`,
- Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
- Options: &jobsv1beta.Options{
- Priority: 1,
- Pipeline: pipeline,
- Delay: 0,
- },
- }}
-
- er := &jobsv1beta.Empty{}
- err = client.Call(push, req, er)
- require.Error(t, err)
- }
-}
-func pausePipelines(pipes ...string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))}
-
- for i := 0; i < len(pipes); i++ {
- pipe.GetPipelines()[i] = pipes[i]
- }
-
- er := &jobsv1beta.Empty{}
- err = client.Call(pause, pipe, er)
- assert.NoError(t, err)
- }
-}
-
-func destroyPipelines(pipes ...string) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))}
-
- for i := 0; i < len(pipes); i++ {
- pipe.GetPipelines()[i] = pipes[i]
- }
-
- er := &jobsv1beta.Empty{}
- err = client.Call(destroy, pipe, er)
- assert.NoError(t, err)
- }
-}
-
-func enableProxy(name string, t *testing.T) {
- buf := new(bytes.Buffer)
- buf.WriteString(`{"enabled":true}`)
-
- resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx
- require.NoError(t, err)
- require.Equal(t, 200, resp.StatusCode)
- if resp.Body != nil {
- _ = resp.Body.Close()
- }
-}
-
-func disableProxy(name string, t *testing.T) {
- buf := new(bytes.Buffer)
- buf.WriteString(`{"enabled":false}`)
-
- resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx
- require.NoError(t, err)
- require.Equal(t, 200, resp.StatusCode)
- if resp.Body != nil {
- _ = resp.Body.Close()
- }
-}
-
-func deleteProxy(name string, t *testing.T) {
- client := &http.Client{}
-
- req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:8474/proxies/"+name, nil) //nolint:noctx
- require.NoError(t, err)
-
- resp, err := client.Do(req)
- require.NoError(t, err)
-
- require.NoError(t, err)
- require.Equal(t, 204, resp.StatusCode)
- if resp.Body != nil {
- _ = resp.Body.Close()
- }
-}
-
-func stats(state *jobState.State) func(t *testing.T) {
- return func(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- st := &jobsv1beta.Stats{}
- er := &jobsv1beta.Empty{}
-
- err = client.Call(stat, er, st)
- require.NoError(t, err)
- require.NotNil(t, st)
-
- state.Queue = st.Stats[0].Queue
- state.Pipeline = st.Stats[0].Pipeline
- state.Driver = st.Stats[0].Driver
- state.Active = st.Stats[0].Active
- state.Delayed = st.Stats[0].Delayed
- state.Reserved = st.Stats[0].Reserved
- state.Ready = st.Stats[0].Ready
- }
-}