summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linux.yml2
-rwxr-xr-xMakefile5
-rw-r--r--common/jobs/interface.go26
-rw-r--r--pkg/priority_queue/binary_heap.go75
-rw-r--r--pkg/priority_queue/binary_heap_test.go53
-rw-r--r--pkg/priority_queue/pq.go30
-rw-r--r--pkg/priorityqueue/binary_heap.go103
-rw-r--r--pkg/priorityqueue/binary_heap_test.go (renamed from pkg/priority_queue/pq_test.go)50
-rw-r--r--pkg/priorityqueue/interface.go (renamed from common/priority_queue/interface.go)8
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go82
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go6
-rw-r--r--plugins/jobs/config.go13
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go24
-rw-r--r--plugins/jobs/plugin.go68
-rw-r--r--plugins/jobs/rpc.go95
-rw-r--r--plugins/jobs/structs/job.go10
-rw-r--r--plugins/jobs/structs/job_options.go12
-rw-r--r--plugins/jobs/structs/job_options_test.go16
-rw-r--r--plugins/jobs/structs/job_test.go3
-rw-r--r--proto/jobs/v1beta/jobs.pb.go476
-rw-r--r--proto/jobs/v1beta/jobs.proto36
-rw-r--r--proto/kv/v1beta/kv.pb.go9
-rw-r--r--proto/websockets/v1beta/websockets.pb.go9
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml1
-rw-r--r--utils/pointers.go15
26 files changed, 935 insertions, 294 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index cf6c037b..4679cc24 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -73,7 +73,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priorityqueue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
diff --git a/Makefile b/Makefile
index ae942e29..625c6788 100755
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priorityqueue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config
@@ -48,7 +48,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
- go test -v -race -tags=debug ./pkg/priority_queue
+ go test -v -race -tags=debug ./pkg/priorityqueue
go test -v -race -tags=debug ./plugins/jobs/structs
go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
@@ -81,6 +81,7 @@ testGo1.17beta1: ## Run application tests
go1.17beta1 test -v -race -tags=debug ./pkg/worker
go1.17beta1 test -v -race -tags=debug ./pkg/worker_watcher
go1.17beta1 test -v -race -tags=debug ./pkg/bst
+ go1.17beta1 test -v -race -tags=debug ./pkg/priorityqueue
go1.17beta1 test -v -race -tags=debug ./tests/plugins/http
go1.17beta1 test -v -race -tags=debug ./plugins/http/config
go1.17beta1 test -v -race -tags=debug ./plugins/server
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
new file mode 100644
index 00000000..6738ed46
--- /dev/null
+++ b/common/jobs/interface.go
@@ -0,0 +1,26 @@
+package jobs
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Consumer todo naming
+type Consumer interface {
+ Push(job *structs.Job) (*string, error)
+ PushBatch(job *[]structs.Job) (*string, error)
+ Consume(job *pipeline.Pipeline)
+
+ Stop(pipeline string)
+ StopAll()
+ Resume(pipeline string)
+ ResumeAll()
+
+ Register(pipe string) error
+ Stat()
+}
+
+type Constructor interface {
+ JobsConstruct(configKey string, queue priorityqueue.Queue) (Consumer, error)
+}
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
deleted file mode 100644
index c7c148da..00000000
--- a/pkg/priority_queue/binary_heap.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-binary heap (min-heap) algorithm used as a core for the priority queue
-*/
-
-package priorityqueue
-
-import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
-
-type BinHeap []priorityqueue.Item
-
-func NewBinHeap() *BinHeap {
- return &BinHeap{}
-}
-
-func (bh *BinHeap) fixUp() {
- k := len(*bh) - 1
- p := (k - 1) >> 1 // k-1 / 2
-
- for k > 0 {
- cur, par := (*bh)[k], (*bh)[p]
-
- if cur.Priority() < par.Priority() {
- bh.swap(k, p)
- k = p
- p = (k - 1) >> 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) swap(i, j int) {
- (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i]
-}
-
-func (bh *BinHeap) fixDown(curr, end int) {
- cOneIdx := curr*2 + 1
- for cOneIdx <= end {
- cTwoIdx := -1
- if curr*2+2 <= end {
- cTwoIdx = curr*2 + 2
- }
-
- idxToSwap := cOneIdx
- if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() {
- idxToSwap = cTwoIdx
- }
- if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() {
- bh.swap(curr, idxToSwap)
- curr = idxToSwap
- cOneIdx = curr*2 + 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) Insert(item priorityqueue.Item) {
- *bh = append(*bh, item)
- bh.fixUp()
-}
-
-func (bh *BinHeap) GetMax() priorityqueue.Item {
- l := len(*bh)
- if l == 0 {
- return nil
- }
-
- bh.swap(0, l-1)
-
- item := (*bh)[l-1]
- *bh = (*bh)[0 : l-1]
- bh.fixDown(0, l-2)
- return item
-}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
deleted file mode 100644
index 528e8fd0..00000000
--- a/pkg/priority_queue/binary_heap_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package priorityqueue
-
-import (
- "testing"
-
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/stretchr/testify/require"
-)
-
-type Test int
-
-func (t Test) Ack() {
-}
-
-func (t Test) Nack() {
-}
-
-func (t Test) Body() []byte {
- return nil
-}
-
-func (t Test) Context() []byte {
- return nil
-}
-
-func (t Test) ID() string {
- return ""
-}
-
-func (t Test) Priority() uint64 {
- return uint64(t)
-}
-
-func TestBinHeap_Init(t *testing.T) {
- a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap()
-
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
-
- res := make([]priorityqueue.Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.GetMax()
- res = append(res, item)
- }
-
- require.Equal(t, expected, res)
-}
diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go
deleted file mode 100644
index 2ff52a79..00000000
--- a/pkg/priority_queue/pq.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package priorityqueue
-
-import (
- "sync"
-
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
-)
-
-type PQ struct {
- sync.RWMutex
- bh *BinHeap
-}
-
-func NewPriorityQueue() *PQ {
- return &PQ{
- bh: NewBinHeap(),
- }
-}
-
-func (p *PQ) GetMax() priorityqueue.Item {
- p.Lock()
- defer p.Unlock()
- return p.bh.GetMax()
-}
-
-func (p *PQ) Insert(item priorityqueue.Item) {
- p.Lock()
- p.bh.Insert(item)
- p.Unlock()
-}
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
new file mode 100644
index 00000000..fe6a06fd
--- /dev/null
+++ b/pkg/priorityqueue/binary_heap.go
@@ -0,0 +1,103 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type BinHeap struct {
+ items []Item
+ // find a way to use pointer to the raw data
+ len uint64
+ cond sync.Cond
+}
+
+func NewBinHeap() *BinHeap {
+ return &BinHeap{
+ items: make([]Item, 0, 100),
+ len: 0,
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (bh *BinHeap) fixUp() {
+ k := len(bh.items) - 1
+ p := (k - 1) >> 1 // k-1 / 2
+
+ for k > 0 {
+ cur, par := (bh.items)[k], (bh.items)[p]
+
+ if *cur.Priority() < *par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) swap(i, j int) {
+ (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := (curr << 1) + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if (curr<<1)+2 <= end {
+ cTwoIdx = (curr << 1) + 2
+ }
+
+ idxToSwap := cOneIdx
+ // oh my, so unsafe
+ if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() {
+ bh.swap(curr, idxToSwap)
+ curr = idxToSwap
+ cOneIdx = (curr << 1) + 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) Insert(item Item) {
+ bh.cond.L.Lock()
+ bh.items = append(bh.items, item)
+
+ // add len to the slice
+ atomic.AddUint64(&bh.len, 1)
+
+ // fix binary heap up
+ bh.fixUp()
+ bh.cond.L.Unlock()
+
+ // signal the goroutine on wait
+ bh.cond.Signal()
+}
+
+func (bh *BinHeap) GetMax() Item {
+ bh.cond.L.Lock()
+ defer bh.cond.L.Unlock()
+
+ for atomic.LoadUint64(&bh.len) == 0 {
+ bh.cond.Wait()
+ }
+
+ bh.swap(0, int(bh.len-1))
+
+ item := (bh.items)[int(bh.len)-1]
+ bh.items = (bh).items[0 : int(bh.len)-1]
+ bh.fixDown(0, int(bh.len-2))
+
+ // reduce len
+ atomic.AddUint64(&bh.len, ^uint64(0))
+ return item
+}
diff --git a/pkg/priority_queue/pq_test.go b/pkg/priorityqueue/binary_heap_test.go
index 49afe5e3..149ec764 100644
--- a/pkg/priority_queue/pq_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -6,13 +6,61 @@ import (
"sync/atomic"
"testing"
"time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/stretchr/testify/require"
)
+type Test int
+
+func (t Test) Ack() {
+}
+
+func (t Test) Nack() {
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() []byte {
+ return nil
+}
+
+func (t Test) ID() *string {
+ return utils.AsStringPtr("none")
+}
+
+func (t Test) Priority() *uint64 {
+ return utils.AsUint64Ptr(uint64(t))
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap()
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]Item, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.GetMax()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewPriorityQueue()
+ pq := NewBinHeap()
go func() {
tt := time.NewTicker(time.Second)
diff --git a/common/priority_queue/interface.go b/pkg/priorityqueue/interface.go
index c1774223..7ac2e449 100644
--- a/common/priority_queue/interface.go
+++ b/pkg/priorityqueue/interface.go
@@ -6,10 +6,10 @@ type Queue interface {
}
type Item interface {
- ID() string
- Priority() uint64
- Ack()
- Nack()
+ ID() *string
+ Priority() *uint64
Body() []byte
Context() []byte
+ Ack()
+ Nack()
}
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 3eb20c27..4bbb4095 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,70 +1,104 @@
package ephemeral
import (
+ "sync"
+
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobBroker struct {
- queues map[string]bool
+ queues sync.Map
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- queues: make(map[string]bool),
+ queues: sync.Map{},
pq: q,
}
return jb, nil
}
-func (j *JobBroker) Push(job *structs.Job) (string, error) {
+func (j *JobBroker) Push(job *structs.Job) (*string, error) {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.queues[job.Options.Pipeline]; ok {
- if !b {
- return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ if b, ok := j.queues.Load(job.Options.Pipeline); ok {
+ if !b.(bool) {
+ return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
}
if job.Options.Priority == nil {
- job.Options.Priority = intPtr(10)
+ job.Options.Priority = utils.AsUint64Ptr(10)
}
- job.Options.ID = uuid.NewString()
+ job.Options.ID = utils.AsStringPtr(uuid.NewString())
j.pq.Insert(job)
return job.Options.ID, nil
}
- return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
-}
-
-func (j *JobBroker) Stat() {
- panic("implement me")
-}
-
-func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
- panic("implement me")
+ return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; ok {
+ if _, ok := j.queues.Load(pipeline); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = true
+ j.queues.Store(pipeline, true)
return nil
}
-func intPtr(val uint64) *uint64 {
- if val == 0 {
- val = 10
+func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) {
+ // Use a batch response
+ // Add JobID to the payload to match responses
+ panic("todo")
+}
+
+func (j *JobBroker) Stop(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, false)
+ }
+ }
+}
+
+func (j *JobBroker) StopAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, false)
+ return true
+ })
+}
+
+func (j *JobBroker) Resume(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, true)
+ }
}
- return &val
+}
+
+func (j *JobBroker) ResumeAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, true)
+ return true
+ })
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
+ panic("implement me")
}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 146d1fdc..3d6a95b7 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,8 +1,8 @@
package ephemeral
import (
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -23,6 +23,6 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) {
+func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) {
return NewJobBroker(q)
}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 1cb2c2a2..07e2ef38 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -1,14 +1,19 @@
package jobs
import (
+ "runtime"
+
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
)
// Config defines settings for job broker, workers and job-pipeline mapping.
type Config struct {
- // Workers configures roadrunner server and worker busy.
- // Workers *roadrunner.ServerConfig
+ // NumPollers configures number of priority queue pollers
+ // Should be no more than 255
+ // Default - num logical cores
+ NumPollers uint8 `mapstructure:"num_pollers"`
+ // Pool configures roadrunner workers pool.
Pool *poolImpl.Config `mapstructure:"Pool"`
// Pipelines defines mapping between PHP job pipeline and associated job broker.
@@ -23,5 +28,9 @@ func (c *Config) InitDefaults() {
c.Pool = &poolImpl.Config{}
}
+ if c.NumPollers == 0 {
+ c.NumPollers = uint8(runtime.NumCPU())
+ }
+
c.Pool.InitDefaults()
}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index 4161d7ca..56a8839d 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
deleted file mode 100644
index a2cf6ed9..00000000
--- a/plugins/jobs/interface.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package jobs
-
-import (
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
-)
-
-// Consumer todo naming
-type Consumer interface {
- Push(*structs.Job) (string, error)
- Stat()
- Consume(*pipeline.Pipeline)
- Register(pipe string) error
-}
-
-type Broker interface {
- InitJobBroker(queue priorityqueue.Queue) (Consumer, error)
-}
-
-type Item interface {
- ID() string
- Payload() []byte
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 67077920..8a80479b 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,14 +2,15 @@ package jobs
import (
"context"
+ "sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -27,11 +28,13 @@ type Plugin struct {
cfg *Config `mapstructure:"jobs"`
log logger.Logger
+ sync.RWMutex
+
workersPool pool.Pool
server server.Server
- brokers map[string]Broker
- consumers map[string]Consumer
+ jobConstructors map[string]jobs.Constructor
+ consumers map[string]jobs.Consumer
events events.Handler
@@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.brokers = make(map[string]Broker)
- p.consumers = make(map[string]Consumer)
+ p.jobConstructors = make(map[string]jobs.Constructor)
+ p.consumers = make(map[string]jobs.Consumer)
// initial set of pipelines
p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
@@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue2.NewPriorityQueue()
+ p.queue = priorityqueue.NewBinHeap()
p.log = log
return nil
@@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- for name := range p.brokers {
- jb, err := p.brokers[name].InitJobBroker(p.queue)
+ for name := range p.jobConstructors {
+ jb, err := p.jobConstructors[name].JobsConstruct("", p.queue)
if err != nil {
errCh <- err
return errCh
@@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error {
// start listening
go func() {
- for {
- // get data JOB from the queue
- job := p.queue.GetMax()
-
- if job == nil {
- continue
- }
-
- exec := payload.Payload{
- Context: job.Context(),
- Body: job.Body(),
- }
-
- _, err = p.workersPool.Exec(exec)
- if err != nil {
- panic(err)
- }
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
+
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
+
+ _, err := p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
+ }
+ }
+ }()
}
}()
@@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
- p.brokers[name.Name()] = c
+func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
+ p.jobConstructors[name.Name()] = c
}
func (p *Plugin) Available() {}
@@ -152,12 +159,13 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Push(j *structs.Job) (string, error) {
+func (p *Plugin) Push(j *structs.Job) (*string, error) {
+ const op = errors.Op("jobs_plugin_push")
pipe := p.pipelines.Get(j.Options.Pipeline)
broker, ok := p.consumers[pipe.Driver()]
if !ok {
- panic("broker not found")
+ return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
}
id, err := broker.Push(j)
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index e77cda59..c6bd1645 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,8 +1,12 @@
package jobs
import (
+ "sync"
+
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
type rpc struct {
@@ -10,11 +14,96 @@ type rpc struct {
p *Plugin
}
-func (r *rpc) Push(j *structs.Job, idRet *string) error {
+var jobsPool = &sync.Pool{
+ New: func() interface{} {
+ return &structs.Job{
+ Options: &structs.Options{},
+ }
+ },
+}
+
+func pubJob(j *structs.Job) {
+ // clear
+ j.Job = ""
+ j.Payload = ""
+ j.Options = &structs.Options{}
+ jobsPool.Put(j)
+}
+
+func getJob() *structs.Job {
+ return jobsPool.Get().(*structs.Job)
+}
+
+/*
+List of the RPC methods:
+1. Push - single job push
+2. PushBatch - push job batch
+
+3. Reset - managed by the Resetter plugin
+
+4. Stop - stop pipeline processing
+5. StopAll - stop all pipelines processing
+6. Resume - resume pipeline processing
+7. ResumeAll - resume stopped pipelines
+
+8. Workers - managed by the Informer plugin.
+9. Stat - jobs statistic
+*/
+
+func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ // convert transport entity into domain
+ // how we can do this quickly
+ jb := getJob()
+ defer pubJob(jb)
+
+ jb = &structs.Job{
+ Job: j.GetJob().Job,
+ Payload: j.GetJob().Payload,
+ Options: &structs.Options{
+ Priority: &j.GetJob().Options.Priority,
+ ID: &j.GetJob().Options.Id,
+ Pipeline: j.GetJob().Options.Pipeline,
+ Delay: j.GetJob().Options.Delay,
+ Attempts: j.GetJob().Options.Attempts,
+ RetryDelay: j.GetJob().Options.RetryDelay,
+ Timeout: j.GetJob().Options.Timeout,
+ },
+ }
+ id, err := r.p.Push(jb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ resp.Id = *id
+
+ return nil
+}
+
+func (r *rpc) PushBatch(j *structs.Job, idRet *string) error {
+ const op = errors.Op("jobs_rpc_push")
id, err := r.p.Push(j)
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
- *idRet = id
+
+ *idRet = *id
+ return nil
+}
+
+func (r *rpc) Stop(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) StopAll(_ bool, w *string) error {
+ return nil
+}
+
+func (r *rpc) Resume(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) ResumeAll(_ bool, w *string) error {
return nil
}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
index 268444db..1ef4d2ca 100644
--- a/plugins/jobs/structs/job.go
+++ b/plugins/jobs/structs/job.go
@@ -17,12 +17,12 @@ type Job struct {
Options *Options `json:"options,omitempty"`
}
-func (j *Job) ID() string {
+func (j *Job) ID() *string {
return j.Options.ID
}
-func (j *Job) Priority() uint64 {
- return *j.Options.Priority
+func (j *Job) Priority() *uint64 {
+ return j.Options.Priority
}
// Body packs job payload into binary payload.
@@ -34,8 +34,8 @@ func (j *Job) Body() []byte {
func (j *Job) Context() []byte {
ctx, _ := json.Marshal(
struct {
- ID string `json:"id"`
- Job string `json:"job"`
+ ID *string `json:"id"`
+ Job string `json:"job"`
}{ID: j.Options.ID, Job: j.Job},
)
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
index 029a797d..3e1ada85 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/structs/job_options.go
@@ -9,23 +9,23 @@ type Options struct {
Priority *uint64 `json:"priority"`
// ID - generated ID for the job
- ID string `json:"id"`
+ ID *string `json:"id"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int `json:"delay,omitempty"`
+ Delay uint64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts int `json:"maxAttempts,omitempty"`
+ Attempts uint64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int `json:"retryDelay,omitempty"`
+ RetryDelay uint64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int `json:"timeout,omitempty"`
+ Timeout uint64 `json:"timeout,omitempty"`
}
// Merge merges job options.
@@ -52,7 +52,7 @@ func (o *Options) Merge(from *Options) {
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int) bool {
+func (o *Options) CanRetry(attempt uint64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go
index 18702394..a16f7dd0 100644
--- a/plugins/jobs/structs/job_options_test.go
+++ b/plugins/jobs/structs/job_options_test.go
@@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) {
})
assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, 1, opts.Attempts)
- assert.Equal(t, 2, opts.Delay)
- assert.Equal(t, 1, opts.Timeout)
- assert.Equal(t, 1, opts.RetryDelay)
+ assert.Equal(t, uint64(1), opts.Attempts)
+ assert.Equal(t, uint64(2), opts.Delay)
+ assert.Equal(t, uint64(1), opts.Timeout)
+ assert.Equal(t, uint64(1), opts.RetryDelay)
}
func TestOptions_MergeKeepOriginal(t *testing.T) {
@@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) {
})
assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, 10, opts.Attempts)
- assert.Equal(t, 10, opts.Delay)
- assert.Equal(t, 10, opts.Timeout)
- assert.Equal(t, 10, opts.RetryDelay)
+ assert.Equal(t, uint64(10), opts.Attempts)
+ assert.Equal(t, uint64(10), opts.Delay)
+ assert.Equal(t, uint64(10), opts.Timeout)
+ assert.Equal(t, uint64(10), opts.RetryDelay)
}
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
index 92f78081..0aa5b177 100644
--- a/plugins/jobs/structs/job_test.go
+++ b/plugins/jobs/structs/job_test.go
@@ -3,6 +3,7 @@ package structs
import (
"testing"
+ "github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/assert"
)
@@ -13,7 +14,7 @@ func TestJob_Body(t *testing.T) {
}
func TestJob_Context(t *testing.T) {
- j := &Job{Job: "job", Options: &Options{ID: "id"}}
+ j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}}
assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context())
}
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
new file mode 100644
index 00000000..9d8427be
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -0,0 +1,476 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
+// source: jobs.proto
+
+package jobsv1beta
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// single job request
+type Request struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"`
+}
+
+func (x *Request) Reset() {
+ *x = Request{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Request) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Request) ProtoMessage() {}
+
+func (x *Request) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Request.ProtoReflect.Descriptor instead.
+func (*Request) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Request) GetJob() *Job {
+ if x != nil {
+ return x.Job
+ }
+ return nil
+}
+
+// batch jobs request
+type BatchRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"`
+}
+
+func (x *BatchRequest) Reset() {
+ *x = BatchRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *BatchRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BatchRequest) ProtoMessage() {}
+
+func (x *BatchRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use BatchRequest.ProtoReflect.Descriptor instead.
+func (*BatchRequest) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *BatchRequest) GetJobs() []*Job {
+ if x != nil {
+ return x.Jobs
+ }
+ return nil
+}
+
+// RPC response
+type Response struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+}
+
+func (x *Response) Reset() {
+ *x = Response{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Response) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Response) ProtoMessage() {}
+
+func (x *Response) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Response.ProtoReflect.Descriptor instead.
+func (*Response) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *Response) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+type Job struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"`
+ Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
+ Options *Options `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
+}
+
+func (x *Job) Reset() {
+ *x = Job{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Job) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Job) ProtoMessage() {}
+
+func (x *Job) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Job.ProtoReflect.Descriptor instead.
+func (*Job) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *Job) GetJob() string {
+ if x != nil {
+ return x.Job
+ }
+ return ""
+}
+
+func (x *Job) GetPayload() string {
+ if x != nil {
+ return x.Payload
+ }
+ return ""
+}
+
+func (x *Job) GetOptions() *Options {
+ if x != nil {
+ return x.Options
+ }
+ return nil
+}
+
+type Options struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"`
+ Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
+ Pipeline string `protobuf:"bytes,3,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
+ Delay uint64 `protobuf:"varint,4,opt,name=delay,proto3" json:"delay,omitempty"`
+ Attempts uint64 `protobuf:"varint,5,opt,name=attempts,proto3" json:"attempts,omitempty"`
+ RetryDelay uint64 `protobuf:"varint,6,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"`
+ Timeout uint64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"`
+}
+
+func (x *Options) Reset() {
+ *x = Options{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Options) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Options) ProtoMessage() {}
+
+func (x *Options) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Options.ProtoReflect.Descriptor instead.
+func (*Options) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *Options) GetPriority() uint64 {
+ if x != nil {
+ return x.Priority
+ }
+ return 0
+}
+
+func (x *Options) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *Options) GetPipeline() string {
+ if x != nil {
+ return x.Pipeline
+ }
+ return ""
+}
+
+func (x *Options) GetDelay() uint64 {
+ if x != nil {
+ return x.Delay
+ }
+ return 0
+}
+
+func (x *Options) GetAttempts() uint64 {
+ if x != nil {
+ return x.Attempts
+ }
+ return 0
+}
+
+func (x *Options) GetRetryDelay() uint64 {
+ if x != nil {
+ return x.RetryDelay
+ }
+ return 0
+}
+
+func (x *Options) GetTimeout() uint64 {
+ if x != nil {
+ return x.Timeout
+ }
+ return 0
+}
+
+var File_jobs_proto protoreflect.FileDescriptor
+
+var file_jobs_proto_rawDesc = []byte{
+ 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f,
+ 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
+ 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x34, 0x0a, 0x0c, 0x42, 0x61, 0x74, 0x63,
+ 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73,
+ 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31,
+ 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x1a,
+ 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x61, 0x0a, 0x03, 0x4a, 0x6f,
+ 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
+ 0x6a, 0x6f, 0x62, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a,
+ 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14,
+ 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74,
+ 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xbe, 0x01,
+ 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69,
+ 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x70, 0x72, 0x69,
+ 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e,
+ 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e,
+ 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04,
+ 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d,
+ 0x70, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d,
+ 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c,
+ 0x61, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44,
+ 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18,
+ 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f,
+ 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_jobs_proto_rawDescOnce sync.Once
+ file_jobs_proto_rawDescData = file_jobs_proto_rawDesc
+)
+
+func file_jobs_proto_rawDescGZIP() []byte {
+ file_jobs_proto_rawDescOnce.Do(func() {
+ file_jobs_proto_rawDescData = protoimpl.X.CompressGZIP(file_jobs_proto_rawDescData)
+ })
+ return file_jobs_proto_rawDescData
+}
+
+var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_jobs_proto_goTypes = []interface{}{
+ (*Request)(nil), // 0: jobs.v1beta.Request
+ (*BatchRequest)(nil), // 1: jobs.v1beta.BatchRequest
+ (*Response)(nil), // 2: jobs.v1beta.Response
+ (*Job)(nil), // 3: jobs.v1beta.Job
+ (*Options)(nil), // 4: jobs.v1beta.Options
+}
+var file_jobs_proto_depIdxs = []int32{
+ 3, // 0: jobs.v1beta.Request.job:type_name -> jobs.v1beta.Job
+ 3, // 1: jobs.v1beta.BatchRequest.jobs:type_name -> jobs.v1beta.Job
+ 4, // 2: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
+ 3, // [3:3] is the sub-list for method output_type
+ 3, // [3:3] is the sub-list for method input_type
+ 3, // [3:3] is the sub-list for extension type_name
+ 3, // [3:3] is the sub-list for extension extendee
+ 0, // [0:3] is the sub-list for field type_name
+}
+
+func init() { file_jobs_proto_init() }
+func file_jobs_proto_init() {
+ if File_jobs_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Request); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*BatchRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Response); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Job); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Options); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_jobs_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 5,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_jobs_proto_goTypes,
+ DependencyIndexes: file_jobs_proto_depIdxs,
+ MessageInfos: file_jobs_proto_msgTypes,
+ }.Build()
+ File_jobs_proto = out.File
+ file_jobs_proto_rawDesc = nil
+ file_jobs_proto_goTypes = nil
+ file_jobs_proto_depIdxs = nil
+}
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 46434fa8..13fd5595 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -1,22 +1,36 @@
syntax = "proto3";
-package kv.v1beta;
+package jobs.v1beta;
option go_package = "./;jobsv1beta";
+// single job request
message Request {
- // could be an enum in the future
- string storage = 1;
- repeated Item items = 2;
+ Job job = 1;
}
-message Item {
- string key = 1;
- bytes value = 2;
- // RFC 3339
- string timeout = 3;
+// batch jobs request
+message BatchRequest {
+ repeated Job jobs = 1;
}
-// KV response for the KV RPC methods
+// RPC response
message Response {
- repeated Item items = 1;
+ string id = 1;
}
+
+message Job {
+ string job = 1;
+ string payload = 2;
+ Options options = 3;
+}
+
+message Options {
+ uint64 priority = 1;
+ string id = 2;
+ string pipeline = 3;
+ uint64 delay = 4;
+ uint64 attempts = 5;
+ uint64 retry_delay = 6;
+ uint64 timeout = 7;
+}
+
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 75578bff..19621735 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -1,17 +1,16 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
// source: kv.proto
package kvv1beta
import (
- reflect "reflect"
- sync "sync"
-
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index a2868118..188dcf08 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -1,17 +1,16 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
// source: websockets.proto
package websocketsv1beta
import (
- reflect "reflect"
- sync "sync"
-
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
)
const (
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index d86a8ad8..c81ba6ef 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -22,6 +22,7 @@ sqs:
jobs:
+ num_pollers: 32
# worker pool configuration
pool:
num_workers: 10
diff --git a/utils/pointers.go b/utils/pointers.go
new file mode 100644
index 00000000..9c192279
--- /dev/null
+++ b/utils/pointers.go
@@ -0,0 +1,15 @@
+package utils
+
+func AsUint64Ptr(val uint64) *uint64 {
+ if val == 0 {
+ val = 10
+ }
+ return &val
+}
+
+func AsStringPtr(val string) *string {
+ if val == "" {
+ return nil
+ }
+ return &val
+}