diff options
author | Valery Piashchynski <[email protected]> | 2021-07-12 14:57:24 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-12 14:57:24 +0300 |
commit | ec7c049036d31fe030d106db9f0d268ea0296c5f (patch) | |
tree | c7772e6e2c734c3df5ce207fb6690d5891b57c6d /plugins | |
parent | 0be3e79115345d13e92da9b8a4e2b0926480c138 (diff) |
Add JOBS tests to the Makefile and GitHub CI.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 9 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 28 |
2 files changed, 14 insertions, 23 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 481e102a..c2807b54 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -422,6 +422,15 @@ func (j *JobsConsumer) Resume(p string) { func (j *JobsConsumer) Stop() error { j.stopCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) return nil } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 86289aba..c83078c3 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,9 +3,7 @@ package jobs import ( "context" "fmt" - "runtime" "sync" - "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -101,23 +99,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - var rate uint64 - go func() { - tt := time.NewTicker(time.Second * 1) - for { //nolint:gosimple - select { - case <-tt.C: - fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) - fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) - fmt.Printf("---> curr len: %d\n", p.queue.Len()) - atomic.StoreUint64(&rate, 0) - } - } - }() - - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { t := time.Now() @@ -234,9 +215,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } p.RUnlock() - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) - errAck := job.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) @@ -269,7 +247,7 @@ func (p *Plugin) Stop() error { } }() - // just wait pollers for 2 seconds before exit + // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) return nil @@ -470,6 +448,10 @@ func (p *Plugin) Destroy(pp string) error { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } + // delete consumer + delete(p.consumers, ppl.Name()) + p.pipelines.Delete(pp) + return d.Stop() } |