diff options
26 files changed, 935 insertions, 294 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index cf6c037b..4679cc24 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -73,7 +73,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priorityqueue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server @@ -14,7 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priorityqueue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config @@ -48,7 +48,7 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst - go test -v -race -tags=debug ./pkg/priority_queue + go test -v -race -tags=debug ./pkg/priorityqueue go test -v -race -tags=debug ./plugins/jobs/structs go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config @@ -81,6 +81,7 @@ testGo1.17beta1: ## Run application tests go1.17beta1 test -v -race -tags=debug ./pkg/worker go1.17beta1 test -v -race -tags=debug ./pkg/worker_watcher go1.17beta1 test -v -race -tags=debug ./pkg/bst + go1.17beta1 test -v -race -tags=debug ./pkg/priorityqueue go1.17beta1 test -v -race -tags=debug ./tests/plugins/http go1.17beta1 test -v -race -tags=debug ./plugins/http/config go1.17beta1 test -v -race -tags=debug ./plugins/server diff --git a/common/jobs/interface.go b/common/jobs/interface.go new file mode 100644 index 00000000..6738ed46 --- /dev/null +++ b/common/jobs/interface.go @@ -0,0 +1,26 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming +type Consumer interface { + Push(job *structs.Job) (*string, error) + PushBatch(job *[]structs.Job) (*string, error) + Consume(job *pipeline.Pipeline) + + Stop(pipeline string) + StopAll() + Resume(pipeline string) + ResumeAll() + + Register(pipe string) error + Stat() +} + +type Constructor interface { + JobsConstruct(configKey string, queue priorityqueue.Queue) (Consumer, error) +} diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go deleted file mode 100644 index c7c148da..00000000 --- a/pkg/priority_queue/binary_heap.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -binary heap (min-heap) algorithm used as a core for the priority queue -*/ - -package priorityqueue - -import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - -type BinHeap []priorityqueue.Item - -func NewBinHeap() *BinHeap { - return &BinHeap{} -} - -func (bh *BinHeap) fixUp() { - k := len(*bh) - 1 - p := (k - 1) >> 1 // k-1 / 2 - - for k > 0 { - cur, par := (*bh)[k], (*bh)[p] - - if cur.Priority() < par.Priority() { - bh.swap(k, p) - k = p - p = (k - 1) >> 1 - } else { - return - } - } -} - -func (bh *BinHeap) swap(i, j int) { - (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i] -} - -func (bh *BinHeap) fixDown(curr, end int) { - cOneIdx := curr*2 + 1 - for cOneIdx <= end { - cTwoIdx := -1 - if curr*2+2 <= end { - cTwoIdx = curr*2 + 2 - } - - idxToSwap := cOneIdx - if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() { - idxToSwap = cTwoIdx - } - if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() { - bh.swap(curr, idxToSwap) - curr = idxToSwap - cOneIdx = curr*2 + 1 - } else { - return - } - } -} - -func (bh *BinHeap) Insert(item priorityqueue.Item) { - *bh = append(*bh, item) - bh.fixUp() -} - -func (bh *BinHeap) GetMax() priorityqueue.Item { - l := len(*bh) - if l == 0 { - return nil - } - - bh.swap(0, l-1) - - item := (*bh)[l-1] - *bh = (*bh)[0 : l-1] - bh.fixDown(0, l-2) - return item -} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go deleted file mode 100644 index 528e8fd0..00000000 --- a/pkg/priority_queue/binary_heap_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package priorityqueue - -import ( - "testing" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/stretchr/testify/require" -) - -type Test int - -func (t Test) Ack() { -} - -func (t Test) Nack() { -} - -func (t Test) Body() []byte { - return nil -} - -func (t Test) Context() []byte { - return nil -} - -func (t Test) ID() string { - return "" -} - -func (t Test) Priority() uint64 { - return uint64(t) -} - -func TestBinHeap_Init(t *testing.T) { - a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap() - - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - - res := make([]priorityqueue.Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.GetMax() - res = append(res, item) - } - - require.Equal(t, expected, res) -} diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go deleted file mode 100644 index 2ff52a79..00000000 --- a/pkg/priority_queue/pq.go +++ /dev/null @@ -1,30 +0,0 @@ -package priorityqueue - -import ( - "sync" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" -) - -type PQ struct { - sync.RWMutex - bh *BinHeap -} - -func NewPriorityQueue() *PQ { - return &PQ{ - bh: NewBinHeap(), - } -} - -func (p *PQ) GetMax() priorityqueue.Item { - p.Lock() - defer p.Unlock() - return p.bh.GetMax() -} - -func (p *PQ) Insert(item priorityqueue.Item) { - p.Lock() - p.bh.Insert(item) - p.Unlock() -} diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go new file mode 100644 index 00000000..fe6a06fd --- /dev/null +++ b/pkg/priorityqueue/binary_heap.go @@ -0,0 +1,103 @@ +/* +binary heap (min-heap) algorithm used as a core for the priority queue +*/ + +package priorityqueue + +import ( + "sync" + "sync/atomic" +) + +type BinHeap struct { + items []Item + // find a way to use pointer to the raw data + len uint64 + cond sync.Cond +} + +func NewBinHeap() *BinHeap { + return &BinHeap{ + items: make([]Item, 0, 100), + len: 0, + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (bh *BinHeap) fixUp() { + k := len(bh.items) - 1 + p := (k - 1) >> 1 // k-1 / 2 + + for k > 0 { + cur, par := (bh.items)[k], (bh.items)[p] + + if *cur.Priority() < *par.Priority() { + bh.swap(k, p) + k = p + p = (k - 1) >> 1 + } else { + return + } + } +} + +func (bh *BinHeap) swap(i, j int) { + (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] +} + +func (bh *BinHeap) fixDown(curr, end int) { + cOneIdx := (curr << 1) + 1 + for cOneIdx <= end { + cTwoIdx := -1 + if (curr<<1)+2 <= end { + cTwoIdx = (curr << 1) + 2 + } + + idxToSwap := cOneIdx + // oh my, so unsafe + if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() { + idxToSwap = cTwoIdx + } + if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { + bh.swap(curr, idxToSwap) + curr = idxToSwap + cOneIdx = (curr << 1) + 1 + } else { + return + } + } +} + +func (bh *BinHeap) Insert(item Item) { + bh.cond.L.Lock() + bh.items = append(bh.items, item) + + // add len to the slice + atomic.AddUint64(&bh.len, 1) + + // fix binary heap up + bh.fixUp() + bh.cond.L.Unlock() + + // signal the goroutine on wait + bh.cond.Signal() +} + +func (bh *BinHeap) GetMax() Item { + bh.cond.L.Lock() + defer bh.cond.L.Unlock() + + for atomic.LoadUint64(&bh.len) == 0 { + bh.cond.Wait() + } + + bh.swap(0, int(bh.len-1)) + + item := (bh.items)[int(bh.len)-1] + bh.items = (bh).items[0 : int(bh.len)-1] + bh.fixDown(0, int(bh.len-2)) + + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) + return item +} diff --git a/pkg/priority_queue/pq_test.go b/pkg/priorityqueue/binary_heap_test.go index 49afe5e3..149ec764 100644 --- a/pkg/priority_queue/pq_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -6,13 +6,61 @@ import ( "sync/atomic" "testing" "time" + + "github.com/spiral/roadrunner/v2/utils" + "github.com/stretchr/testify/require" ) +type Test int + +func (t Test) Ack() { +} + +func (t Test) Nack() { +} + +func (t Test) Body() []byte { + return nil +} + +func (t Test) Context() []byte { + return nil +} + +func (t Test) ID() *string { + return utils.AsStringPtr("none") +} + +func (t Test) Priority() *uint64 { + return utils.AsUint64Ptr(uint64(t)) +} + +func TestBinHeap_Init(t *testing.T) { + a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap() + + for i := 0; i < len(a); i++ { + bh.Insert(a[i]) + } + + expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + res := make([]Item, 0, 12) + + for i := 0; i < 11; i++ { + item := bh.GetMax() + res = append(res, item) + } + + require.Equal(t, expected, res) +} + func TestNewPriorityQueue(t *testing.T) { insertsPerSec := uint64(0) getPerSec := uint64(0) stopCh := make(chan struct{}, 1) - pq := NewPriorityQueue() + pq := NewBinHeap() go func() { tt := time.NewTicker(time.Second) diff --git a/common/priority_queue/interface.go b/pkg/priorityqueue/interface.go index c1774223..7ac2e449 100644 --- a/common/priority_queue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -6,10 +6,10 @@ type Queue interface { } type Item interface { - ID() string - Priority() uint64 - Ack() - Nack() + ID() *string + Priority() *uint64 Body() []byte Context() []byte + Ack() + Nack() } diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 3eb20c27..4bbb4095 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,70 +1,104 @@ package ephemeral import ( + "sync" + "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { - queues map[string]bool + queues sync.Map pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - queues: make(map[string]bool), + queues: sync.Map{}, pq: q, } return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (string, error) { +func (j *JobBroker) Push(job *structs.Job) (*string, error) { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues[job.Options.Pipeline]; ok { - if !b { - return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } if job.Options.Priority == nil { - job.Options.Priority = intPtr(10) + job.Options.Priority = utils.AsUint64Ptr(10) } - job.Options.ID = uuid.NewString() + job.Options.ID = utils.AsStringPtr(uuid.NewString()) j.pq.Insert(job) return job.Options.ID, nil } - return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} - -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; ok { + if _, ok := j.queues.Load(pipeline); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = true + j.queues.Store(pipeline, true) return nil } -func intPtr(val uint64) *uint64 { - if val == 0 { - val = 10 +func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { + // Use a batch response + // Add JobID to the payload to match responses + panic("todo") +} + +func (j *JobBroker) Stop(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.queues.Store(pipeline, false) + } + } +} + +func (j *JobBroker) StopAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, false) + return true + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.queues.Store(pipeline, true) + } } - return &val +} + +func (j *JobBroker) ResumeAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, true) + return true + }) +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { + panic("implement me") } diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 146d1fdc..3d6a95b7 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,8 +1,8 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -23,6 +23,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { +func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(q) } diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1cb2c2a2..07e2ef38 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -1,14 +1,19 @@ package jobs import ( + "runtime" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" ) // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { - // Workers configures roadrunner server and worker busy. - // Workers *roadrunner.ServerConfig + // NumPollers configures number of priority queue pollers + // Should be no more than 255 + // Default - num logical cores + NumPollers uint8 `mapstructure:"num_pollers"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` // Pipelines defines mapping between PHP job pipeline and associated job broker. @@ -23,5 +28,9 @@ func (c *Config) InitDefaults() { c.Pool = &poolImpl.Config{} } + if c.NumPollers == 0 { + c.NumPollers = uint8(runtime.NumCPU()) + } + c.Pool.InitDefaults() } diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index 4161d7ca..56a8839d 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go deleted file mode 100644 index a2cf6ed9..00000000 --- a/plugins/jobs/interface.go +++ /dev/null @@ -1,24 +0,0 @@ -package jobs - -import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" -) - -// Consumer todo naming -type Consumer interface { - Push(*structs.Job) (string, error) - Stat() - Consume(*pipeline.Pipeline) - Register(pipe string) error -} - -type Broker interface { - InitJobBroker(queue priorityqueue.Queue) (Consumer, error) -} - -type Item interface { - ID() string - Payload() []byte -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 67077920..8a80479b 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,14 +2,15 @@ package jobs import ( "context" + "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -27,11 +28,13 @@ type Plugin struct { cfg *Config `mapstructure:"jobs"` log logger.Logger + sync.RWMutex + workersPool pool.Pool server server.Server - brokers map[string]Broker - consumers map[string]Consumer + jobConstructors map[string]jobs.Constructor + consumers map[string]jobs.Consumer events events.Handler @@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.brokers = make(map[string]Broker) - p.consumers = make(map[string]Consumer) + p.jobConstructors = make(map[string]jobs.Constructor) + p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) @@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue2.NewPriorityQueue() + p.queue = priorityqueue.NewBinHeap() p.log = log return nil @@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.brokers { - jb, err := p.brokers[name].InitJobBroker(p.queue) + for name := range p.jobConstructors { + jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) if err != nil { errCh <- err return errCh @@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error { // start listening go func() { - for { - // get data JOB from the queue - job := p.queue.GetMax() - - if job == nil { - continue - } - - exec := payload.Payload{ - Context: job.Context(), - Body: job.Body(), - } - - _, err = p.workersPool.Exec(exec) - if err != nil { - panic(err) - } + for i := uint8(0); i < p.cfg.NumPollers; i++ { + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } + + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } + + _, err := p.workersPool.Exec(exec) + if err != nil { + panic(err) + } + } + }() } }() @@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { - p.brokers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { + p.jobConstructors[name.Name()] = c } func (p *Plugin) Available() {} @@ -152,12 +159,13 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (string, error) { +func (p *Plugin) Push(j *structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Driver()] if !ok { - panic("broker not found") + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) } id, err := broker.Push(j) diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index e77cda59..c6bd1645 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,12 @@ package jobs import ( + "sync" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) type rpc struct { @@ -10,11 +14,96 @@ type rpc struct { p *Plugin } -func (r *rpc) Push(j *structs.Job, idRet *string) error { +var jobsPool = &sync.Pool{ + New: func() interface{} { + return &structs.Job{ + Options: &structs.Options{}, + } + }, +} + +func pubJob(j *structs.Job) { + // clear + j.Job = "" + j.Payload = "" + j.Options = &structs.Options{} + jobsPool.Put(j) +} + +func getJob() *structs.Job { + return jobsPool.Get().(*structs.Job) +} + +/* +List of the RPC methods: +1. Push - single job push +2. PushBatch - push job batch + +3. Reset - managed by the Resetter plugin + +4. Stop - stop pipeline processing +5. StopAll - stop all pipelines processing +6. Resume - resume pipeline processing +7. ResumeAll - resume stopped pipelines + +8. Workers - managed by the Informer plugin. +9. Stat - jobs statistic +*/ + +func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { + const op = errors.Op("jobs_rpc_push") + + // convert transport entity into domain + // how we can do this quickly + jb := getJob() + defer pubJob(jb) + + jb = &structs.Job{ + Job: j.GetJob().Job, + Payload: j.GetJob().Payload, + Options: &structs.Options{ + Priority: &j.GetJob().Options.Priority, + ID: &j.GetJob().Options.Id, + Pipeline: j.GetJob().Options.Pipeline, + Delay: j.GetJob().Options.Delay, + Attempts: j.GetJob().Options.Attempts, + RetryDelay: j.GetJob().Options.RetryDelay, + Timeout: j.GetJob().Options.Timeout, + }, + } + id, err := r.p.Push(jb) + if err != nil { + return errors.E(op, err) + } + + resp.Id = *id + + return nil +} + +func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { + const op = errors.Op("jobs_rpc_push") id, err := r.p.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - *idRet = id + + *idRet = *id + return nil +} + +func (r *rpc) Stop(pipeline string, w *string) error { + return nil +} + +func (r *rpc) StopAll(_ bool, w *string) error { + return nil +} + +func (r *rpc) Resume(pipeline string, w *string) error { + return nil +} + +func (r *rpc) ResumeAll(_ bool, w *string) error { return nil } diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go index 268444db..1ef4d2ca 100644 --- a/plugins/jobs/structs/job.go +++ b/plugins/jobs/structs/job.go @@ -17,12 +17,12 @@ type Job struct { Options *Options `json:"options,omitempty"` } -func (j *Job) ID() string { +func (j *Job) ID() *string { return j.Options.ID } -func (j *Job) Priority() uint64 { - return *j.Options.Priority +func (j *Job) Priority() *uint64 { + return j.Options.Priority } // Body packs job payload into binary payload. @@ -34,8 +34,8 @@ func (j *Job) Body() []byte { func (j *Job) Context() []byte { ctx, _ := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` + ID *string `json:"id"` + Job string `json:"job"` }{ID: j.Options.ID, Job: j.Job}, ) diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 029a797d..3e1ada85 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -9,23 +9,23 @@ type Options struct { Priority *uint64 `json:"priority"` // ID - generated ID for the job - ID string `json:"id"` + ID *string `json:"id"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay int `json:"delay,omitempty"` + Delay uint64 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts int `json:"maxAttempts,omitempty"` + Attempts uint64 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int `json:"retryDelay,omitempty"` + RetryDelay uint64 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int `json:"timeout,omitempty"` + Timeout uint64 `json:"timeout,omitempty"` } // Merge merges job options. @@ -52,7 +52,7 @@ func (o *Options) Merge(from *Options) { } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int) bool { +func (o *Options) CanRetry(attempt uint64) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go index 18702394..a16f7dd0 100644 --- a/plugins/jobs/structs/job_options_test.go +++ b/plugins/jobs/structs/job_options_test.go @@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) { }) assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, 1, opts.Attempts) - assert.Equal(t, 2, opts.Delay) - assert.Equal(t, 1, opts.Timeout) - assert.Equal(t, 1, opts.RetryDelay) + assert.Equal(t, uint64(1), opts.Attempts) + assert.Equal(t, uint64(2), opts.Delay) + assert.Equal(t, uint64(1), opts.Timeout) + assert.Equal(t, uint64(1), opts.RetryDelay) } func TestOptions_MergeKeepOriginal(t *testing.T) { @@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { }) assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, 10, opts.Attempts) - assert.Equal(t, 10, opts.Delay) - assert.Equal(t, 10, opts.Timeout) - assert.Equal(t, 10, opts.RetryDelay) + assert.Equal(t, uint64(10), opts.Attempts) + assert.Equal(t, uint64(10), opts.Delay) + assert.Equal(t, uint64(10), opts.Timeout) + assert.Equal(t, uint64(10), opts.RetryDelay) } diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go index 92f78081..0aa5b177 100644 --- a/plugins/jobs/structs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -3,6 +3,7 @@ package structs import ( "testing" + "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -13,7 +14,7 @@ func TestJob_Body(t *testing.T) { } func TestJob_Context(t *testing.T) { - j := &Job{Job: "job", Options: &Options{ID: "id"}} + j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}} assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) } diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go new file mode 100644 index 00000000..9d8427be --- /dev/null +++ b/proto/jobs/v1beta/jobs.pb.go @@ -0,0 +1,476 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: jobs.proto + +package jobsv1beta + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// single job request +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetJob() *Job { + if x != nil { + return x.Job + } + return nil +} + +// batch jobs request +type BatchRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"` +} + +func (x *BatchRequest) Reset() { + *x = BatchRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchRequest) ProtoMessage() {} + +func (x *BatchRequest) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchRequest.ProtoReflect.Descriptor instead. +func (*BatchRequest) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchRequest) GetJobs() []*Job { + if x != nil { + return x.Jobs + } + return nil +} + +// RPC response +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Options *Options `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. +func (*Job) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{3} +} + +func (x *Job) GetJob() string { + if x != nil { + return x.Job + } + return "" +} + +func (x *Job) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + +func (x *Job) GetOptions() *Options { + if x != nil { + return x.Options + } + return nil +} + +type Options struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Pipeline string `protobuf:"bytes,3,opt,name=pipeline,proto3" json:"pipeline,omitempty"` + Delay uint64 `protobuf:"varint,4,opt,name=delay,proto3" json:"delay,omitempty"` + Attempts uint64 `protobuf:"varint,5,opt,name=attempts,proto3" json:"attempts,omitempty"` + RetryDelay uint64 `protobuf:"varint,6,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"` + Timeout uint64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Options) Reset() { + *x = Options{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Options) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Options) ProtoMessage() {} + +func (x *Options) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Options.ProtoReflect.Descriptor instead. +func (*Options) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{4} +} + +func (x *Options) GetPriority() uint64 { + if x != nil { + return x.Priority + } + return 0 +} + +func (x *Options) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Options) GetPipeline() string { + if x != nil { + return x.Pipeline + } + return "" +} + +func (x *Options) GetDelay() uint64 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *Options) GetAttempts() uint64 { + if x != nil { + return x.Attempts + } + return 0 +} + +func (x *Options) GetRetryDelay() uint64 { + if x != nil { + return x.RetryDelay + } + return 0 +} + +func (x *Options) GetTimeout() uint64 { + if x != nil { + return x.Timeout + } + return 0 +} + +var File_jobs_proto protoreflect.FileDescriptor + +var file_jobs_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f, + 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x34, 0x0a, 0x0c, 0x42, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, + 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x1a, + 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x61, 0x0a, 0x03, 0x4a, 0x6f, + 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6a, 0x6f, 0x62, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a, + 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, + 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xbe, 0x01, + 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, + 0x61, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, + 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, + 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_jobs_proto_rawDescOnce sync.Once + file_jobs_proto_rawDescData = file_jobs_proto_rawDesc +) + +func file_jobs_proto_rawDescGZIP() []byte { + file_jobs_proto_rawDescOnce.Do(func() { + file_jobs_proto_rawDescData = protoimpl.X.CompressGZIP(file_jobs_proto_rawDescData) + }) + return file_jobs_proto_rawDescData +} + +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_jobs_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: jobs.v1beta.Request + (*BatchRequest)(nil), // 1: jobs.v1beta.BatchRequest + (*Response)(nil), // 2: jobs.v1beta.Response + (*Job)(nil), // 3: jobs.v1beta.Job + (*Options)(nil), // 4: jobs.v1beta.Options +} +var file_jobs_proto_depIdxs = []int32{ + 3, // 0: jobs.v1beta.Request.job:type_name -> jobs.v1beta.Job + 3, // 1: jobs.v1beta.BatchRequest.jobs:type_name -> jobs.v1beta.Job + 4, // 2: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_jobs_proto_init() } +func file_jobs_proto_init() { + if File_jobs_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Options); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_jobs_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_jobs_proto_goTypes, + DependencyIndexes: file_jobs_proto_depIdxs, + MessageInfos: file_jobs_proto_msgTypes, + }.Build() + File_jobs_proto = out.File + file_jobs_proto_rawDesc = nil + file_jobs_proto_goTypes = nil + file_jobs_proto_depIdxs = nil +} diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 46434fa8..13fd5595 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -1,22 +1,36 @@ syntax = "proto3"; -package kv.v1beta; +package jobs.v1beta; option go_package = "./;jobsv1beta"; +// single job request message Request { - // could be an enum in the future - string storage = 1; - repeated Item items = 2; + Job job = 1; } -message Item { - string key = 1; - bytes value = 2; - // RFC 3339 - string timeout = 3; +// batch jobs request +message BatchRequest { + repeated Job jobs = 1; } -// KV response for the KV RPC methods +// RPC response message Response { - repeated Item items = 1; + string id = 1; } + +message Job { + string job = 1; + string payload = 2; + Options options = 3; +} + +message Options { + uint64 priority = 1; + string id = 2; + string pipeline = 3; + uint64 delay = 4; + uint64 attempts = 5; + uint64 retry_delay = 6; + uint64 timeout = 7; +} + diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 75578bff..19621735 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: kv.proto package kvv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index a2868118..188dcf08 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: websockets.proto package websocketsv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index d86a8ad8..c81ba6ef 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -22,6 +22,7 @@ sqs: jobs: + num_pollers: 32 # worker pool configuration pool: num_workers: 10 diff --git a/utils/pointers.go b/utils/pointers.go new file mode 100644 index 00000000..9c192279 --- /dev/null +++ b/utils/pointers.go @@ -0,0 +1,15 @@ +package utils + +func AsUint64Ptr(val uint64) *uint64 { + if val == 0 { + val = 10 + } + return &val +} + +func AsStringPtr(val string) *string { + if val == "" { + return nil + } + return &val +} |