diff options
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 8 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml | 27 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml | 57 | ||||
-rw-r--r-- | tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml | 21 | ||||
-rw-r--r-- | tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml | 37 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 115 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 15 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 235 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 246 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 92 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 236 | ||||
-rw-r--r-- | tests/plugins/jobs/sqs/.rr-sqs-declare.yaml | 29 | ||||
-rw-r--r-- | tests/plugins/jobs/sqs/.rr-sqs-init.yaml | 53 |
16 files changed, 1072 insertions, 107 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1490e587..ab5aad14 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -258,7 +258,7 @@ func (j *JobConsumer) Pause(p string) { j.stopCh <- struct{}{} j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index c49a23c1..2b0ff40b 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -68,10 +68,8 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand stopCh: make(chan struct{}, 1), } - jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000)) - // initialize a local queue - jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000)) // consume from the queue go jb.consume() diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 43617716..246ff3ba 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -340,7 +340,7 @@ func (j *JobConsumer) Pause(p string) { j.pauseCh <- struct{}{} j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 8c5d887e..887f8358 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -78,11 +78,19 @@ func (j *JobConsumer) listen() { //nolint:gocognit continue } + // No retry + if item.Options.MaxAttempts == 0 { + j.pq.Insert(item) + continue + } + + // MaxAttempts option specified if item.Options.CanRetry() { j.pq.Insert(item) continue } + // If MaxAttempts is more than 0, and can't retry -> delete the message _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml new file mode 100644 index 00000000..85f92bac --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://localhost:11300 + # connect timeout + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml new file mode 100644 index 00000000..5ca45b7d --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml @@ -0,0 +1,57 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://localhost:11300 + # connect timeout + timeout: 10s + +logs: + level: info + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: amqp + prefetch: 100 + queue: test-1-queue + priority: 1 + exchange: default + exchange_type: direct + routing_key: test-1 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + test-2: + driver: amqp + prefetch: 100 + queue: test-2-queue + priority: 2 + exchange: default + exchange_type: direct + routing_key: test-2 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml new file mode 100644 index 00000000..ea5b61d2 --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml @@ -0,0 +1,21 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml new file mode 100644 index 00000000..ee628387 --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: info + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: ephemeral + priority: 10 + prefetch: 10000 + + test-2: + driver: ephemeral + priority: 10 + prefetch: 10000 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go new file mode 100644 index 00000000..5ef5f022 --- /dev/null +++ b/tests/plugins/jobs/helpers.go @@ -0,0 +1,115 @@ +package jobs + +import ( + "net" + "net/rpc" + "testing" + + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/stretchr/testify/assert" +) + +func resumePipes(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) + } +} + +func pushToDisabledPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: nil, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Push", req, er) + assert.Error(t, err) + } +} + +func pushToPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + Attempts: 0, + RetryDelay: 0, + Timeout: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Push", req, er) + assert.NoError(t, err) + } +} + +func pausePipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Pause", pipe, er) + assert.NoError(t, err) + } +} + +func destroyPipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Destroy", pipe, er) + assert.NoError(t, err) + } +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 52bb6c7c..a95ac98b 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -205,7 +205,7 @@ func TestAMQPDeclare(t *testing.T) { time.Sleep(time.Second * 3) t.Run("DeclareAMQPPipeline", declareAMQPPipe) - t.Run("ConsumeAMQPPipeline", consumeAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) t.Run("PushAMQPPipeline", pushToPipe("test-3")) t.Run("PauseAMQPPipeline", pausePipelines("test-3")) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) @@ -238,16 +238,3 @@ func declareAMQPPipe(t *testing.T) { err = client.Call("jobs.Declare", pipe, er) assert.NoError(t, err) } - -func consumeAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-3" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go new file mode 100644 index 00000000..44d4f85e --- /dev/null +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -0,0 +1,235 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestBeanstalkInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestBeanstalkDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) + t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) + t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareBeanstalkPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "beanstalk", + "name": "test-3", + "tube": "default", + "reserve_timeout": "1", + "priority": "3", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go new file mode 100644 index 00000000..e8974a59 --- /dev/null +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -0,0 +1,246 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestEphemeralInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestEphemeralDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareEphemeralPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "ephemeral", + "name": "test-3", + "pipeline_size": "10000", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func consumeEphemeralPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} + pipe.GetPipelines()[0] = "test-3" + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index 0f9c2bb1..54015f03 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -69,7 +69,8 @@ func TestJobsInit(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + &logger.ZapLogger{}, + // mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -194,7 +195,8 @@ func TestJobsPauseResume(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + &logger.ZapLogger{}, + // mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -286,89 +288,3 @@ func ephemeralResume(t *testing.T) { err = client.Call("jobs.Resume", pipe, er) assert.NoError(t, err) } - -func pushToDisabledPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: nil, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Push", req, er) - assert.Error(t, err) - } -} - -func pushToPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: 0, - Attempts: 0, - RetryDelay: 0, - Timeout: 0, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Push", req, er) - assert.NoError(t, err) - } -} - -func pausePipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Pause", pipe, er) - assert.NoError(t, err) - } -} - -func destroyPipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Destroy", pipe, er) - assert.NoError(t, err) - } -} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go new file mode 100644 index 00000000..359842dc --- /dev/null +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -0,0 +1,236 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestSQSInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestSQSDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + t.Run("DestroySQSPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareSQSPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "sqs", + "name": "test-3", + "queue": "default", + "prefetch": "100", + "priority": "3", + "wait_time": "3", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml b/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml new file mode 100644 index 00000000..d75265ee --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml @@ -0,0 +1,29 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +# amazon sqs configuration +# General section +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml new file mode 100644 index 00000000..f2702906 --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml @@ -0,0 +1,53 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +# amazon sqs configuration +# General section +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: sqs + prefetch: 1000 + queue: default + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + + test-2: + driver: sqs + prefetch: 1000 + queue: default-2 + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + |