summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 20:59:00 +0300
committerValery Piashchynski <[email protected]>2021-07-14 20:59:00 +0300
commit7ea227733e0b1fa59021233e6cd0fd06442fbe50 (patch)
treedcba3e2fddf53898d0f5c82eb128730ab38c723a /plugins/jobs/drivers/amqp/consumer.go
parent04fde6d8d1a5a88602f8206e0d2c09c4b8346941 (diff)
Fix incorrect path in the CI. Implement FromPipeline for the sqs.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go43
1 files changed, 41 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 31999e23..20dcef2a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -43,7 +43,8 @@ type JobsConsumer struct {
delayCache map[string]struct{}
- stopCh chan struct{}
+ listeners uint32
+ stopCh chan struct{}
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -336,6 +337,13 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
// run listener
j.listener(deliv)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
@@ -345,6 +353,15 @@ func (j *JobsConsumer) Pause(p string) {
j.log.Error("no such pipeline", "requested pause on: ", p)
}
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
// protect connection (redial)
j.Lock()
defer j.Unlock()
@@ -355,8 +372,17 @@ func (j *JobsConsumer) Pause(p string) {
errCl := j.consumeChan.Close()
if errCl != nil {
j.log.Error("force close failed", "error", err)
+ return
}
+ return
}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (j *JobsConsumer) Resume(p string) {
@@ -369,6 +395,13 @@ func (j *JobsConsumer) Resume(p string) {
j.Lock()
defer j.Unlock()
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {
@@ -399,6 +432,13 @@ func (j *JobsConsumer) Resume(p string) {
// run listener
j.listener(deliv)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (j *JobsConsumer) Stop() error {
@@ -410,7 +450,6 @@ func (j *JobsConsumer) Stop() error {
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
- Elapsed: 0,
})
return nil
}