diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 14:28:05 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 14:28:05 +0300 |
commit | 05956485a121ee47a2f8281a8a0dffd7eecc68aa (patch) | |
tree | 1fd0ffd60c1731ea6934300506c3061f6e65d1d7 /plugins/jobs/plugin.go | |
parent | c7becb2fc51fc09523f6640eb72f360a6b4681f5 (diff) |
Add pipeline and job plugin options...
Skeleton for the amqp plugin.
Add Timeout and Pipeline to the job.Context() method.
Implement queue limits for the ephemeral driver with main priority queue
limits.
Update configuration, add pipeline_size for every pipeline and jobs
priority queue size.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 0f645b12..9a551d71 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -85,7 +85,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap(100_000_000) + p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log return nil @@ -102,7 +102,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> rate is: %d", atomic.LoadUint64(&rate)) + fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) atomic.StoreUint64(&rate, 0) } } @@ -113,7 +113,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> goroutines: %d", runtime.NumGoroutine()) + fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) } } }() @@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit for { //nolint:gosimple select { case <-tt.C: - fmt.Printf("---> curr len: %d", p.queue.Len()) + fmt.Printf("---> curr len: %d\n", p.queue.Len()) } } }() |