diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 20:59:00 +0300 |
commit | 7ea227733e0b1fa59021233e6cd0fd06442fbe50 (patch) | |
tree | dcba3e2fddf53898d0f5c82eb128730ab38c723a /plugins/jobs/drivers/amqp/consumer.go | |
parent | 04fde6d8d1a5a88602f8206e0d2c09c4b8346941 (diff) |
Fix incorrect path in the CI. Implement FromPipeline for the sqs.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 43 |
1 files changed, 41 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 31999e23..20dcef2a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -43,7 +43,8 @@ type JobsConsumer struct { delayCache map[string]struct{} - stopCh chan struct{} + listeners uint32 + stopCh chan struct{} } // NewAMQPConsumer initializes rabbitmq pipeline @@ -336,6 +337,13 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { // run listener j.listener(deliv) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } @@ -345,6 +353,15 @@ func (j *JobsConsumer) Pause(p string) { j.log.Error("no such pipeline", "requested pause on: ", p) } + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + // protect connection (redial) j.Lock() defer j.Unlock() @@ -355,8 +372,17 @@ func (j *JobsConsumer) Pause(p string) { errCl := j.consumeChan.Close() if errCl != nil { j.log.Error("force close failed", "error", err) + return } + return } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobsConsumer) Resume(p string) { @@ -369,6 +395,13 @@ func (j *JobsConsumer) Resume(p string) { j.Lock() defer j.Unlock() + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + var err error j.consumeChan, err = j.conn.Channel() if err != nil { @@ -399,6 +432,13 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (j *JobsConsumer) Stop() error { @@ -410,7 +450,6 @@ func (j *JobsConsumer) Stop() error { Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), - Elapsed: 0, }) return nil } |