diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 28 |
1 files changed, 5 insertions, 23 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 86289aba..c83078c3 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,9 +3,7 @@ package jobs import ( "context" "fmt" - "runtime" "sync" - "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -101,23 +99,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - var rate uint64 - go func() { - tt := time.NewTicker(time.Second * 1) - for { //nolint:gosimple - select { - case <-tt.C: - fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) - fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) - fmt.Printf("---> curr len: %d\n", p.queue.Len()) - atomic.StoreUint64(&rate, 0) - } - } - }() - - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { t := time.Now() @@ -234,9 +215,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } p.RUnlock() - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) - errAck := job.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) @@ -269,7 +247,7 @@ func (p *Plugin) Stop() error { } }() - // just wait pollers for 2 seconds before exit + // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) return nil @@ -470,6 +448,10 @@ func (p *Plugin) Destroy(pp string) error { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } + // delete consumer + delete(p.consumers, ppl.Name()) + p.pipelines.Delete(pp) + return d.Stop() } |