summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 20:09:01 +0300
committerGitHub <[email protected]>2021-09-02 20:09:01 +0300
commit6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch)
treef6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins/amqp/amqpjobs/consumer.go
parent0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
parent4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff)
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
Diffstat (limited to 'plugins/amqp/amqpjobs/consumer.go')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go22
1 files changed, 15 insertions, 7 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..2ff0a40a 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("rabbit_run")
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ start := time.Now()
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
+
return nil
}