summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go28
1 files changed, 5 insertions, 23 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 86289aba..c83078c3 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,9 +3,7 @@ package jobs
import (
"context"
"fmt"
- "runtime"
"sync"
- "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -101,23 +99,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
- var rate uint64
- go func() {
- tt := time.NewTicker(time.Second * 1)
- for { //nolint:gosimple
- select {
- case <-tt.C:
- fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
- fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
- fmt.Printf("---> curr len: %d\n", p.queue.Len())
- atomic.StoreUint64(&rate, 0)
- }
- }
- }()
-
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
-
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
t := time.Now()
@@ -234,9 +215,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
p.RUnlock()
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
-
errAck := job.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
@@ -269,7 +247,7 @@ func (p *Plugin) Stop() error {
}
}()
- // just wait pollers for 2 seconds before exit
+ // just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
return nil
@@ -470,6 +448,10 @@ func (p *Plugin) Destroy(pp string) error {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
+ // delete consumer
+ delete(p.consumers, ppl.Name())
+ p.pipelines.Delete(pp)
+
return d.Stop()
}