summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-12 14:57:24 +0300
committerValery Piashchynski <[email protected]>2021-07-12 14:57:24 +0300
commitec7c049036d31fe030d106db9f0d268ea0296c5f (patch)
treec7772e6e2c734c3df5ce207fb6690d5891b57c6d /plugins/jobs
parent0be3e79115345d13e92da9b8a4e2b0926480c138 (diff)
Add JOBS tests to the Makefile and GitHub CI.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go9
-rw-r--r--plugins/jobs/plugin.go28
2 files changed, 14 insertions, 23 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 481e102a..c2807b54 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -422,6 +422,15 @@ func (j *JobsConsumer) Resume(p string) {
func (j *JobsConsumer) Stop() error {
j.stopCh <- struct{}{}
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
return nil
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 86289aba..c83078c3 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,9 +3,7 @@ package jobs
import (
"context"
"fmt"
- "runtime"
"sync"
- "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -101,23 +99,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
- var rate uint64
- go func() {
- tt := time.NewTicker(time.Second * 1)
- for { //nolint:gosimple
- select {
- case <-tt.C:
- fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
- fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
- fmt.Printf("---> curr len: %d\n", p.queue.Len())
- atomic.StoreUint64(&rate, 0)
- }
- }
- }()
-
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
-
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
t := time.Now()
@@ -234,9 +215,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
p.RUnlock()
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
-
errAck := job.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
@@ -269,7 +247,7 @@ func (p *Plugin) Stop() error {
}
}()
- // just wait pollers for 2 seconds before exit
+ // just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
return nil
@@ -470,6 +448,10 @@ func (p *Plugin) Destroy(pp string) error {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
+ // delete consumer
+ delete(p.consumers, ppl.Name())
+ p.pipelines.Delete(pp)
+
return d.Stop()
}