summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
committerValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
commite9713a1d08a93e2be70c889c600ed89f54822b54 (patch)
tree4198126207cc497453bf666c7411a67b8a4c39a2
parent83b246c68ea1594de2462c4ada3498babae906fb (diff)
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/events/jobs_events.go4
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go7
-rw-r--r--plugins/jobs/plugin.go57
-rw-r--r--plugins/jobs/rpc.go38
-rw-r--r--proto/jobs/v1beta/jobs.pb.go114
-rw-r--r--proto/jobs/v1beta/jobs.proto4
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-declare.yaml24
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-init.yaml54
-rw-r--r--tests/plugins/jobs/jobs_amqp_test.go253
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go50
10 files changed, 497 insertions, 108 deletions
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
index 9a7116ff..9a871956 100644
--- a/pkg/events/jobs_events.go
+++ b/pkg/events/jobs_events.go
@@ -23,6 +23,7 @@ const (
// EventPipeRun when pipeline pipelines has been requested.
EventPipeRun
+ // EventInitialized when pipeline has been initialized, but not started
EventInitialized
// EventPipeActive when pipeline has started.
@@ -31,6 +32,9 @@ const (
// EventPipeStopped when pipeline has been stopped.
EventPipeStopped
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
// EventPipeError when pipeline specific error happen.
EventPipeError
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d592a17a..f6442b42 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) {
}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) {
l := atomic.LoadUint32(&j.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ j.log.Warn("amqp listener already in the active state")
return
}
@@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) {
// run listener
j.listener(deliv)
+ // increase number of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d761de79..e118f732 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
return nil
}
-func (p *Plugin) Pause(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+func (p *Plugin) Pause(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
- ppl := pipe.(*pipeline.Pipeline)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Pause(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
-}
-func (p *Plugin) Resume(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+ // redirect call to the underlying driver
+ d.Pause(ppl.Name())
+}
- ppl := pipe.(*pipeline.Pipeline)
+func (p *Plugin) Resume(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Resume(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
+
+ // redirect call to the underlying driver
+ d.Resume(ppl.Name())
}
// Declare a pipeline.
@@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} {
func (p *Plugin) collectJobsEvents(event interface{}) {
if jev, ok := event.(events.JobEvent); ok {
switch jev.Event {
+ case events.EventPipePaused:
+ p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 10158e74..0d15fb0f 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -66,29 +66,23 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
return nil
}
-func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
- pipelines := make([]string, len(req.GetPipelines()))
-
- for i := 0; i < len(pipelines); i++ {
- pipelines[i] = req.GetPipelines()[i]
+func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Pause(req.GetPipelines()[i])
}
- r.p.Pause(pipelines)
return nil
}
-func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
- pipelines := make([]string, len(req.GetPipelines()))
-
- for i := 0; i < len(pipelines); i++ {
- pipelines[i] = req.GetPipelines()[i]
+func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Resume(req.GetPipelines()[i])
}
- r.p.Resume(pipelines)
return nil
}
-func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error {
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
resp.Pipelines = r.p.List()
return nil
}
@@ -114,6 +108,24 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
return nil
}
+func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
+ const op = errors.Op("rcp_declare_pipeline")
+
+ var destroyed []string
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ err := r.p.Destroy(req.GetPipelines()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+ destroyed = append(destroyed, req.GetPipelines()[i])
+ }
+
+ // return destroyed pipelines
+ resp.Pipelines = destroyed
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index 529cd972..9201f659 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -116,8 +116,8 @@ func (x *PushBatchRequest) GetJobs() []*Job {
return nil
}
-// request to pause/resume/list
-type Maintenance struct {
+// request to pause/resume/list/Destroy
+type Pipelines struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -125,8 +125,8 @@ type Maintenance struct {
Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"`
}
-func (x *Maintenance) Reset() {
- *x = Maintenance{}
+func (x *Pipelines) Reset() {
+ *x = Pipelines{}
if protoimpl.UnsafeEnabled {
mi := &file_jobs_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -134,13 +134,13 @@ func (x *Maintenance) Reset() {
}
}
-func (x *Maintenance) String() string {
+func (x *Pipelines) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*Maintenance) ProtoMessage() {}
+func (*Pipelines) ProtoMessage() {}
-func (x *Maintenance) ProtoReflect() protoreflect.Message {
+func (x *Pipelines) ProtoReflect() protoreflect.Message {
mi := &file_jobs_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -152,12 +152,12 @@ func (x *Maintenance) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use Maintenance.ProtoReflect.Descriptor instead.
-func (*Maintenance) Descriptor() ([]byte, []int) {
+// Deprecated: Use Pipelines.ProtoReflect.Descriptor instead.
+func (*Pipelines) Descriptor() ([]byte, []int) {
return file_jobs_proto_rawDescGZIP(), []int{2}
}
-func (x *Maintenance) GetPipelines() []string {
+func (x *Pipelines) GetPipelines() []string {
if x != nil {
return x.Pipelines
}
@@ -475,51 +475,51 @@ var file_jobs_proto_rawDesc = []byte{
0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10,
0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62,
- 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x2b, 0x0a, 0x0b, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65,
- 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e,
- 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69,
- 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a,
- 0x0e, 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
- 0x45, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28,
- 0x0b, 0x32, 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
- 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50,
- 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69,
- 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69,
- 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
- 0x02, 0x38, 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a,
- 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a,
- 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a,
- 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
- 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65,
- 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e,
- 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65,
- 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
- 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28,
- 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
- 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
- 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
- 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
- 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
- 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f,
- 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a,
- 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65,
- 0x6c, 0x61, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79,
- 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01,
- 0x28, 0x03, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b,
- 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28,
- 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a,
- 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07,
- 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65,
- 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
- 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d,
- 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x29, 0x0a, 0x09, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69,
+ 0x6e, 0x65, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73,
+ 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65,
+ 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a, 0x0e, 0x44,
+ 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x45, 0x0a,
+ 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32,
+ 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x44, 0x65,
+ 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50, 0x69, 0x70,
+ 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65,
+ 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65,
+ 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
+ 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69,
+ 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70,
+ 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61,
+ 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
+ 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31,
+ 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
+ 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e,
+ 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70,
+ 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54,
+ 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10,
+ 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79,
+ 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65,
+ 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
+ 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08,
+ 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
+ 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61,
+ 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a,
+ 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65,
+ 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52,
+ 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74,
+ 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69,
+ 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56,
+ 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20,
+ 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f,
+ 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x33,
}
var (
@@ -538,7 +538,7 @@ var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_jobs_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
(*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
- (*Maintenance)(nil), // 2: jobs.v1beta.Maintenance
+ (*Pipelines)(nil), // 2: jobs.v1beta.Pipelines
(*Empty)(nil), // 3: jobs.v1beta.Empty
(*DeclareRequest)(nil), // 4: jobs.v1beta.DeclareRequest
(*Job)(nil), // 5: jobs.v1beta.Job
@@ -592,7 +592,7 @@ func file_jobs_proto_init() {
}
}
file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Maintenance); i {
+ switch v := v.(*Pipelines); i {
case 0:
return &v.state
case 1:
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 7a2bcd13..dc86e1c2 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -13,8 +13,8 @@ message PushBatchRequest {
repeated Job jobs = 1;
}
-// request to pause/resume/list
-message Maintenance {
+// request to pause/resume/list/Destroy
+message Pipelines {
repeated string pipelines = 1;
}
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
new file mode 100644
index 00000000..ed0345d6
--- /dev/null
+++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
@@ -0,0 +1,24 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+logs:
+ level: debug
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
new file mode 100644
index 00000000..44e12d89
--- /dev/null
+++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
@@ -0,0 +1,54 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ pipelines:
+ test-1:
+ driver: amqp
+ prefetch: 100
+ queue: test-1-queue
+ priority: 1
+ exchange: default
+ exchange_type: direct
+ routing_key: test-1
+ exclusive: false
+ multiple_ack: false
+ requeue_on_fail: false
+
+ test-2:
+ driver: amqp
+ prefetch: 100
+ queue: test-2-queue
+ priority: 2
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+ exclusive: false
+ multiple_ack: false
+ requeue_on_fail: false
+
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-1", "test-2" ]
+
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
new file mode 100644
index 00000000..7e74891c
--- /dev/null
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -0,0 +1,253 @@
+package jobs
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/golang/mock/gomock"
+ endure "github.com/spiral/endure/pkg/container"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/resetter"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestAMQPInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "amqp/.rr-amqp-init.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestAMQPDeclare(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "amqp/.rr-amqp-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ //mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareAMQPPipeline", declareAMQPPipe)
+ t.Run("ConsumeAMQPPipeline", consumeAMQPPipe)
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
+ t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func declareAMQPPipe(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
+ "driver": "amqp",
+ "name": "test-3",
+ "routing-key": "test-3",
+ "queue": "default",
+ "exchange-type": "direct",
+ "exchange": "amqp.default",
+ "prefetch": "100",
+ "priority": "3",
+ "exclusive": "true",
+ "multiple_ask": "true",
+ "requeue_on_fail": "true",
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Declare", pipe, er)
+ assert.NoError(t, err)
+}
+
+func consumeAMQPPipe(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)}
+ pipe.GetPipelines()[0] = "test-3"
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Resume", pipe, er)
+ assert.NoError(t, err)
+}
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index c10ac350..0f9c2bb1 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -266,7 +266,7 @@ func ephemeralPause(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)}
+ pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)}
pipe.GetPipelines()[0] = "test-local"
er := &jobsv1beta.Empty{}
@@ -279,7 +279,7 @@ func ephemeralResume(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)}
+ pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)}
pipe.GetPipelines()[0] = "test-local"
er := &jobsv1beta.Empty{}
@@ -320,10 +320,14 @@ func pushToPipe(pipeline string) func(t *testing.T) {
Job: "some/php/namespace",
Id: "1",
Payload: `{"hello":"world"}`,
- Headers: nil,
+ Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
Options: &jobsv1beta.Options{
- Priority: 1,
- Pipeline: pipeline,
+ Priority: 1,
+ Pipeline: pipeline,
+ Delay: 0,
+ Attempts: 0,
+ RetryDelay: 0,
+ Timeout: 0,
},
}}
@@ -332,3 +336,39 @@ func pushToPipe(pipeline string) func(t *testing.T) {
assert.NoError(t, err)
}
}
+
+func pausePipelines(pipes ...string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))}
+
+ for i := 0; i < len(pipes); i++ {
+ pipe.GetPipelines()[i] = pipes[i]
+ }
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Pause", pipe, er)
+ assert.NoError(t, err)
+ }
+}
+
+func destroyPipelines(pipes ...string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))}
+
+ for i := 0; i < len(pipes); i++ {
+ pipe.GetPipelines()[i] = pipes[i]
+ }
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Destroy", pipe, er)
+ assert.NoError(t, err)
+ }
+}