diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 13:53:19 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 13:53:19 +0300 |
commit | 05660fcd256963eac94ada90f7baa409344f9e73 (patch) | |
tree | 72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/plugin.go | |
parent | 182199a6449677a620813e3a8157cd0406095435 (diff) |
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 47d31d99..c8973f1e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,9 +3,7 @@ package jobs import ( "context" "fmt" - "runtime" "sync" - "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -24,11 +22,12 @@ import ( ) const ( - // RrJobs env variable - RrJobs string = "rr_jobs" - PluginName string = "jobs" + // RrMode env variable + RrMode string = "RR_MODE" + RrModeJobs string = "jobs" - pipelines string = "pipelines" + PluginName string = "jobs" + pipelines string = "pipelines" ) type Plugin struct { @@ -54,7 +53,10 @@ type Plugin struct { // initial set of the pipelines to consume consume map[string]struct{} + // signal channel to stop the pollers stopCh chan struct{} + + pldPool sync.Pool } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -79,6 +81,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) + p.pldPool = sync.Pool{New: func() interface{} { + // with nil fields + return payload.Payload{} + }} // initial set of pipelines for i := range p.cfg.Pipelines { @@ -98,6 +104,16 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } +func (p *Plugin) getPayload() payload.Payload { + return p.pldPool.Get().(payload.Payload) +} + +func (p *Plugin) putPayload(pld payload.Payload) { + pld.Body = nil + pld.Context = nil + p.pldPool.Put(pld) +} + func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") @@ -161,29 +177,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit }) var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) if err != nil { errCh <- err return errCh } - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - var rate uint64 - go func() { - tt := time.NewTicker(time.Second * 1) - for { //nolint:gosimple - select { - case <-tt.C: - fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) - fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) - fmt.Printf("---> curr len: %d\n", p.queue.Len()) - atomic.StoreUint64(&rate, 0) - } - } - }() - - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - // start listening go func() { for i := uint8(0); i < p.cfg.NumPollers; i++ { @@ -194,9 +193,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.log.Debug("------> job poller stopped <------") return default: - // get data JOB from the queue + // get prioritized JOB from the queue jb := p.queue.ExtractMin() + // parse the context + // for the each job, context contains: + /* + 1. Job class + 2. Job ID provided from the outside + 3. Job Headers map[string][]string + 4. Timeout in seconds + 5. Pipeline name + */ ctx, err := jb.Context() if err != nil { errNack := jb.Nack() @@ -207,40 +215,44 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - exec := payload.Payload{ - Context: ctx, - Body: jb.Body(), - } - - // protect from the pool reset - p.RLock() + // get payload from the sync.Pool + exec := p.getPayload() + exec.Body = jb.Body() + exec.Context = ctx // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + // protect from the pool reset + p.RLock() resp, err := p.workersPool.Exec(exec) + p.RUnlock() if err != nil { errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } - p.RUnlock() p.log.Error("job execute", "error", err) + + p.putPayload(exec) continue } - p.RUnlock() // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) errAck := jb.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) + p.putPayload(exec) continue } - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) + + // return payload + p.putPayload(exec) } } }() @@ -301,7 +313,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) if err != nil { return errors.E(op, err) } |