diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
26 files changed, 935 insertions, 294 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index cf6c037b..4679cc24 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -73,7 +73,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priorityqueue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server @@ -14,7 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priorityqueue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config @@ -48,7 +48,7 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst - go test -v -race -tags=debug ./pkg/priority_queue + go test -v -race -tags=debug ./pkg/priorityqueue go test -v -race -tags=debug ./plugins/jobs/structs go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config @@ -81,6 +81,7 @@ testGo1.17beta1: ## Run application tests go1.17beta1 test -v -race -tags=debug ./pkg/worker go1.17beta1 test -v -race -tags=debug ./pkg/worker_watcher go1.17beta1 test -v -race -tags=debug ./pkg/bst + go1.17beta1 test -v -race -tags=debug ./pkg/priorityqueue go1.17beta1 test -v -race -tags=debug ./tests/plugins/http go1.17beta1 test -v -race -tags=debug ./plugins/http/config go1.17beta1 test -v -race -tags=debug ./plugins/server diff --git a/common/jobs/interface.go b/common/jobs/interface.go new file mode 100644 index 00000000..6738ed46 --- /dev/null +++ b/common/jobs/interface.go @@ -0,0 +1,26 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming +type Consumer interface { + Push(job *structs.Job) (*string, error) + PushBatch(job *[]structs.Job) (*string, error) + Consume(job *pipeline.Pipeline) + + Stop(pipeline string) + StopAll() + Resume(pipeline string) + ResumeAll() + + Register(pipe string) error + Stat() +} + +type Constructor interface { + JobsConstruct(configKey string, queue priorityqueue.Queue) (Consumer, error) +} diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go deleted file mode 100644 index c7c148da..00000000 --- a/pkg/priority_queue/binary_heap.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -binary heap (min-heap) algorithm used as a core for the priority queue -*/ - -package priorityqueue - -import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - -type BinHeap []priorityqueue.Item - -func NewBinHeap() *BinHeap { - return &BinHeap{} -} - -func (bh *BinHeap) fixUp() { - k := len(*bh) - 1 - p := (k - 1) >> 1 // k-1 / 2 - - for k > 0 { - cur, par := (*bh)[k], (*bh)[p] - - if cur.Priority() < par.Priority() { - bh.swap(k, p) - k = p - p = (k - 1) >> 1 - } else { - return - } - } -} - -func (bh *BinHeap) swap(i, j int) { - (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i] -} - -func (bh *BinHeap) fixDown(curr, end int) { - cOneIdx := curr*2 + 1 - for cOneIdx <= end { - cTwoIdx := -1 - if curr*2+2 <= end { - cTwoIdx = curr*2 + 2 - } - - idxToSwap := cOneIdx - if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() { - idxToSwap = cTwoIdx - } - if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() { - bh.swap(curr, idxToSwap) - curr = idxToSwap - cOneIdx = curr*2 + 1 - } else { - return - } - } -} - -func (bh *BinHeap) Insert(item priorityqueue.Item) { - *bh = append(*bh, item) - bh.fixUp() -} - -func (bh *BinHeap) GetMax() priorityqueue.Item { - l := len(*bh) - if l == 0 { - return nil - } - - bh.swap(0, l-1) - - item := (*bh)[l-1] - *bh = (*bh)[0 : l-1] - bh.fixDown(0, l-2) - return item -} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go deleted file mode 100644 index 528e8fd0..00000000 --- a/pkg/priority_queue/binary_heap_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package priorityqueue - -import ( - "testing" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/stretchr/testify/require" -) - -type Test int - -func (t Test) Ack() { -} - -func (t Test) Nack() { -} - -func (t Test) Body() []byte { - return nil -} - -func (t Test) Context() []byte { - return nil -} - -func (t Test) ID() string { - return "" -} - -func (t Test) Priority() uint64 { - return uint64(t) -} - -func TestBinHeap_Init(t *testing.T) { - a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap() - - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - - res := make([]priorityqueue.Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.GetMax() - res = append(res, item) - } - - require.Equal(t, expected, res) -} diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go deleted file mode 100644 index 2ff52a79..00000000 --- a/pkg/priority_queue/pq.go +++ /dev/null @@ -1,30 +0,0 @@ -package priorityqueue - -import ( - "sync" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" -) - -type PQ struct { - sync.RWMutex - bh *BinHeap -} - -func NewPriorityQueue() *PQ { - return &PQ{ - bh: NewBinHeap(), - } -} - -func (p *PQ) GetMax() priorityqueue.Item { - p.Lock() - defer p.Unlock() - return p.bh.GetMax() -} - -func (p *PQ) Insert(item priorityqueue.Item) { - p.Lock() - p.bh.Insert(item) - p.Unlock() -} diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go new file mode 100644 index 00000000..fe6a06fd --- /dev/null +++ b/pkg/priorityqueue/binary_heap.go @@ -0,0 +1,103 @@ +/* +binary heap (min-heap) algorithm used as a core for the priority queue +*/ + +package priorityqueue + +import ( + "sync" + "sync/atomic" +) + +type BinHeap struct { + items []Item + // find a way to use pointer to the raw data + len uint64 + cond sync.Cond +} + +func NewBinHeap() *BinHeap { + return &BinHeap{ + items: make([]Item, 0, 100), + len: 0, + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (bh *BinHeap) fixUp() { + k := len(bh.items) - 1 + p := (k - 1) >> 1 // k-1 / 2 + + for k > 0 { + cur, par := (bh.items)[k], (bh.items)[p] + + if *cur.Priority() < *par.Priority() { + bh.swap(k, p) + k = p + p = (k - 1) >> 1 + } else { + return + } + } +} + +func (bh *BinHeap) swap(i, j int) { + (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] +} + +func (bh *BinHeap) fixDown(curr, end int) { + cOneIdx := (curr << 1) + 1 + for cOneIdx <= end { + cTwoIdx := -1 + if (curr<<1)+2 <= end { + cTwoIdx = (curr << 1) + 2 + } + + idxToSwap := cOneIdx + // oh my, so unsafe + if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() { + idxToSwap = cTwoIdx + } + if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { + bh.swap(curr, idxToSwap) + curr = idxToSwap + cOneIdx = (curr << 1) + 1 + } else { + return + } + } +} + +func (bh *BinHeap) Insert(item Item) { + bh.cond.L.Lock() + bh.items = append(bh.items, item) + + // add len to the slice + atomic.AddUint64(&bh.len, 1) + + // fix binary heap up + bh.fixUp() + bh.cond.L.Unlock() + + // signal the goroutine on wait + bh.cond.Signal() +} + +func (bh *BinHeap) GetMax() Item { + bh.cond.L.Lock() + defer bh.cond.L.Unlock() + + for atomic.LoadUint64(&bh.len) == 0 { + bh.cond.Wait() + } + + bh.swap(0, int(bh.len-1)) + + item := (bh.items)[int(bh.len)-1] + bh.items = (bh).items[0 : int(bh.len)-1] + bh.fixDown(0, int(bh.len-2)) + + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) + return item +} diff --git a/pkg/priority_queue/pq_test.go b/pkg/priorityqueue/binary_heap_test.go index 49afe5e3..149ec764 100644 --- a/pkg/priority_queue/pq_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -6,13 +6,61 @@ import ( "sync/atomic" "testing" "time" + + "github.com/spiral/roadrunner/v2/utils" + "github.com/stretchr/testify/require" ) +type Test int + +func (t Test) Ack() { +} + +func (t Test) Nack() { +} + +func (t Test) Body() []byte { + return nil +} + +func (t Test) Context() []byte { + return nil +} + +func (t Test) ID() *string { + return utils.AsStringPtr("none") +} + +func (t Test) Priority() *uint64 { + return utils.AsUint64Ptr(uint64(t)) +} + +func TestBinHeap_Init(t *testing.T) { + a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap() + + for i := 0; i < len(a); i++ { + bh.Insert(a[i]) + } + + expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + res := make([]Item, 0, 12) + + for i := 0; i < 11; i++ { + item := bh.GetMax() + res = append(res, item) + } + + require.Equal(t, expected, res) +} + func TestNewPriorityQueue(t *testing.T) { insertsPerSec := uint64(0) getPerSec := uint64(0) stopCh := make(chan struct{}, 1) - pq := NewPriorityQueue() + pq := NewBinHeap() go func() { tt := time.NewTicker(time.Second) diff --git a/common/priority_queue/interface.go b/pkg/priorityqueue/interface.go index c1774223..7ac2e449 100644 --- a/common/priority_queue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -6,10 +6,10 @@ type Queue interface { } type Item interface { - ID() string - Priority() uint64 - Ack() - Nack() + ID() *string + Priority() *uint64 Body() []byte Context() []byte + Ack() + Nack() } diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 3eb20c27..4bbb4095 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,70 +1,104 @@ package ephemeral import ( + "sync" + "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { - queues map[string]bool + queues sync.Map pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - queues: make(map[string]bool), + queues: sync.Map{}, pq: q, } return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (string, error) { +func (j *JobBroker) Push(job *structs.Job) (*string, error) { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues[job.Options.Pipeline]; ok { - if !b { - return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } if job.Options.Priority == nil { - job.Options.Priority = intPtr(10) + job.Options.Priority = utils.AsUint64Ptr(10) } - job.Options.ID = uuid.NewString() + job.Options.ID = utils.AsStringPtr(uuid.NewString()) j.pq.Insert(job) return job.Options.ID, nil } - return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} - -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; ok { + if _, ok := j.queues.Load(pipeline); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = true + j.queues.Store(pipeline, true) return nil } -func intPtr(val uint64) *uint64 { - if val == 0 { - val = 10 +func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { + // Use a batch response + // Add JobID to the payload to match responses + panic("todo") +} + +func (j *JobBroker) Stop(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.queues.Store(pipeline, false) + } + } +} + +func (j *JobBroker) StopAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, false) + return true + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.queues.Store(pipeline, true) + } } - return &val +} + +func (j *JobBroker) ResumeAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, true) + return true + }) +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { + panic("implement me") } diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 146d1fdc..3d6a95b7 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,8 +1,8 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -23,6 +23,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { +func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(q) } diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1cb2c2a2..07e2ef38 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -1,14 +1,19 @@ package jobs import ( + "runtime" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" ) // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { - // Workers configures roadrunner server and worker busy. - // Workers *roadrunner.ServerConfig + // NumPollers configures number of priority queue pollers + // Should be no more than 255 + // Default - num logical cores + NumPollers uint8 `mapstructure:"num_pollers"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` // Pipelines defines mapping between PHP job pipeline and associated job broker. @@ -23,5 +28,9 @@ func (c *Config) InitDefaults() { c.Pool = &poolImpl.Config{} } + if c.NumPollers == 0 { + c.NumPollers = uint8(runtime.NumCPU()) + } + c.Pool.InitDefaults() } diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index 4161d7ca..56a8839d 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go deleted file mode 100644 index a2cf6ed9..00000000 --- a/plugins/jobs/interface.go +++ /dev/null @@ -1,24 +0,0 @@ -package jobs - -import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" -) - -// Consumer todo naming -type Consumer interface { - Push(*structs.Job) (string, error) - Stat() - Consume(*pipeline.Pipeline) - Register(pipe string) error -} - -type Broker interface { - InitJobBroker(queue priorityqueue.Queue) (Consumer, error) -} - -type Item interface { - ID() string - Payload() []byte -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 67077920..8a80479b 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,14 +2,15 @@ package jobs import ( "context" + "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -27,11 +28,13 @@ type Plugin struct { cfg *Config `mapstructure:"jobs"` log logger.Logger + sync.RWMutex + workersPool pool.Pool server server.Server - brokers map[string]Broker - consumers map[string]Consumer + jobConstructors map[string]jobs.Constructor + consumers map[string]jobs.Consumer events events.Handler @@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.brokers = make(map[string]Broker) - p.consumers = make(map[string]Consumer) + p.jobConstructors = make(map[string]jobs.Constructor) + p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) @@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue2.NewPriorityQueue() + p.queue = priorityqueue.NewBinHeap() p.log = log return nil @@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.brokers { - jb, err := p.brokers[name].InitJobBroker(p.queue) + for name := range p.jobConstructors { + jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) if err != nil { errCh <- err return errCh @@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error { // start listening go func() { - for { - // get data JOB from the queue - job := p.queue.GetMax() - - if job == nil { - continue - } - - exec := payload.Payload{ - Context: job.Context(), - Body: job.Body(), - } - - _, err = p.workersPool.Exec(exec) - if err != nil { - panic(err) - } + for i := uint8(0); i < p.cfg.NumPollers; i++ { + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } + + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } + + _, err := p.workersPool.Exec(exec) + if err != nil { + panic(err) + } + } + }() } }() @@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { - p.brokers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { + p.jobConstructors[name.Name()] = c } func (p *Plugin) Available() {} @@ -152,12 +159,13 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (string, error) { +func (p *Plugin) Push(j *structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Driver()] if !ok { - panic("broker not found") + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) } id, err := broker.Push(j) diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index e77cda59..c6bd1645 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,12 @@ package jobs import ( + "sync" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) type rpc struct { @@ -10,11 +14,96 @@ type rpc struct { p *Plugin } -func (r *rpc) Push(j *structs.Job, idRet *string) error { +var jobsPool = &sync.Pool{ + New: func() interface{} { + return &structs.Job{ + Options: &structs.Options{}, + } + }, +} + +func pubJob(j *structs.Job) { + // clear + j.Job = "" + j.Payload = "" + j.Options = &structs.Options{} + jobsPool.Put(j) +} + +func getJob() *structs.Job { + return jobsPool.Get().(*structs.Job) +} + +/* +List of the RPC methods: +1. Push - single job push +2. PushBatch - push job batch + +3. Reset - managed by the Resetter plugin + +4. Stop - stop pipeline processing +5. StopAll - stop all pipelines processing +6. Resume - resume pipeline processing +7. ResumeAll - resume stopped pipelines + +8. Workers - managed by the Informer plugin. +9. Stat - jobs statistic +*/ + +func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { + const op = errors.Op("jobs_rpc_push") + + // convert transport entity into domain + // how we can do this quickly + jb := getJob() + defer pubJob(jb) + + jb = &structs.Job{ + Job: j.GetJob().Job, + Payload: j.GetJob().Payload, + Options: &structs.Options{ + Priority: &j.GetJob().Options.Priority, + ID: &j.GetJob().Options.Id, + Pipeline: j.GetJob().Options.Pipeline, + Delay: j.GetJob().Options.Delay, + Attempts: j.GetJob().Options.Attempts, + RetryDelay: j.GetJob().Options.RetryDelay, + Timeout: j.GetJob().Options.Timeout, + }, + } + id, err := r.p.Push(jb) + if err != nil { + return errors.E(op, err) + } + + resp.Id = *id + + return nil +} + +func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { + const op = errors.Op("jobs_rpc_push") id, err := r.p.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - *idRet = id + + *idRet = *id + return nil +} + +func (r *rpc) Stop(pipeline string, w *string) error { + return nil +} + +func (r *rpc) StopAll(_ bool, w *string) error { + return nil +} + +func (r *rpc) Resume(pipeline string, w *string) error { + return nil +} + +func (r *rpc) ResumeAll(_ bool, w *string) error { return nil } diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go index 268444db..1ef4d2ca 100644 --- a/plugins/jobs/structs/job.go +++ b/plugins/jobs/structs/job.go @@ -17,12 +17,12 @@ type Job struct { Options *Options `json:"options,omitempty"` } -func (j *Job) ID() string { +func (j *Job) ID() *string { return j.Options.ID } -func (j *Job) Priority() uint64 { - return *j.Options.Priority +func (j *Job) Priority() *uint64 { + return j.Options.Priority } // Body packs job payload into binary payload. @@ -34,8 +34,8 @@ func (j *Job) Body() []byte { func (j *Job) Context() []byte { ctx, _ := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` + ID *string `json:"id"` + Job string `json:"job"` }{ID: j.Options.ID, Job: j.Job}, ) diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 029a797d..3e1ada85 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -9,23 +9,23 @@ type Options struct { Priority *uint64 `json:"priority"` // ID - generated ID for the job - ID string `json:"id"` + ID *string `json:"id"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay int `json:"delay,omitempty"` + Delay uint64 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts int `json:"maxAttempts,omitempty"` + Attempts uint64 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int `json:"retryDelay,omitempty"` + RetryDelay uint64 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int `json:"timeout,omitempty"` + Timeout uint64 `json:"timeout,omitempty"` } // Merge merges job options. @@ -52,7 +52,7 @@ func (o *Options) Merge(from *Options) { } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int) bool { +func (o *Options) CanRetry(attempt uint64) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go index 18702394..a16f7dd0 100644 --- a/plugins/jobs/structs/job_options_test.go +++ b/plugins/jobs/structs/job_options_test.go @@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) { }) assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, 1, opts.Attempts) - assert.Equal(t, 2, opts.Delay) - assert.Equal(t, 1, opts.Timeout) - assert.Equal(t, 1, opts.RetryDelay) + assert.Equal(t, uint64(1), opts.Attempts) + assert.Equal(t, uint64(2), opts.Delay) + assert.Equal(t, uint64(1), opts.Timeout) + assert.Equal(t, uint64(1), opts.RetryDelay) } func TestOptions_MergeKeepOriginal(t *testing.T) { @@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { }) assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, 10, opts.Attempts) - assert.Equal(t, 10, opts.Delay) - assert.Equal(t, 10, opts.Timeout) - assert.Equal(t, 10, opts.RetryDelay) + assert.Equal(t, uint64(10), opts.Attempts) + assert.Equal(t, uint64(10), opts.Delay) + assert.Equal(t, uint64(10), opts.Timeout) + assert.Equal(t, uint64(10), opts.RetryDelay) } diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go index 92f78081..0aa5b177 100644 --- a/plugins/jobs/structs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -3,6 +3,7 @@ package structs import ( "testing" + "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -13,7 +14,7 @@ func TestJob_Body(t *testing.T) { } func TestJob_Context(t *testing.T) { - j := &Job{Job: "job", Options: &Options{ID: "id"}} + j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}} assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) } diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go new file mode 100644 index 00000000..9d8427be --- /dev/null +++ b/proto/jobs/v1beta/jobs.pb.go @@ -0,0 +1,476 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: jobs.proto + +package jobsv1beta + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// single job request +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetJob() *Job { + if x != nil { + return x.Job + } + return nil +} + +// batch jobs request +type BatchRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"` +} + +func (x *BatchRequest) Reset() { + *x = BatchRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchRequest) ProtoMessage() {} + +func (x *BatchRequest) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchRequest.ProtoReflect.Descriptor instead. +func (*BatchRequest) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchRequest) GetJobs() []*Job { + if x != nil { + return x.Jobs + } + return nil +} + +// RPC response +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Options *Options `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. +func (*Job) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{3} +} + +func (x *Job) GetJob() string { + if x != nil { + return x.Job + } + return "" +} + +func (x *Job) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + +func (x *Job) GetOptions() *Options { + if x != nil { + return x.Options + } + return nil +} + +type Options struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Pipeline string `protobuf:"bytes,3,opt,name=pipeline,proto3" json:"pipeline,omitempty"` + Delay uint64 `protobuf:"varint,4,opt,name=delay,proto3" json:"delay,omitempty"` + Attempts uint64 `protobuf:"varint,5,opt,name=attempts,proto3" json:"attempts,omitempty"` + RetryDelay uint64 `protobuf:"varint,6,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"` + Timeout uint64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Options) Reset() { + *x = Options{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Options) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Options) ProtoMessage() {} + +func (x *Options) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Options.ProtoReflect.Descriptor instead. +func (*Options) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{4} +} + +func (x *Options) GetPriority() uint64 { + if x != nil { + return x.Priority + } + return 0 +} + +func (x *Options) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Options) GetPipeline() string { + if x != nil { + return x.Pipeline + } + return "" +} + +func (x *Options) GetDelay() uint64 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *Options) GetAttempts() uint64 { + if x != nil { + return x.Attempts + } + return 0 +} + +func (x *Options) GetRetryDelay() uint64 { + if x != nil { + return x.RetryDelay + } + return 0 +} + +func (x *Options) GetTimeout() uint64 { + if x != nil { + return x.Timeout + } + return 0 +} + +var File_jobs_proto protoreflect.FileDescriptor + +var file_jobs_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f, + 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x34, 0x0a, 0x0c, 0x42, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, + 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x1a, + 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x61, 0x0a, 0x03, 0x4a, 0x6f, + 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6a, 0x6f, 0x62, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a, + 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, + 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xbe, 0x01, + 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, + 0x61, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, + 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, + 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_jobs_proto_rawDescOnce sync.Once + file_jobs_proto_rawDescData = file_jobs_proto_rawDesc +) + +func file_jobs_proto_rawDescGZIP() []byte { + file_jobs_proto_rawDescOnce.Do(func() { + file_jobs_proto_rawDescData = protoimpl.X.CompressGZIP(file_jobs_proto_rawDescData) + }) + return file_jobs_proto_rawDescData +} + +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_jobs_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: jobs.v1beta.Request + (*BatchRequest)(nil), // 1: jobs.v1beta.BatchRequest + (*Response)(nil), // 2: jobs.v1beta.Response + (*Job)(nil), // 3: jobs.v1beta.Job + (*Options)(nil), // 4: jobs.v1beta.Options +} +var file_jobs_proto_depIdxs = []int32{ + 3, // 0: jobs.v1beta.Request.job:type_name -> jobs.v1beta.Job + 3, // 1: jobs.v1beta.BatchRequest.jobs:type_name -> jobs.v1beta.Job + 4, // 2: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_jobs_proto_init() } +func file_jobs_proto_init() { + if File_jobs_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Options); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_jobs_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_jobs_proto_goTypes, + DependencyIndexes: file_jobs_proto_depIdxs, + MessageInfos: file_jobs_proto_msgTypes, + }.Build() + File_jobs_proto = out.File + file_jobs_proto_rawDesc = nil + file_jobs_proto_goTypes = nil + file_jobs_proto_depIdxs = nil +} diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 46434fa8..13fd5595 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -1,22 +1,36 @@ syntax = "proto3"; -package kv.v1beta; +package jobs.v1beta; option go_package = "./;jobsv1beta"; +// single job request message Request { - // could be an enum in the future - string storage = 1; - repeated Item items = 2; + Job job = 1; } -message Item { - string key = 1; - bytes value = 2; - // RFC 3339 - string timeout = 3; +// batch jobs request +message BatchRequest { + repeated Job jobs = 1; } -// KV response for the KV RPC methods +// RPC response message Response { - repeated Item items = 1; + string id = 1; } + +message Job { + string job = 1; + string payload = 2; + Options options = 3; +} + +message Options { + uint64 priority = 1; + string id = 2; + string pipeline = 3; + uint64 delay = 4; + uint64 attempts = 5; + uint64 retry_delay = 6; + uint64 timeout = 7; +} + diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 75578bff..19621735 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: kv.proto package kvv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index a2868118..188dcf08 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: websockets.proto package websocketsv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index d86a8ad8..c81ba6ef 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -22,6 +22,7 @@ sqs: jobs: + num_pollers: 32 # worker pool configuration pool: num_workers: 10 diff --git a/utils/pointers.go b/utils/pointers.go new file mode 100644 index 00000000..9c192279 --- /dev/null +++ b/utils/pointers.go @@ -0,0 +1,15 @@ +package utils + +func AsUint64Ptr(val uint64) *uint64 { + if val == 0 { + val = 10 + } + return &val +} + +func AsStringPtr(val string) *string { + if val == "" { + return nil + } + return &val +} |