summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 01:18:56 +0300
committerValery Piashchynski <[email protected]>2021-07-10 01:18:56 +0300
commit4fcb5979fad87f6e268f5b9df91ee2ee91e9ef16 (patch)
tree30ed85120f8a39fd07756af9f5ce3422cf318971
parent4566f88004e81d3229222d82614c15346ac2e47d (diff)
AMQP job driver...
Update main driver's interface, add Consume(*pipeline) method. Implement it on the amqp and ephemeral drivers. Fix error with incorrect order of Register <-> Consume method calls. Implement rabbitMQ driver, add timeouts, dead-letter-exchange, packing-unpacking of the amqp messages. Implement AMQP redialer in case of network error as well as channels re-creation. Update drawio diagram. Update .rr.yaml jobs configuration, add all amqp options. Implement Ack/Nack. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--common/jobs/interface.go1
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go282
-rw-r--r--plugins/jobs/brokers/amqp/headers.go68
-rw-r--r--plugins/jobs/brokers/amqp/item.go2
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go4
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go35
-rw-r--r--plugins/jobs/brokers/amqp/redial.go47
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go5
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/plugin.go15
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml16
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go4
12 files changed, 410 insertions, 71 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index deb90cde..3c29447d 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -10,6 +10,7 @@ import (
type Consumer interface {
Push(job *structs.Job) error
Register(pipeline *pipeline.Pipeline) error
+ Consume(pipeline *pipeline.Pipeline) error
List() []*pipeline.Pipeline
Pause(pipeline string)
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index f91b71e7..9ac47269 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -1,9 +1,11 @@
package amqp
import (
+ "fmt"
"sync"
"time"
+ "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
@@ -14,9 +16,32 @@ import (
"github.com/streadway/amqp"
)
+// pipeline rabbitmq info
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
type Config struct {
- Addr string
- Queue string
+ PrefetchCount int `mapstructure:"pipeline_size"`
+ Queue string `mapstructure:"queue"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
}
type JobsConsumer struct {
@@ -27,35 +52,91 @@ type JobsConsumer struct {
pipelines sync.Map
// amqp connection
- conn *amqp.Connection
+ conn *amqp.Connection
+ consumeChan *amqp.Channel
+ publishChan *amqp.Channel
+
retryTimeout time.Duration
prefetchCount int
exchangeName string
+ queue string
+ consumeID string
connStr string
exchangeType string
routingKey string
+ // TODO send data to channel
stop chan struct{}
}
func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp name
+ // firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
jb := &JobsConsumer{
- logger: log,
- pq: pq,
+ logger: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stop: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ }
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- d, err := jb.initRabbitMQ()
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(configKey, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeCfg.RoutingKey
+ jb.queue = pipeCfg.Queue
+ jb.exchangeType = pipeCfg.ExchangeType
+ jb.exchangeName = pipeCfg.Exchange
+ jb.prefetchCount = pipeCfg.PrefetchCount
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // assign address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
if err != nil {
return nil, err
}
- // run listener
- jb.listener(d)
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ panic(err)
+ }
- // run redialer
+ // run redialer for the connection
jb.redialer()
return jb, nil
@@ -63,27 +144,57 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
func (j *JobsConsumer) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
+ j.RLock()
+ defer j.RUnlock()
// check if the pipeline registered
- if b, ok := j.pipelines.Load(job.Options.Pipeline); ok {
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
- }
-
+ if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
// handle timeouts
- if job.Options.Timeout > 0 {
- go func(jj *structs.Job) {
- time.Sleep(jj.Options.TimeoutDuration())
+ if job.Options.DelayDuration() > 0 {
+ // pub
+ delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- // TODO push
+ _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: 100,
+ dlxExpires: 200,
+ })
- // send the item after timeout expired
- }(job)
+ if err != nil {
+ panic(err)
+ }
+
+ err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ panic(err)
+ }
+
+ // insert to the local, limited pipeline
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, 0, job),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: nil,
+ })
+ if err != nil {
+ panic(err)
+ }
return nil
}
// insert to the local, limited pipeline
+ err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ //Headers: pack(job.Ident, 0, job),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: nil,
+ })
+ if err != nil {
+ panic(err)
+ }
return nil
}
@@ -92,7 +203,52 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
}
func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
- panic("implement me")
+ const op = errors.Op("rabbitmq_register")
+ if _, ok := j.pipelines.Load(pipeline.Name()); ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
+ }
+
+ j.pipelines.Store(pipeline.Name(), true)
+
+ return nil
+}
+
+func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ if _, ok := j.pipelines.Load(pipeline.Name()); !ok {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name()))
+ }
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // run listener
+ j.listener(deliv)
+
+ return nil
}
func (j *JobsConsumer) List() []*pipeline.Pipeline {
@@ -100,9 +256,87 @@ func (j *JobsConsumer) List() []*pipeline.Pipeline {
}
func (j *JobsConsumer) Pause(pipeline string) {
- panic("implement me")
+ if q, ok := j.pipelines.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.pipelines.Store(pipeline, false)
+ }
+ }
+
+ err := j.publishChan.Cancel(j.consumeID, true)
+ if err != nil {
+ j.logger.Error("cancel publish channel, forcing close", "error", err)
+ errCl := j.publishChan.Close()
+ if errCl != nil {
+ j.logger.Error("force close failed", "error", err)
+ }
+ }
}
func (j *JobsConsumer) Resume(pipeline string) {
- panic("implement me")
+ if q, ok := j.pipelines.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned off
+ j.pipelines.Store(pipeline, true)
+ }
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ j.logger.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ j.logger.Error("qos set failed", "error", err)
+ return
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ j.logger.Error("consume operation failed", "error", err)
+ return
+ }
+
+ // run listener
+ j.listener(deliv)
+ }
+}
+
+// Declare used to dynamically declare a pipeline
+func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error {
+ pipeline.String(exchangeKey, "")
+ pipeline.String(queue, "")
+ pipeline.String(routingKey, "")
+ pipeline.String(exchangeType, "direct")
+ return nil
+}
+
+func (c *Config) InitDefault() {
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "default"
+ }
+
+ if c.PrefetchCount == 0 {
+ c.PrefetchCount = 100
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:guest@localhost:5672/"
+ }
}
diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go
new file mode 100644
index 00000000..b1f9c89d
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/headers.go
@@ -0,0 +1,68 @@
+package amqp
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/streadway/amqp"
+)
+
+const (
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ rrAttempt string = "rr-attempt"
+ rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
+)
+
+// pack job metadata into headers
+func pack(id string, attempt uint64, j *structs.Job) amqp.Table {
+ return amqp.Table{
+ rrID: id,
+ rrJob: j.Job,
+ rrAttempt: attempt,
+ rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused
+ j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}}
+
+ if _, ok := d.Headers[rrID].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ }
+
+ if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ }
+
+ if _, ok := d.Headers[rrJob].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ }
+
+ j.Job = d.Headers[rrJob].(string)
+
+ if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ }
+
+ if _, ok := d.Headers[rrTimeout].(uint64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ }
+
+ if _, ok := d.Headers[rrDelay].(uint64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(uint64)
+ }
+
+ if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ }
+
+ return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 7f1bf204..4751df58 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -16,7 +16,7 @@ func From(d amqp.Delivery) *Item {
}
type Item struct {
- // Job contains name of job broker (usually PHP class).
+ // Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
// Ident is unique identifier of the job, should be provided from outside
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 174cb006..74f9a174 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -8,7 +8,7 @@ import (
)
const (
- name string = "amqp"
+ pluginName string = "amqp"
)
type Plugin struct {
@@ -23,7 +23,7 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
}
func (p *Plugin) Name() string {
- return name
+ return pluginName
}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index 41374878..7e722889 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -1,24 +1,23 @@
package amqp
import (
- "fmt"
-
- "github.com/google/uuid"
+ "github.com/spiral/errors"
"github.com/streadway/amqp"
)
-func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
+func (j *JobsConsumer) initRabbitMQ() error {
+ const op = errors.Op("rabbit_initmq")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
channel, err := j.conn.Channel()
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
err = channel.Qos(j.prefetchCount, 0, false)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// declare an exchange (idempotent operation)
@@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// verify or declare a queue
q, err := channel.QueueDeclare(
- fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()),
+ j.queue,
false,
false,
true,
@@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// bind queue to the exchange
@@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
- }
-
- // start reading messages from the channel
- deliv, err := channel.Consume(
- q.Name,
- "",
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return nil, err
+ return errors.E(op, err)
}
- return deliv, nil
+ return nil
}
func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
index bfb1fbff..874e68c4 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -2,44 +2,70 @@ package amqp
import (
"fmt"
- "time"
"github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
"github.com/streadway/amqp"
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobsConsumer) redialer() {
+func (j *JobsConsumer) redialer() { //nolint:gocognit
go func() {
+ const op = errors.Op("rabbitmq_redial")
for err := range j.conn.NotifyClose(make(chan *amqp.Error)) {
if err != nil {
+ j.Lock()
+
j.logger.Error("connection closed, reconnecting", "error", err)
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
- expb.MaxElapsedTime = time.Minute * j.retryTimeout
+ expb.MaxElapsedTime = j.retryTimeout
op := func() error {
j.logger.Warn("rabbitmq reconnecting, caused by", "error", err)
-
- j.Lock()
var dialErr error
j.conn, dialErr = amqp.Dial(j.connStr)
if dialErr != nil {
- j.Unlock()
return fmt.Errorf("fail to dial server endpoint: %v", dialErr)
}
- j.Unlock()
j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
- deliv, errInit := j.initRabbitMQ()
+ errInit := j.initRabbitMQ()
if errInit != nil {
- j.Unlock()
j.logger.Error("error while redialing", "error", errInit)
return errInit
}
+ // redeclare consume channel
+ var errConnCh error
+ j.consumeChan, errConnCh = j.conn.Channel()
+ if errConnCh != nil {
+ return errors.E(op, errConnCh)
+ }
+
+ // redeclare publish channel
+ var errPubCh error
+ j.publishChan, errPubCh = j.conn.Channel()
+ if errPubCh != nil {
+ return errors.E(op, errPubCh)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// restart listener
j.listener(deliv)
@@ -49,9 +75,12 @@ func (j *JobsConsumer) redialer() {
retryErr := backoff.Retry(op, expb)
if retryErr != nil {
+ j.Unlock()
j.logger.Error("backoff failed", "error", retryErr)
return
}
+
+ j.Unlock()
}
}
}()
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 5cf4c633..030dcae8 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -98,6 +98,11 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
return nil
}
+// Consume is no-op for the ephemeral
+func (j *JobBroker) Consume(_ *pipeline.Pipeline) error {
+ return nil
+}
+
func (j *JobBroker) Pause(pipeline string) {
if q, ok := j.queues.Load(pipeline); ok {
if q == true {
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index 56a8839d..aaed82c7 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-07-09T07:14:41.096Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.6 Safari/537.36" etag="0gh7yhPcQUpxg5xU25Ad" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc9q69tcwk96ZZGzL68csNLdt2tCQtM398sZgEdwYTG2ThP76J8mysaUDmGADWbhzG5A3+eybjlrodPR0HrmT4dfQw0FLU7ynFjpraZqKDI38oSOzdMSyzXTgLvI9ftJ8oOv/xXxQ4aNT38Nx6cQkDIPEn5QH++F4jPtJacyNovCxfNogDMpPnbh3WBro9t1AHv3pe8kwmx35zI/8i/27YSIdGrnZ+XwgHrpe+FgYQu0WOo3CMEm/jZ5OcUDhl4Emve7jgqP53CI8Tqpc0O3E5u3p15n7OdSH6t23b+376FDnk3twgyl/6avLy2sy8vnypEv+dC5uzj9942+QzDLIROF07GF6Z6WFTh6HfoK7E7dPjz4SWiBjw2QUkF8q+ToIx8lHd+QHlAy6H8mdvobjkBxwA/9uTMaiFIIngdvDQSeM/cQP6XiAB3T4AUeJT7ByIRxOwknh6DG/WS9MknBEH+sHwWkYhBGbMhoMsNnvk/E4icJ7XDjiWU5Poe8hg5RDmT4DPxWGOIjPcTjCSTQjp/CjumJpR0Z6Fad61Tb5wOOciDRTT8eGRfLRHU68nHDv8vvPEUu+cNzCeP41Hj/9q5LrHv4mY+v8O/5P6R9qEgaxR0id/wyjZBjehWM3aM9HT8o4np9zEVK4M8z+xkky43zrTpOwjHcCxWj2i15PIMJ/3haPnT3xm6e/ZvzXQnpZiJ84nEZ9vOz9OQASN7rDyZIT+Q0pdJaiO8KBm/gPZUEBoY1f2gl9MuecTBxbK9GIpdgZ1WQ3SafKryvytXgryyrfSpNulYJHutVxFLmzwmkTekIs0Vr+1s8nP7QT8nvykwL1kV+3GbmR73Paoz9mRULcD5JVK1KsviWKFchMd6pSbF1ElMnigqrqkDsph1RNTeMhuSn9c+Im/eGGCqusOTwD254OaQ5b6yHT5PgvjCvs8yy6qK5qbAEjhpFjpKhoMoOkqGh0uyE9o75z+vqcrlfVTea2WN08sh3TMFRL01VkOSUyQ0aDjH/5t4O+DNH/HpxDEyXnXYWYnxnj74imihSV0xdMU6LFafcxbHH2bEM35hangNBmqSg1heonGkUwQ1StIpksNkPgJ1mmoIksnT9pkYkkXjGf25xS01nUqrB0SWGdYzqriT/BgT/GtSoprBI1ZUHE5pgWcnelpBxVQBaopFQNUFIINaSkDAkvLc0MqKc5KeHD/DOlzvnJiNCtT1zLY3JUmTyRfxlslHT8kLmh9JheOEbAkxxy/5Ye6xMQ4qh82MP9MHJTZ5aeQ5EfpYSRPZp8u+N/2QR72cDnsEfActxNomk/mUY4O4EApCdeRMYm4tgwEkee9e6ph04P2vSgOG0yS3bFGbt35I/v8hfJn99xZ0HoegvOg+bexEw/eZhJjOVzvZxQbMX5eflvaLICc4txijR6kYVAeKgjJGcNAhYmovzNmZMrKVVbxqyLRcPAoP9BosFkH1k0pJ96JEDO27PMIEUy/5sA/6ua3pAAMF+BAMhIb+9lQCfXdjtl8E7kh5GfzPJpTImJQINwIo+fETtotmCy87OOkwSPJslcFDCDY7uvdEWZoTzdwix6Ena3Na9rf4TDafKcSb3LTFBmIlupKDNRUzIzs+PePfs1fLLMY1/t2tv74Npr5tZjerIqTn2kPtFt0xFTmAdeRF44+sDe/s6PiRrF1FwchPRoMsQll0rpzcq/x+7oTXhatlNOKSAN9LQsQGqYjQkN511orC807IpCI1MSuxUaapPxwKUAKgiNr5g8kQz9Zg5nmLtpjz4hfEFuFGQDP/GIoo3fIRwHFGTUuJvJN6xRjAwGA21BPtrsmcbOxIhRDthoKphV0CExojQkRjS5SoFmeg4oJlvaaYomqh+IJCCPJ/+fUJBGteLLc7E9APFl9m3cG+yJ2FcVrarYN8ym8CWn6hYghLrBZahnxj33l+X6jpHveamewLH/1+2xW1Eo87gxua9x0jLO6L2Iaoi5HyAhbhyOccO4cRyxEkQBcGMYgB1vNYUaTUKNhBTiu1H5eJpXVSkybZP/PtLHntxFrufj+bEMrGXmyU8HceC58TDnykXVPnldz3KnMKcboQBJ9AczG4Q+0o0n6YsO/Cc6j5SUcNR+wClFMeoZuhN6wejpjla7HbmPsX40jdmzGiUiZJeIyLQhBrdsmYisGrK8f82B94jsj/r96Pj2fPrUsZx/s3KOncTPgMCYO6IEEMi/GBIHVL5nw+VLypE1KTSRB9FOC85IjwmyDzuKtYGgkqIyy+DIzpKjY1SdprbOPzGLJ8ZHn1OdiqOIOlo1h5CaepErbukdFKy8f7LvR1ks8uW9FyfB1/ZaFwRZB3Taqc6WX+nFvMkWwq6NMb9L1FiRstKg94cX8wJXWOSNpW8gmDuZYo+YDVC0G+TIMuQyLFT+ex1SNiw5pIwgL0G1a8jDg2bEe3BoaXAIhJkhx4bA82qPJ2+EabnionPT/XdD97xBm1s1BW5xdCgDA5VWohqCIMswWoDhn6PftAqE2MZDl7hqwT7DUxXgiRwAnmAVUA3wjK/Oesrv+98/rfvvXffX6U/z+iYrAnhB0kctyp6CKGpE+lw5Nxex8dDt2sF/t7+02P3pX0A16SBsm6lJX7dQUHWssuusGrZVpBv5CjMrTnn+FaZmCbS5WWnhMkQUhMG34+vuhgLgWWku2PJZEAitQ5Q4mgjwrPK0KEpUQJToTYkSe5eiRC0KkhUFynNDZpuiBITZzkTJRph+eSbrVpXGRphG+6E0NFG+IHt5dTmRQBtfYaj1Kg0YEcr+UK/6gql3mUbeO+q1neUGjEyL619Rt8kDI2K3y4COrNdCvuB52p6QryESo76CGJXNr7DQNshXstgLFcnfb9o37Vqtd9NVlMEAst6RpSinp7KRPmCfpgMBsuyArHcoENCY9b5Hha0VxcpWw5DLxMULs941iQe/tr9eXt3Wynkvxm+21F37zcb+cN5yb6oBDkK74qCaQmm6uiIwpouSXt+GjyOXphx//d55mwyuO7tmcHkFdAPdpYQyTVtRLARh59hQFB0oZduO4YPsKoYPtKanMeyosj78NJoQkUqx0R9iYN3E667XVFVUTlOpKGtoUkr7WTKS6ijYhJEkSzQCkYDWKpC3jcIH38ORXJn+yhFlKkJlrWEC+URkNoMosJWe7OCpdF3BHFkuuweXfAXMLaob8vwHcSieuONsTGN3T2/6aewnPkHw33QFA7UXCDApwHzG0Om6B/5IOpUxHYgx+8OLXVu8fD5dUNX5XqidKT62MAxMcMWcEZ1zhy0N9fMpe2VwFGcxcietfBHHPZ7FKeLYYZ/OmC7uitnd8mv64Xjg35EvB+7ozyRdHtDD7jhO3OA+/YmT/occCgxh9B6HDFt5pWexeHQdQFRmRKG74bbZUNCCH9mnJvaUopCO4kjs6TiAHNVrWIoGsqes68qmSLd99aP9VhWfZuqi4kOAPLXQNuXpjuO9m8dl5pGYJuIyINAArxI8jyNt+3GZZbMu8Cbnxo38glVdZ3PGfW7brs1YLkuzzN01oE0sXBLVFMvtUUnUi+E4oO3aMhrfE46THfN9yEZs206ReRBlfXm3kY1YJpmLmLkE4lfDcNSbxmsjgkY6+gqEiMFg1WJVSW7WgQGxP5xqGLIUtLeJALnOlrz9+nKxusGfL2wk7tgx7VpP7znB43SEy037eTh7bib6+fLQqlkeckLJ3OsNWxkgVTHE9uyGJayDrqlPJVLmy33nz1peGQRdo9u6QNSbxc1BdDgS1c+wHFWqk+zTZbtN0vyaxkHzNJ/F3rdM9JpjmCJRmZbgaNfXswN+9Xfbcn26yuhlNWEZe2VdZvMuSJMfxxcb25SVDZMlfdoXdHZvxJg07YqlLXU0bYMRIUe9OiENqRJZyxtPUppIphHrljMK4zSrkLsC6YVvLRimSVu4GJocDkMqtIVLY+EwOUjy0/WTN4ojpDhqFRzZW0WR7FX/SHGj+GmSwsPExvFY2oXmIeTcjz+g82dt6zBLjByffslG+jT3oLgeHWaIX5E6AtZnL162XsxsFBIw+fxfeTKkDrHhSPJflZMemgo0amks6ZFtwPZuda1jdQFrrOEgWu37cWwmfuRV1j8vr760r1ilzeXlxbbMLyDmZIO+HDKRg7z6zC9RI1iOWtEAM5rCiRzOa/9qn95c01TjwZ8jL2TdT4rp62kUtVjePvjwxpS65piSn2plWxUVZWhTHdNgFMoBwTPWkJaq9C6OHli/kbeYNUa2JWaNjZwHiyxnADqvOXzt0RrNfaznh6sjqkYaUDP1yFLMStcUXTLwUbVAbV0xK6DtaMr5rbxcZDcKdSsJal3VJQzomcmzsxR1TtEvlrt3YNFqWkXurr+j9GbIBsJXUpqa93Ic9+IJ5HJulLZWXJqfBBhNO7PM5VmIRUXmDXEmMoFC5a0mrjU5hfSGA1R6vu93rr2AfY+2Gp5CcgSx48YxCzOxTRNo4+y8GPiNYUtc3oTyTQXLlqy6TYTtUc6sYreR3es6VLkga79yZkgOHtPmqJhFBrK29ulOKIe0XbU/8Ptk5M8UA7K1ITvU82mPzLQd9yOOE1ljVmiS35ACNHSgYBlSgHXsUgCHA99usHX9DbM3YPCq4Vnk7BeDA00wrz5dXn26vp0bs0qDfTFXuZUVrd0tGrWWUrEiWq9hJwsYaXL89gr3MSEWj4tkbtymq42OzvCDn668gvN7bMuqz5e0tfHBCI/CaPYhPZUdcMcUIP1wTMHGDKaGk3yE19gt8g0k09f58J7lW2wpquI63szxKUaooUbKjWX5kByhpv38W1lKISNRvo6uUN5RyPROVtV/v3oPQPDXrOwmxZIPe6v2v+xRs1pwZRCFoxxxdKVmo/qCM62gLXqq5w3ASnJVsZCDm9MKjmocaVUVg7Q3WX3m3uJdUChIM+n4OW3R/OizO/YYxi47nfZZq5T244sRJ8GUNtCfy9nird4aSyJDFzCPZJbUsv2vtsKSuuylVd8RId3YQJn/u4u9bD4lmMqOg4K8Zz5kYzvawFs4pEkKZSl40hFpp/YzljJ/7i7SdU8nC0izSZX3lN7dpE5Cb1bYVqU3S56xkUrdkzolsoLcks3roDgxtiSfblvzjB1H6p7kcf/+YINtdOqezjd39XwEpdDYdiYr05I73d8EmYL9Bu6DqCmgtmhMXcjbG3NEihbbq13PZ1ugtbbVJX267MW/r+lrVQ+56cA25pd/O+jLEP3vwTk0UXLeVS5uziu3Fah7UZ/Ytsl2BKu/tiV9opeYPWnxgj7hCiKBHIGgG1jOp8shgff1fK+F3jXHsAWaEpMaNdE7eZIgzrOBhd2V5bnxAspG6T3Lyxb7PsbMchLLQ0QHKV1VwfzycXjIFkcNWDA2DmlBJ0+5vbUGa5olilQHKsTVgEJc1FQCxZB978w8fkdqJRNZsUWkIiB2vV2kAjFOITJBHxvcyfba7pFVB6PZitgAY14Ev6pvax1hrqf48Xr64+EPunBubi87ivX552jJBs+Lkk7rbZJaTCqxLonZtr1F3lQm/sRnG3rGrfXyUU1NNW25mLYITOe37nq4pmbGGit2Eybp+vkED7yQSUCqnz0c4GTtzF5T89VZI0gGv8J85zHJvZikkU2SbypbCJkeeHjgTgMKWMqYNILFtoKncB6Qs719AbR5tMgEqju3+4KELlJtsXzWVqFokQEIXaeGKAXoTGiS0G2nsSKFrQqmEQslnvaHXPKkRLmn6JFwAWBsIXoss6wSdR3QhzqAGq0p1EA9mPNGsbRvLFvf7TLXLlVkrD5ENkwLW1TzbPy8rG8NZG6njqIGVKqKVcYlMqCwLNj3vI6dX0FsbpLDq7qruSixOfuVDnu4H/IuHOwcmrSP+Kb3i3aXz9NOOSE1k7Vb8c7z3dptKG1B+zYbJzxFZ5zNc5fWScs6q5TGgMNLAgPIGQ3qP/Fwk6ot86eWxNQ3zWNsxjHiuoEFeQygFa+h1lHsALJM5mns/xani3bYmEcRW8+ImgsRvpVhRRCIQOEqDGwbppBNw4ZiKBqZxpGCdN3WLMM2NUXwJutbXbkMGgUpzDtxU9XKKhJDIEKzVmnTggzXZvzpKAIUUcVapDoKz0FIyknG0rJ093VaiaqJxNWpkKR0tmknynnGM9i0W5dwn1tgvxGEpY5mxNnVq9F6Y7Zb1uRk/xXRIoWjVdkFdPEeUbVoIruiJjI2VEQboVpOYbYntMtG5Abr8tMikDaiIExBQWiwywOzTXMWnP7SGWcP+MZ5CXwjZ1TALfD2mWUQcWWqlnhrVnM8s9MFuHXwjLUHTAN1sd0/rgF61p60j791r48vvrwo3tGBBvow4zTGNrLm5vuU0cUQuXu3z1Clbl5ZIunKkalVA6xuH1nrw5b8jEIaRZs72QQEw6+hh+kZ/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 9d68a95a..2eb35f14 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -117,11 +117,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// pipeline name (ie test-local, sqs-aws, etc)
name := key.(string)
- // skip pipelines which are not initialized to consume
- if _, ok := p.consume[name]; !ok {
- return true
- }
-
// pipeline associated with the name
pipe := value.(*pipeline.Pipeline)
// driver for the pipeline (ie amqp, ephemeral, etc)
@@ -149,6 +144,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
return false
}
+
+ // if pipeline initialized to be consumed, call Consume on it
+ if _, ok := p.consume[name]; ok {
+ err = initializedDriver.Consume(pipe)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
+ return true
+ }
}
return true
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 1648fa6c..37fce8e2 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -41,16 +41,26 @@ jobs:
test-local:
driver: ephemeral
priority: 10
- pipeline_size: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
test-1:
driver: amqp
priority: 1
+ pipeline_size: 1000000
queue: test-1-queue
exchange: default
exchange_type: direct
routing_key: test
- pipeline_size: 1000000
test-2:
driver: beanstalk
@@ -66,5 +76,5 @@ jobs:
queue: default
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local" ]
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1" ]
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 754f60bc..b2f05f0f 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -11,6 +11,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/amqp"
"github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -34,6 +35,7 @@ func TestJobsInit(t *testing.T) {
&logger.ZapLogger{},
&jobs.Plugin{},
&ephemeral.Plugin{},
+ &amqp.Plugin{},
)
assert.NoError(t, err)
@@ -82,7 +84,7 @@ func TestJobsInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 60)
+ time.Sleep(time.Second * 120)
stopCh <- struct{}{}