summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-05 18:44:29 +0300
committerValery Piashchynski <[email protected]>2021-07-05 18:44:29 +0300
commit207739f7346c98e16087547bc510e1f909671260 (patch)
tree5c6eac27beb4eb5e127c7d8dae3464edb3359be9
parent300166eda7b138847008a7653f90753bd8397b9e (diff)
- Update PQ
- Update ephemeral plugin, complete Push - Add Jobs full configuration Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--.github/workflows/linux.yml2
-rwxr-xr-xMakefile8
-rw-r--r--common/priority_queue/interface.go15
-rw-r--r--pkg/priority_queue/binary_heap.go8
-rw-r--r--pkg/priority_queue/binary_heap_test.go21
-rw-r--r--pkg/priority_queue/interface.go11
-rw-r--r--pkg/priority_queue/pq.go18
-rw-r--r--pkg/priority_queue/pq_test.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go50
-rw-r--r--plugins/jobs/brokers/ephemeral/config.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go26
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/queue.go7
-rw-r--r--plugins/jobs/config.go12
-rw-r--r--plugins/jobs/interface.go8
-rw-r--r--plugins/jobs/pipeline/pipeline.go10
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go16
-rw-r--r--plugins/jobs/plugin.go68
-rw-r--r--plugins/jobs/structs/job.go20
-rw-r--r--plugins/jobs/structs/job_options.go7
-rw-r--r--plugins/jobs/structs/job_test.go4
-rw-r--r--tests/plugins/http/configs/.rr-http.yaml4
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml9
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go2
24 files changed, 184 insertions, 147 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 85ffce88..cf6c037b 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -77,6 +77,8 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/structs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
diff --git a/Makefile b/Makefile
index e0137d32..ae942e29 100755
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,11 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server_cmd.out -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server.out -covermode=atomic ./tests/plugins/server
@@ -47,9 +49,11 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
go test -v -race -tags=debug ./pkg/priority_queue
- go test -v -race -tags=debug ./tests/plugins/http
+ go test -v -race -tags=debug ./plugins/jobs/structs
+ go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./plugins/server
+ go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
go test -v -race -tags=debug ./tests/plugins/server
diff --git a/common/priority_queue/interface.go b/common/priority_queue/interface.go
new file mode 100644
index 00000000..c1774223
--- /dev/null
+++ b/common/priority_queue/interface.go
@@ -0,0 +1,15 @@
+package priorityqueue
+
+type Queue interface {
+ Insert(item Item)
+ GetMax() Item
+}
+
+type Item interface {
+ ID() string
+ Priority() uint64
+ Ack()
+ Nack()
+ Body() []byte
+ Context() []byte
+}
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index 02d413aa..c7c148da 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -4,7 +4,9 @@ binary heap (min-heap) algorithm used as a core for the priority queue
package priorityqueue
-type BinHeap []PQItem
+import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+
+type BinHeap []priorityqueue.Item
func NewBinHeap() *BinHeap {
return &BinHeap{}
@@ -53,12 +55,12 @@ func (bh *BinHeap) fixDown(curr, end int) {
}
}
-func (bh *BinHeap) Insert(item PQItem) {
+func (bh *BinHeap) Insert(item priorityqueue.Item) {
*bh = append(*bh, item)
bh.fixUp()
}
-func (bh *BinHeap) GetMax() PQItem {
+func (bh *BinHeap) GetMax() priorityqueue.Item {
l := len(*bh)
if l == 0 {
return nil
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index afeae62c..528e8fd0 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -3,11 +3,26 @@ package priorityqueue
import (
"testing"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/stretchr/testify/require"
)
type Test int
+func (t Test) Ack() {
+}
+
+func (t Test) Nack() {
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() []byte {
+ return nil
+}
+
func (t Test) ID() string {
return ""
}
@@ -17,7 +32,7 @@ func (t Test) Priority() uint64 {
}
func TestBinHeap_Init(t *testing.T) {
- a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+ a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
bh := NewBinHeap()
@@ -25,9 +40,9 @@ func TestBinHeap_Init(t *testing.T) {
bh.Insert(a[i])
}
- expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+ expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
- res := make([]PQItem, 0, 12)
+ res := make([]priorityqueue.Item, 0, 12)
for i := 0; i < 11; i++ {
item := bh.GetMax()
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
deleted file mode 100644
index 3cc1d575..00000000
--- a/pkg/priority_queue/interface.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package priorityqueue
-
-type Queue interface {
- Insert(item PQItem)
- GetMax() PQItem
-}
-
-type PQItem interface {
- ID() string
- Priority() uint64
-}
diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go
index 1b33cb92..2ff52a79 100644
--- a/pkg/priority_queue/pq.go
+++ b/pkg/priority_queue/pq.go
@@ -1,6 +1,10 @@
package priorityqueue
-import "sync"
+import (
+ "sync"
+
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+)
type PQ struct {
sync.RWMutex
@@ -13,14 +17,14 @@ func NewPriorityQueue() *PQ {
}
}
-func (p *PQ) Insert(item PQItem) {
+func (p *PQ) GetMax() priorityqueue.Item {
p.Lock()
- p.bh.Insert(item)
- p.Unlock()
+ defer p.Unlock()
+ return p.bh.GetMax()
}
-func (p *PQ) Get() PQItem {
+func (p *PQ) Insert(item priorityqueue.Item) {
p.Lock()
- defer p.Unlock()
- return p.bh.GetMax()
+ p.bh.Insert(item)
+ p.Unlock()
}
diff --git a/pkg/priority_queue/pq_test.go b/pkg/priority_queue/pq_test.go
index cdec10f5..49afe5e3 100644
--- a/pkg/priority_queue/pq_test.go
+++ b/pkg/priority_queue/pq_test.go
@@ -37,7 +37,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-stopCh:
return
default:
- it := pq.Get()
+ it := pq.GetMax()
if it == nil {
continue
}
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 4d357c34..3eb20c27 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -3,60 +3,68 @@ package ephemeral
import (
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
- jobs chan *entry
- queues map[*pipeline.Pipeline]*queue
+ queues map[string]bool
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- jobs: make(chan *entry, 10),
- pq: q,
+ queues: make(map[string]bool),
+ pq: q,
}
- go jb.serve()
-
return jb, nil
}
-func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
- id := uuid.NewString()
+func (j *JobBroker) Push(job *structs.Job) (string, error) {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ if b, ok := j.queues[job.Options.Pipeline]; ok {
+ if !b {
+ return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ }
+ if job.Options.Priority == nil {
+ job.Options.Priority = intPtr(10)
+ }
+ job.Options.ID = uuid.NewString()
+
+ j.pq.Insert(job)
- j.jobs <- &entry{
- id: id,
+ return job.Options.ID, nil
}
- return id, nil
+ return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Stat() {
panic("implement me")
}
-func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; !ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ if _, ok := j.queues[pipeline]; ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = newQueue()
+ j.queues[pipeline] = true
return nil
}
-func (j *JobBroker) serve() {
- for item := range j.jobs {
- // item should satisfy
- j.pq.Insert(item)
+func intPtr(val uint64) *uint64 {
+ if val == 0 {
+ val = 10
}
+ return &val
}
diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go
deleted file mode 100644
index 847b63ea..00000000
--- a/plugins/jobs/brokers/ephemeral/config.go
+++ /dev/null
@@ -1 +0,0 @@
-package ephemeral
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
deleted file mode 100644
index 3cedec3e..00000000
--- a/plugins/jobs/brokers/ephemeral/entry.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package ephemeral
-
-type entry struct {
- id string
- priority uint64
-}
-
-func (e *entry) ID() string {
- return e.id
-}
-
-func (e *entry) Priority() uint64 {
- return e.priority
-}
-
-func (e *entry) Ask() {
- // no-op
-}
-
-func (e *entry) Nack() {
- // no-op
-}
-
-func (e *entry) Payload() []byte {
- panic("implement me")
-}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 84cc871b..146d1fdc 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,7 +1,7 @@
package ephemeral
import (
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go
deleted file mode 100644
index 1c6d865b..00000000
--- a/plugins/jobs/brokers/ephemeral/queue.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package ephemeral
-
-type queue struct{}
-
-func newQueue() *queue {
- return &queue{}
-}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 87e36ecb..1cb2c2a2 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -9,19 +9,19 @@ import (
type Config struct {
// Workers configures roadrunner server and worker busy.
// Workers *roadrunner.ServerConfig
- poolCfg *poolImpl.Config
+ Pool *poolImpl.Config `mapstructure:"Pool"`
// Pipelines defines mapping between PHP job pipeline and associated job broker.
- Pipelines map[string]*pipeline.Pipeline
+ Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"`
// Consuming specifies names of pipelines to be consumed on service start.
- Consume []string
+ Consume []string `mapstructure:"consume"`
}
func (c *Config) InitDefaults() {
- if c.poolCfg == nil {
- c.poolCfg = &poolImpl.Config{}
+ if c.Pool == nil {
+ c.Pool = &poolImpl.Config{}
}
- c.poolCfg.InitDefaults()
+ c.Pool.InitDefaults()
}
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index a0aed50b..a2cf6ed9 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -1,17 +1,17 @@
package jobs
import (
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
// Consumer todo naming
type Consumer interface {
- Push(*pipeline.Pipeline, *structs.Job) (string, error)
+ Push(*structs.Job) (string, error)
Stat()
Consume(*pipeline.Pipeline)
- Register(*pipeline.Pipeline) error
+ Register(pipe string) error
}
type Broker interface {
@@ -20,7 +20,5 @@ type Broker interface {
type Item interface {
ID() string
- Ask()
- Nack()
Payload() []byte
}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index f27f6ede..987f6826 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -15,7 +15,7 @@ func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
out := make(Pipelines, 0)
for name, pipe := range pipes {
- if pipe.Broker() == "" {
+ if pipe.Driver() == "" {
return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
}
@@ -42,7 +42,7 @@ func (ps Pipelines) Broker(broker string) Pipelines {
out := make(Pipelines, 0)
for _, p := range ps {
- if p.Broker() != broker {
+ if p.Driver() != broker {
continue
}
@@ -98,9 +98,9 @@ func (p Pipeline) Name() string {
return p.String("name", "")
}
-// Broker associated with the pipeline.
-func (p Pipeline) Broker() string {
- return p.String("broker", "")
+// Driver associated with the pipeline.
+func (p Pipeline) Driver() string {
+ return p.String("driver", "")
}
// Has checks if value presented in pipeline.
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
index f03dcbb8..77acf96e 100644
--- a/plugins/jobs/pipeline/pipeline_test.go
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -58,23 +58,23 @@ func TestPipeline_Has(t *testing.T) {
func TestPipeline_FilterBroker(t *testing.T) {
pipes := Pipelines{
- &Pipeline{"name": "first", "broker": "a"},
- &Pipeline{"name": "second", "broker": "a"},
- &Pipeline{"name": "third", "broker": "b"},
- &Pipeline{"name": "forth", "broker": "b"},
+ &Pipeline{"name": "first", "driver": "a"},
+ &Pipeline{"name": "second", "driver": "a"},
+ &Pipeline{"name": "third", "driver": "b"},
+ &Pipeline{"name": "forth", "driver": "b"},
}
filtered := pipes.Names("first", "third")
assert.True(t, len(filtered) == 2)
- assert.Equal(t, "a", filtered[0].Broker())
- assert.Equal(t, "b", filtered[1].Broker())
+ assert.Equal(t, "a", filtered[0].Driver())
+ assert.Equal(t, "b", filtered[1].Driver())
filtered = pipes.Names("first", "third").Reverse()
assert.True(t, len(filtered) == 2)
- assert.Equal(t, "a", filtered[1].Broker())
- assert.Equal(t, "b", filtered[0].Broker())
+ assert.Equal(t, "a", filtered[1].Driver())
+ assert.Equal(t, "b", filtered[0].Driver())
filtered = pipes.Broker("a")
assert.True(t, len(filtered) == 2)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6bf43a11..67077920 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,13 +2,14 @@ package jobs
import (
"context"
- "fmt"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -23,7 +24,7 @@ const (
)
type Plugin struct {
- cfg *Config
+ cfg *Config `mapstructure:"jobs"`
log logger.Logger
workersPool pool.Pool
@@ -41,10 +42,6 @@ type Plugin struct {
pipelines pipeline.Pipelines
}
-func testListener(data interface{}) {
- fmt.Println(data)
-}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
@@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.events.AddListener(testListener)
p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
@@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue2.NewPriorityQueue()
p.log = log
return nil
@@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ const op = errors.Op("jobs_plugin_serve")
for name := range p.brokers {
jb, err := p.brokers[name].InitJobBroker(p.queue)
@@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error {
p.consumers[name] = jb
}
+ // register initial pipelines
+ for i := 0; i < len(p.pipelines); i++ {
+ pipe := p.pipelines[i]
+
+ if jb, ok := p.consumers[pipe.Driver()]; ok {
+ err := jb.Register(pipe.Name())
+ if err != nil {
+ errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
+ return errCh
+ }
+ }
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
if err != nil {
errCh <- err
return errCh
}
- // initialize sub-plugins
- // provide a queue to them
- // start consume loop
- // start resp loop
+ // start listening
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
- /*
- go func() {
- for {
- // get data JOB from the queue
- job := p.queue.Pop()
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
- // request
- _ = job
- p.workersPool.Exec(nil)
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
}
- }()
+ }
+ }()
- */
return errCh
}
@@ -141,18 +155,16 @@ func (p *Plugin) Name() string {
func (p *Plugin) Push(j *structs.Job) (string, error) {
pipe := p.pipelines.Get(j.Options.Pipeline)
- broker, ok := p.consumers[pipe.Broker()]
+ broker, ok := p.consumers[pipe.Driver()]
if !ok {
panic("broker not found")
}
- id, err := broker.Push(pipe, j)
+ id, err := broker.Push(j)
if err != nil {
panic(err)
}
- // p.events.Push()
-
return id, nil
}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
index 2e394543..268444db 100644
--- a/plugins/jobs/structs/job.go
+++ b/plugins/jobs/structs/job.go
@@ -17,19 +17,35 @@ type Job struct {
Options *Options `json:"options,omitempty"`
}
+func (j *Job) ID() string {
+ return j.Options.ID
+}
+
+func (j *Job) Priority() uint64 {
+ return *j.Options.Priority
+}
+
// Body packs job payload into binary payload.
func (j *Job) Body() []byte {
return utils.AsBytes(j.Payload)
}
// Context packs job context (job, id) into binary payload.
-func (j *Job) Context(id string) []byte {
+func (j *Job) Context() []byte {
ctx, _ := json.Marshal(
struct {
ID string `json:"id"`
Job string `json:"job"`
- }{ID: id, Job: j.Job},
+ }{ID: j.Options.ID, Job: j.Job},
)
return ctx
}
+
+func (j *Job) Ack() {
+
+}
+
+func (j *Job) Nack() {
+
+}
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
index 1507d053..029a797d 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/structs/job_options.go
@@ -4,6 +4,13 @@ import "time"
// Options carry information about how to handle given job.
type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority *uint64 `json:"priority"`
+
+ // ID - generated ID for the job
+ ID string `json:"id"`
+
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
index e7240c6b..92f78081 100644
--- a/plugins/jobs/structs/job_test.go
+++ b/plugins/jobs/structs/job_test.go
@@ -13,7 +13,7 @@ func TestJob_Body(t *testing.T) {
}
func TestJob_Context(t *testing.T) {
- j := &Job{Job: "job"}
+ j := &Job{Job: "job", Options: &Options{ID: "id"}}
- assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context())
}
diff --git a/tests/plugins/http/configs/.rr-http.yaml b/tests/plugins/http/configs/.rr-http.yaml
index c95bc049..b4910160 100644
--- a/tests/plugins/http/configs/.rr-http.yaml
+++ b/tests/plugins/http/configs/.rr-http.yaml
@@ -3,10 +3,6 @@ rpc:
server:
command: "php ../../http/client.php echo pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 320f41b1..d86a8ad8 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -1,8 +1,8 @@
rpc:
- listen: tcp://127.0.0.1:6001
+ listen: unix:///home/valery/Downloads/rr.sock
server:
- command: "php ../../psr-worker-bench.php"
+ command: "php ../../client.php echo pipes"
relay: "pipes"
relay_timeout: "20s"
@@ -24,7 +24,10 @@ sqs:
jobs:
# worker pool configuration
pool:
- num_workers: 4
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
# list of broker pipelines associated with endpoints
pipelines:
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index e8b4e83d..754f60bc 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -82,7 +82,7 @@ func TestJobsInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 1)
+ time.Sleep(time.Second * 60)
stopCh <- struct{}{}