diff options
-rw-r--r-- | pkg/events/jobs_events.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 57 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 38 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.pb.go | 114 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 4 | ||||
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-amqp-declare.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-amqp-init.yaml | 54 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 253 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 50 |
10 files changed, 497 insertions, 108 deletions
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go index 9a7116ff..9a871956 100644 --- a/pkg/events/jobs_events.go +++ b/pkg/events/jobs_events.go @@ -23,6 +23,7 @@ const ( // EventPipeRun when pipeline pipelines has been requested. EventPipeRun + // EventInitialized when pipeline has been initialized, but not started EventInitialized // EventPipeActive when pipeline has started. @@ -31,6 +32,9 @@ const ( // EventPipeStopped when pipeline has been stopped. EventPipeStopped + // EventPipePaused when pipeline has been paused. + EventPipePaused + // EventPipeError when pipeline specific error happen. EventPipeError diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d592a17a..f6442b42 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) { } j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), @@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) { l := atomic.LoadUint32(&j.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + j.log.Warn("amqp listener already in the active state") return } @@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + // increase number of listeners + atomic.AddUint32(&j.listeners, 1) + j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d761de79..e118f732 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error { return nil } -func (p *Plugin) Pause(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } +func (p *Plugin) Pause(pp string) { + pipe, ok := p.pipelines.Load(pp) - ppl := pipe.(*pipeline.Pipeline) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Pause(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } -} -func (p *Plugin) Resume(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } + // redirect call to the underlying driver + d.Pause(ppl.Name()) +} - ppl := pipe.(*pipeline.Pipeline) +func (p *Plugin) Resume(pp string) { + pipe, ok := p.pipelines.Load(pp) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Resume(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) } // Declare a pipeline. @@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} { func (p *Plugin) collectJobsEvents(event interface{}) { if jev, ok := event.(events.JobEvent); ok { switch jev.Event { + case events.EventPipePaused: + p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 10158e74..0d15fb0f 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -66,29 +66,23 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Pause(req.GetPipelines()[i]) } - r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Resume(req.GetPipelines()[i]) } - r.p.Resume(pipelines) return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { resp.Pipelines = r.p.List() return nil } @@ -114,6 +108,24 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error return nil } +func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { + const op = errors.Op("rcp_declare_pipeline") + + var destroyed []string + for i := 0; i < len(req.GetPipelines()); i++ { + err := r.p.Destroy(req.GetPipelines()[i]) + if err != nil { + return errors.E(op, err) + } + destroyed = append(destroyed, req.GetPipelines()[i]) + } + + // return destroyed pipelines + resp.Pipelines = destroyed + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index 529cd972..9201f659 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -116,8 +116,8 @@ func (x *PushBatchRequest) GetJobs() []*Job { return nil } -// request to pause/resume/list -type Maintenance struct { +// request to pause/resume/list/Destroy +type Pipelines struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -125,8 +125,8 @@ type Maintenance struct { Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"` } -func (x *Maintenance) Reset() { - *x = Maintenance{} +func (x *Pipelines) Reset() { + *x = Pipelines{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -134,13 +134,13 @@ func (x *Maintenance) Reset() { } } -func (x *Maintenance) String() string { +func (x *Pipelines) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Maintenance) ProtoMessage() {} +func (*Pipelines) ProtoMessage() {} -func (x *Maintenance) ProtoReflect() protoreflect.Message { +func (x *Pipelines) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -152,12 +152,12 @@ func (x *Maintenance) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Maintenance.ProtoReflect.Descriptor instead. -func (*Maintenance) Descriptor() ([]byte, []int) { +// Deprecated: Use Pipelines.ProtoReflect.Descriptor instead. +func (*Pipelines) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{2} } -func (x *Maintenance) GetPipelines() []string { +func (x *Pipelines) GetPipelines() []string { if x != nil { return x.Pipelines } @@ -475,51 +475,51 @@ var file_jobs_proto_rawDesc = []byte{ 0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, - 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x2b, 0x0a, 0x0b, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65, - 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a, - 0x0e, 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x45, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, - 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50, - 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69, - 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, - 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, - 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, - 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, - 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, - 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, - 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, - 0x6c, 0x61, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, - 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, - 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, - 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, - 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, - 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x29, 0x0a, 0x09, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, + 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a, 0x0e, 0x44, + 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x45, 0x0a, + 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x44, 0x65, + 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, + 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, + 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, + 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, + 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, + 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, + 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, + 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, + 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, + 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, + 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -538,7 +538,7 @@ var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_jobs_proto_goTypes = []interface{}{ (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest - (*Maintenance)(nil), // 2: jobs.v1beta.Maintenance + (*Pipelines)(nil), // 2: jobs.v1beta.Pipelines (*Empty)(nil), // 3: jobs.v1beta.Empty (*DeclareRequest)(nil), // 4: jobs.v1beta.DeclareRequest (*Job)(nil), // 5: jobs.v1beta.Job @@ -592,7 +592,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Maintenance); i { + switch v := v.(*Pipelines); i { case 0: return &v.state case 1: diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 7a2bcd13..dc86e1c2 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -13,8 +13,8 @@ message PushBatchRequest { repeated Job jobs = 1; } -// request to pause/resume/list -message Maintenance { +// request to pause/resume/list/Destroy +message Pipelines { repeated string pipelines = 1; } diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml new file mode 100644 index 00000000..ed0345d6 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:guest@localhost:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml new file mode 100644 index 00000000..44e12d89 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml @@ -0,0 +1,54 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:guest@localhost:5672/ + +logs: + level: info + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: amqp + prefetch: 100 + queue: test-1-queue + priority: 1 + exchange: default + exchange_type: direct + routing_key: test-1 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + test-2: + driver: amqp + prefetch: 100 + queue: test-2-queue + priority: 2 + exchange: default + exchange_type: direct + routing_key: test-2 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go new file mode 100644 index 00000000..7e74891c --- /dev/null +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -0,0 +1,253 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestAMQPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", consumeAMQPPipe) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func consumeAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} + pipe.GetPipelines()[0] = "test-3" + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index c10ac350..0f9c2bb1 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -266,7 +266,7 @@ func ephemeralPause(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)} + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} pipe.GetPipelines()[0] = "test-local" er := &jobsv1beta.Empty{} @@ -279,7 +279,7 @@ func ephemeralResume(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)} + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} pipe.GetPipelines()[0] = "test-local" er := &jobsv1beta.Empty{} @@ -320,10 +320,14 @@ func pushToPipe(pipeline string) func(t *testing.T) { Job: "some/php/namespace", Id: "1", Payload: `{"hello":"world"}`, - Headers: nil, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, + Priority: 1, + Pipeline: pipeline, + Delay: 0, + Attempts: 0, + RetryDelay: 0, + Timeout: 0, }, }} @@ -332,3 +336,39 @@ func pushToPipe(pipeline string) func(t *testing.T) { assert.NoError(t, err) } } + +func pausePipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Pause", pipe, er) + assert.NoError(t, err) + } +} + +func destroyPipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Destroy", pipe, er) + assert.NoError(t, err) + } +} |