author    Valery Piashchynski <[email protected]>    2021-07-22 13:53:19 +0300
committer Valery Piashchynski <[email protected]>    2021-07-22 13:53:19 +0300
commit    05660fcd256963eac94ada90f7baa409344f9e73 (patch)
tree      72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/plugin.go
parent    182199a6449677a620813e3a8157cd0406095435 (diff)
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--  plugins/jobs/plugin.go | 86
1 file changed, 49 insertions(+), 37 deletions(-)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 47d31d99..c8973f1e 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,9 +3,7 @@ package jobs
import (
"context"
"fmt"
- "runtime"
"sync"
- "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -24,11 +22,12 @@ import (
)
const (
- // RrJobs env variable
- RrJobs string = "rr_jobs"
- PluginName string = "jobs"
+ // RrMode env variable
+ RrMode string = "RR_MODE"
+ RrModeJobs string = "jobs"
- pipelines string = "pipelines"
+ PluginName string = "jobs"
+ pipelines string = "pipelines"
)
type Plugin struct {
@@ -54,7 +53,10 @@ type Plugin struct {
// initial set of the pipelines to consume
consume map[string]struct{}
+ // signal channel to stop the pollers
stopCh chan struct{}
+
+ pldPool sync.Pool
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -79,6 +81,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
+ p.pldPool = sync.Pool{New: func() interface{} {
+ // with nil fields
+ return payload.Payload{}
+ }}
// initial set of pipelines
for i := range p.cfg.Pipelines {
@@ -98,6 +104,16 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
+func (p *Plugin) getPayload() payload.Payload {
+ return p.pldPool.Get().(payload.Payload)
+}
+
+func (p *Plugin) putPayload(pld payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}
+
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
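
Aside: the additions above (the pldPool field, getPayload and putPayload) are the standard sync.Pool get/reset/put pattern for reusing request payloads instead of allocating one per job. A minimal standalone sketch of the same idea follows; the Payload type and names below are illustrative stand-ins, not the plugin's actual payload package:

package main

import (
	"fmt"
	"sync"
)

// Payload mirrors the two fields the plugin resets before returning a
// payload to the pool: a context blob and a body.
type Payload struct {
	Context []byte
	Body    []byte
}

// pldPool hands out reusable Payload values so the hot path avoids
// allocating a fresh struct for every executed job.
var pldPool = sync.Pool{New: func() interface{} {
	return Payload{} // zero value, both fields nil
}}

func getPayload() Payload {
	return pldPool.Get().(Payload)
}

func putPayload(p Payload) {
	// wipe job data so the next borrower never observes stale bytes
	p.Body = nil
	p.Context = nil
	pldPool.Put(p)
}

func main() {
	pld := getPayload()
	pld.Context = []byte(`{"id":"1"}`)
	pld.Body = []byte(`{"job":"ping"}`)
	fmt.Println(string(pld.Body), string(pld.Context))
	putPayload(pld)
}

One trade-off worth noting: because the struct is pooled by value, each Put boxes it into an interface{} and allocates; keeping *Payload in the pool is the usual refinement if that allocation shows up in profiles.
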
@@ -161,29 +177,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
})
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
if err != nil {
errCh <- err
return errCh
}
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
- var rate uint64
- go func() {
- tt := time.NewTicker(time.Second * 1)
- for { //nolint:gosimple
- select {
- case <-tt.C:
- fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
- fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
- fmt.Printf("---> curr len: %d\n", p.queue.Len())
- atomic.StoreUint64(&rate, 0)
- }
- }
- }()
-
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
-
// start listening
go func() {
for i := uint8(0); i < p.cfg.NumPollers; i++ {
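
Aside: the listening goroutine above fans out into p.cfg.NumPollers pollers, each looping until the stop channel fires. A reduced sketch of that fan-out/stop shape, with a plain channel standing in for the priority queue and the exact stop semantics simplified:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const numPollers = 4
	queue := make(chan string)    // stand-in for the binary-heap priority queue
	stopCh := make(chan struct{}) // signals every poller to exit
	var wg sync.WaitGroup

	for i := 0; i < numPollers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-stopCh:
					fmt.Printf("poller %d stopped\n", id)
					return
				case job := <-queue:
					fmt.Printf("poller %d handles %s\n", id, job)
				}
			}
		}(i)
	}

	queue <- "job-1"
	queue <- "job-2"
	time.Sleep(50 * time.Millisecond)
	close(stopCh) // the plugin signals the pollers explicitly; closing is enough for a sketch
	wg.Wait()
}
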
@@ -194,9 +193,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.log.Debug("------> job poller stopped <------")
return
default:
- // get data JOB from the queue
+ // get prioritized JOB from the queue
jb := p.queue.ExtractMin()
+ // parse the context
+ // for each job, the context contains:
+ /*
+ 1. Job class
+ 2. Job ID provided from the outside
+ 3. Job Headers map[string][]string
+ 4. Timeout in seconds
+ 5. Pipeline name
+ */
ctx, err := jb.Context()
if err != nil {
errNack := jb.Nack()
@@ -207,40 +215,44 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
- exec := payload.Payload{
- Context: ctx,
- Body: jb.Body(),
- }
-
- // protect from the pool reset
- p.RLock()
+ // get payload from the sync.Pool
+ exec := p.getPayload()
+ exec.Body = jb.Body()
+ exec.Context = ctx
// TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ // remove in tests
p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+ // protect from the pool reset
+ p.RLock()
resp, err := p.workersPool.Exec(exec)
+ p.RUnlock()
if err != nil {
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
- p.RUnlock()
p.log.Error("job execute", "error", err)
+
+ p.putPayload(exec)
continue
}
- p.RUnlock()
// TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ // remove in tests
p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
errAck := jb.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
+ p.putPayload(exec)
continue
}
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
+
+ // return payload
+ p.putPayload(exec)
}
}
}()
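
Aside: the numbered comment inside the loop describes the five pieces of metadata carried in each job's context. As a hedged illustration of what a consumer of that context might look like, the struct below invents field names and JSON tags; only the five listed items come from the comment itself:

package main

import (
	"encoding/json"
	"fmt"
)

// jobContext is a hypothetical decoding target for the five fields listed
// in the comment above; the real wire format and key names may differ.
type jobContext struct {
	Job      string              `json:"job"`      // 1. job class
	ID       string              `json:"id"`       // 2. job ID provided from the outside
	Headers  map[string][]string `json:"headers"`  // 3. job headers
	Timeout  uint64              `json:"timeout"`  // 4. timeout in seconds
	Pipeline string              `json:"pipeline"` // 5. pipeline name
}

func main() {
	raw := []byte(`{"job":"app.job.Ping","id":"42","headers":{"attempt":["1"]},"timeout":60,"pipeline":"test-local"}`)

	var ctx jobContext
	if err := json.Unmarshal(raw, &ctx); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", ctx)
}
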
@@ -301,7 +313,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
if err != nil {
return errors.E(op, err)
}
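
Aside: both NewWorkerPool calls in this diff now inject RR_MODE=jobs into the worker environment (previously rr_jobs=true). A minimal sketch of how a worker process could branch on that variable; only the variable name and value are taken from the diff, the check itself is illustrative:

package main

import (
	"fmt"
	"os"
)

func main() {
	// RR_MODE is set by the plugin when it spawns the worker pool.
	if os.Getenv("RR_MODE") == "jobs" {
		fmt.Println("jobs mode: wait for payloads from the pipelines")
		return
	}
	fmt.Println("not started by the jobs plugin")
}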