diff options
author | Valery Piashchynski <[email protected]> | 2021-07-10 01:18:56 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-10 01:18:56 +0300 |
commit | 4fcb5979fad87f6e268f5b9df91ee2ee91e9ef16 (patch) | |
tree | 30ed85120f8a39fd07756af9f5ce3422cf318971 | |
parent | 4566f88004e81d3229222d82614c15346ac2e47d (diff) |
AMQP job driver...
Update main driver's interface, add Consume(*pipeline) method. Implement
it on the amqp and ephemeral drivers.
Fix error with incorrect order of Register <-> Consume method calls.
Implement rabbitMQ driver, add timeouts, dead-letter-exchange,
packing-unpacking of the amqp messages.
Implement AMQP redialer in case of network error as well as channels
re-creation.
Update drawio diagram.
Update .rr.yaml jobs configuration, add all amqp options. Implement
Ack/Nack.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | common/jobs/interface.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 282 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/headers.go | 68 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 35 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 47 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 5 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 15 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 16 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 4 |
12 files changed, 410 insertions, 71 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go index deb90cde..3c29447d 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -10,6 +10,7 @@ import ( type Consumer interface { Push(job *structs.Job) error Register(pipeline *pipeline.Pipeline) error + Consume(pipeline *pipeline.Pipeline) error List() []*pipeline.Pipeline Pause(pipeline string) diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index f91b71e7..9ac47269 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -1,9 +1,11 @@ package amqp import ( + "fmt" "sync" "time" + "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" @@ -14,9 +16,32 @@ import ( "github.com/streadway/amqp" ) +// pipeline rabbitmq info +const ( + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + + dlx string = "x-dead-letter-exchange" + dlxRoutingKey string = "x-dead-letter-routing-key" + dlxTTL string = "x-message-ttl" + dlxExpires string = "x-expires" + + contentType string = "application/octet-stream" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` +} + +// Config is used to parse pipeline configuration type Config struct { - Addr string - Queue string + PrefetchCount int `mapstructure:"pipeline_size"` + Queue string `mapstructure:"queue"` + Exchange string `mapstructure:"exchange"` + ExchangeType string `mapstructure:"exchange_type"` + RoutingKey string `mapstructure:"routing_key"` } type JobsConsumer struct { @@ -27,35 +52,91 @@ type JobsConsumer struct { pipelines sync.Map // amqp connection - conn *amqp.Connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan *amqp.Channel + retryTimeout time.Duration prefetchCount int exchangeName string + queue string + consumeID string connStr string exchangeType string routingKey string + // TODO send data to channel stop chan struct{} } func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { + const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. - // firs part - address to connect, it is located in the global section under the amqp name + // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - logger: log, - pq: pq, + logger: log, + pq: pq, + consumeID: uuid.NewString(), + stop: make(chan struct{}), + retryTimeout: time.Minute * 5, + } + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - d, err := jb.initRabbitMQ() + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(configKey, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeCfg.RoutingKey + jb.queue = pipeCfg.Queue + jb.exchangeType = pipeCfg.ExchangeType + jb.exchangeName = pipeCfg.Exchange + jb.prefetchCount = pipeCfg.PrefetchCount + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // assign address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() if err != nil { return nil, err } - // run listener - jb.listener(d) + jb.publishChan, err = jb.conn.Channel() + if err != nil { + panic(err) + } - // run redialer + // run redialer for the connection jb.redialer() return jb, nil @@ -63,27 +144,57 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") + j.RLock() + defer j.RUnlock() // check if the pipeline registered - if b, ok := j.pipelines.Load(job.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) - } - + if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { // handle timeouts - if job.Options.Timeout > 0 { - go func(jj *structs.Job) { - time.Sleep(jj.Options.TimeoutDuration()) + if job.Options.DelayDuration() > 0 { + // pub + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - // TODO push + _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: 100, + dlxExpires: 200, + }) - // send the item after timeout expired - }(job) + if err != nil { + panic(err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + panic(err) + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } // insert to the local, limited pipeline + err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + //Headers: pack(job.Ident, 0, job), + ContentType: contentType, + Timestamp: time.Now(), + Body: nil, + }) + if err != nil { + panic(err) + } return nil } @@ -92,7 +203,52 @@ func (j *JobsConsumer) Push(job *structs.Job) error { } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { - panic("implement me") + const op = errors.Op("rabbitmq_register") + if _, ok := j.pipelines.Load(pipeline.Name()); ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) + } + + j.pipelines.Store(pipeline.Name(), true) + + return nil +} + +func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + if _, ok := j.pipelines.Load(pipeline.Name()); !ok { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) + } + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // run listener + j.listener(deliv) + + return nil } func (j *JobsConsumer) List() []*pipeline.Pipeline { @@ -100,9 +256,87 @@ func (j *JobsConsumer) List() []*pipeline.Pipeline { } func (j *JobsConsumer) Pause(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.pipelines.Store(pipeline, false) + } + } + + err := j.publishChan.Cancel(j.consumeID, true) + if err != nil { + j.logger.Error("cancel publish channel, forcing close", "error", err) + errCl := j.publishChan.Close() + if errCl != nil { + j.logger.Error("force close failed", "error", err) + } + } } func (j *JobsConsumer) Resume(pipeline string) { - panic("implement me") + if q, ok := j.pipelines.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.pipelines.Store(pipeline, true) + } + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.logger.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.logger.Error("qos set failed", "error", err) + return + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.logger.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) + } +} + +// Declare used to dynamically declare a pipeline +func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error { + pipeline.String(exchangeKey, "") + pipeline.String(queue, "") + pipeline.String(routingKey, "") + pipeline.String(exchangeType, "direct") + return nil +} + +func (c *Config) InitDefault() { + if c.ExchangeType == "" { + c.ExchangeType = "direct" + } + + if c.Exchange == "" { + c.Exchange = "default" + } + + if c.PrefetchCount == 0 { + c.PrefetchCount = 100 + } +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "amqp://guest:guest@localhost:5672/" + } } diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go new file mode 100644 index 00000000..b1f9c89d --- /dev/null +++ b/plugins/jobs/brokers/amqp/headers.go @@ -0,0 +1,68 @@ +package amqp + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/streadway/amqp" +) + +const ( + rrID string = "rr-id" + rrJob string = "rr-job" + rrAttempt string = "rr-attempt" + rrMaxAttempts string = "rr-max_attempts" + rrTimeout string = "rr-timeout" + rrDelay string = "rr-delay" + rrRetryDelay string = "rr-retry_delay" +) + +// pack job metadata into headers +func pack(id string, attempt uint64, j *structs.Job) amqp.Table { + return amqp.Table{ + rrID: id, + rrJob: j.Job, + rrAttempt: attempt, + rrMaxAttempts: j.Options.Attempts, + rrTimeout: j.Options.Timeout, + rrDelay: j.Options.Delay, + rrRetryDelay: j.Options.RetryDelay, + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused + j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}} + + if _, ok := d.Headers[rrID].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrID) + } + + if _, ok := d.Headers[rrAttempt].(uint64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) + } + + if _, ok := d.Headers[rrJob].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob) + } + + j.Job = d.Headers[rrJob].(string) + + if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { + j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) + } + + if _, ok := d.Headers[rrTimeout].(uint64); ok { + j.Options.Timeout = d.Headers[rrTimeout].(uint64) + } + + if _, ok := d.Headers[rrDelay].(uint64); ok { + j.Options.Delay = d.Headers[rrDelay].(uint64) + } + + if _, ok := d.Headers[rrRetryDelay].(uint64); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64) + } + + return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil +} diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 7f1bf204..4751df58 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -16,7 +16,7 @@ func From(d amqp.Delivery) *Item { } type Item struct { - // Job contains name of job broker (usually PHP class). + // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` // Ident is unique identifier of the job, should be provided from outside diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 174cb006..74f9a174 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -8,7 +8,7 @@ import ( ) const ( - name string = "amqp" + pluginName string = "amqp" ) type Plugin struct { @@ -23,7 +23,7 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { } func (p *Plugin) Name() string { - return name + return pluginName } func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index 41374878..7e722889 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -1,24 +1,23 @@ package amqp import ( - "fmt" - - "github.com/google/uuid" + "github.com/spiral/errors" "github.com/streadway/amqp" ) -func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("rabbit_initmq") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. channel, err := j.conn.Channel() if err != nil { - return nil, err + return errors.E(op, err) } err = channel.Qos(j.prefetchCount, 0, false) if err != nil { - return nil, err + return errors.E(op, err) } // declare an exchange (idempotent operation) @@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // verify or declare a queue q, err := channel.QueueDeclare( - fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()), + j.queue, false, false, true, @@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // bind queue to the exchange @@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err - } - - // start reading messages from the channel - deliv, err := channel.Consume( - q.Name, - "", - false, - false, - false, - false, - nil, - ) - if err != nil { - return nil, err + return errors.E(op, err) } - return deliv, nil + return nil } func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index bfb1fbff..874e68c4 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -2,44 +2,70 @@ package amqp import ( "fmt" - "time" "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" "github.com/streadway/amqp" ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobsConsumer) redialer() { +func (j *JobsConsumer) redialer() { //nolint:gocognit go func() { + const op = errors.Op("rabbitmq_redial") for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { if err != nil { + j.Lock() + j.logger.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = time.Minute * j.retryTimeout + expb.MaxElapsedTime = j.retryTimeout op := func() error { j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) - - j.Lock() var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) if dialErr != nil { - j.Unlock() return fmt.Errorf("fail to dial server endpoint: %v", dialErr) } - j.Unlock() j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - deliv, errInit := j.initRabbitMQ() + errInit := j.initRabbitMQ() if errInit != nil { - j.Unlock() j.logger.Error("error while redialing", "error", errInit) return errInit } + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + var errPubCh error + j.publishChan, errPubCh = j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + // restart listener j.listener(deliv) @@ -49,9 +75,12 @@ func (j *JobsConsumer) redialer() { retryErr := backoff.Retry(op, expb) if retryErr != nil { + j.Unlock() j.logger.Error("backoff failed", "error", retryErr) return } + + j.Unlock() } } }() diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 5cf4c633..030dcae8 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -98,6 +98,11 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } +// Consume is no-op for the ephemeral +func (j *JobBroker) Consume(_ *pipeline.Pipeline) error { + return nil +} + func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index 56a8839d..aaed82c7 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-07-09T07:14:41.096Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.6 Safari/537.36" etag="0gh7yhPcQUpxg5xU25Ad" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc9q69tcwk96ZZGzL68csNLdt2tCQtM398sZgEdwYTG2ThP76J8mysaUDmGADWbhzG5A3+eybjlrodPR0HrmT4dfQw0FLU7ynFjpraZqKDI38oSOzdMSyzXTgLvI9ftJ8oOv/xXxQ4aNT38Nx6cQkDIPEn5QH++F4jPtJacyNovCxfNogDMpPnbh3WBro9t1AHv3pe8kwmx35zI/8i/27YSIdGrnZ+XwgHrpe+FgYQu0WOo3CMEm/jZ5OcUDhl4Emve7jgqP53CI8Tqpc0O3E5u3p15n7OdSH6t23b+376FDnk3twgyl/6avLy2sy8vnypEv+dC5uzj9942+QzDLIROF07GF6Z6WFTh6HfoK7E7dPjz4SWiBjw2QUkF8q+ToIx8lHd+QHlAy6H8mdvobjkBxwA/9uTMaiFIIngdvDQSeM/cQP6XiAB3T4AUeJT7ByIRxOwknh6DG/WS9MknBEH+sHwWkYhBGbMhoMsNnvk/E4icJ7XDjiWU5Poe8hg5RDmT4DPxWGOIjPcTjCSTQjp/CjumJpR0Z6Fad61Tb5wOOciDRTT8eGRfLRHU68nHDv8vvPEUu+cNzCeP41Hj/9q5LrHv4mY+v8O/5P6R9qEgaxR0id/wyjZBjehWM3aM9HT8o4np9zEVK4M8z+xkky43zrTpOwjHcCxWj2i15PIMJ/3haPnT3xm6e/ZvzXQnpZiJ84nEZ9vOz9OQASN7rDyZIT+Q0pdJaiO8KBm/gPZUEBoY1f2gl9MuecTBxbK9GIpdgZ1WQ3SafKryvytXgryyrfSpNulYJHutVxFLmzwmkTekIs0Vr+1s8nP7QT8nvykwL1kV+3GbmR73Paoz9mRULcD5JVK1KsviWKFchMd6pSbF1ElMnigqrqkDsph1RNTeMhuSn9c+Im/eGGCqusOTwD254OaQ5b6yHT5PgvjCvs8yy6qK5qbAEjhpFjpKhoMoOkqGh0uyE9o75z+vqcrlfVTea2WN08sh3TMFRL01VkOSUyQ0aDjH/5t4O+DNH/HpxDEyXnXYWYnxnj74imihSV0xdMU6LFafcxbHH2bEM35hangNBmqSg1heonGkUwQ1StIpksNkPgJ1mmoIksnT9pkYkkXjGf25xS01nUqrB0SWGdYzqriT/BgT/GtSoprBI1ZUHE5pgWcnelpBxVQBaopFQNUFIINaSkDAkvLc0MqKc5KeHD/DOlzvnJiNCtT1zLY3JUmTyRfxlslHT8kLmh9JheOEbAkxxy/5Ye6xMQ4qh82MP9MHJTZ5aeQ5EfpYSRPZp8u+N/2QR72cDnsEfActxNomk/mUY4O4EApCdeRMYm4tgwEkee9e6ph04P2vSgOG0yS3bFGbt35I/v8hfJn99xZ0HoegvOg+bexEw/eZhJjOVzvZxQbMX5eflvaLICc4txijR6kYVAeKgjJGcNAhYmovzNmZMrKVVbxqyLRcPAoP9BosFkH1k0pJ96JEDO27PMIEUy/5sA/6ua3pAAMF+BAMhIb+9lQCfXdjtl8E7kh5GfzPJpTImJQINwIo+fETtotmCy87OOkwSPJslcFDCDY7uvdEWZoTzdwix6Ena3Na9rf4TDafKcSb3LTFBmIlupKDNRUzIzs+PePfs1fLLMY1/t2tv74Npr5tZjerIqTn2kPtFt0xFTmAdeRF44+sDe/s6PiRrF1FwchPRoMsQll0rpzcq/x+7oTXhatlNOKSAN9LQsQGqYjQkN511orC807IpCI1MSuxUaapPxwKUAKgiNr5g8kQz9Zg5nmLtpjz4hfEFuFGQDP/GIoo3fIRwHFGTUuJvJN6xRjAwGA21BPtrsmcbOxIhRDthoKphV0CExojQkRjS5SoFmeg4oJlvaaYomqh+IJCCPJ/+fUJBGteLLc7E9APFl9m3cG+yJ2FcVrarYN8ym8CWn6hYghLrBZahnxj33l+X6jpHveamewLH/1+2xW1Eo87gxua9x0jLO6L2Iaoi5HyAhbhyOccO4cRyxEkQBcGMYgB1vNYUaTUKNhBTiu1H5eJpXVSkybZP/PtLHntxFrufj+bEMrGXmyU8HceC58TDnykXVPnldz3KnMKcboQBJ9AczG4Q+0o0n6YsO/Cc6j5SUcNR+wClFMeoZuhN6wejpjla7HbmPsX40jdmzGiUiZJeIyLQhBrdsmYisGrK8f82B94jsj/r96Pj2fPrUsZx/s3KOncTPgMCYO6IEEMi/GBIHVL5nw+VLypE1KTSRB9FOC85IjwmyDzuKtYGgkqIyy+DIzpKjY1SdprbOPzGLJ8ZHn1OdiqOIOlo1h5CaepErbukdFKy8f7LvR1ks8uW9FyfB1/ZaFwRZB3Taqc6WX+nFvMkWwq6NMb9L1FiRstKg94cX8wJXWOSNpW8gmDuZYo+YDVC0G+TIMuQyLFT+ex1SNiw5pIwgL0G1a8jDg2bEe3BoaXAIhJkhx4bA82qPJ2+EabnionPT/XdD97xBm1s1BW5xdCgDA5VWohqCIMswWoDhn6PftAqE2MZDl7hqwT7DUxXgiRwAnmAVUA3wjK/Oesrv+98/rfvvXffX6U/z+iYrAnhB0kctyp6CKGpE+lw5Nxex8dDt2sF/t7+02P3pX0A16SBsm6lJX7dQUHWssuusGrZVpBv5CjMrTnn+FaZmCbS5WWnhMkQUhMG34+vuhgLgWWku2PJZEAitQ5Q4mgjwrPK0KEpUQJToTYkSe5eiRC0KkhUFynNDZpuiBITZzkTJRph+eSbrVpXGRphG+6E0NFG+IHt5dTmRQBtfYaj1Kg0YEcr+UK/6gql3mUbeO+q1neUGjEyL619Rt8kDI2K3y4COrNdCvuB52p6QryESo76CGJXNr7DQNshXstgLFcnfb9o37Vqtd9NVlMEAst6RpSinp7KRPmCfpgMBsuyArHcoENCY9b5Hha0VxcpWw5DLxMULs941iQe/tr9eXt3Wynkvxm+21F37zcb+cN5yb6oBDkK74qCaQmm6uiIwpouSXt+GjyOXphx//d55mwyuO7tmcHkFdAPdpYQyTVtRLARh59hQFB0oZduO4YPsKoYPtKanMeyosj78NJoQkUqx0R9iYN3E667XVFVUTlOpKGtoUkr7WTKS6ijYhJEkSzQCkYDWKpC3jcIH38ORXJn+yhFlKkJlrWEC+URkNoMosJWe7OCpdF3BHFkuuweXfAXMLaob8vwHcSieuONsTGN3T2/6aewnPkHw33QFA7UXCDApwHzG0Om6B/5IOpUxHYgx+8OLXVu8fD5dUNX5XqidKT62MAxMcMWcEZ1zhy0N9fMpe2VwFGcxcietfBHHPZ7FKeLYYZ/OmC7uitnd8mv64Xjg35EvB+7ozyRdHtDD7jhO3OA+/YmT/occCgxh9B6HDFt5pWexeHQdQFRmRKG74bbZUNCCH9mnJvaUopCO4kjs6TiAHNVrWIoGsqes68qmSLd99aP9VhWfZuqi4kOAPLXQNuXpjuO9m8dl5pGYJuIyINAArxI8jyNt+3GZZbMu8Cbnxo38glVdZ3PGfW7brs1YLkuzzN01oE0sXBLVFMvtUUnUi+E4oO3aMhrfE46THfN9yEZs206ReRBlfXm3kY1YJpmLmLkE4lfDcNSbxmsjgkY6+gqEiMFg1WJVSW7WgQGxP5xqGLIUtLeJALnOlrz9+nKxusGfL2wk7tgx7VpP7znB43SEy037eTh7bib6+fLQqlkeckLJ3OsNWxkgVTHE9uyGJayDrqlPJVLmy33nz1peGQRdo9u6QNSbxc1BdDgS1c+wHFWqk+zTZbtN0vyaxkHzNJ/F3rdM9JpjmCJRmZbgaNfXswN+9Xfbcn26yuhlNWEZe2VdZvMuSJMfxxcb25SVDZMlfdoXdHZvxJg07YqlLXU0bYMRIUe9OiENqRJZyxtPUppIphHrljMK4zSrkLsC6YVvLRimSVu4GJocDkMqtIVLY+EwOUjy0/WTN4ojpDhqFRzZW0WR7FX/SHGj+GmSwsPExvFY2oXmIeTcjz+g82dt6zBLjByffslG+jT3oLgeHWaIX5E6AtZnL162XsxsFBIw+fxfeTKkDrHhSPJflZMemgo0amks6ZFtwPZuda1jdQFrrOEgWu37cWwmfuRV1j8vr760r1ilzeXlxbbMLyDmZIO+HDKRg7z6zC9RI1iOWtEAM5rCiRzOa/9qn95c01TjwZ8jL2TdT4rp62kUtVjePvjwxpS65piSn2plWxUVZWhTHdNgFMoBwTPWkJaq9C6OHli/kbeYNUa2JWaNjZwHiyxnADqvOXzt0RrNfaznh6sjqkYaUDP1yFLMStcUXTLwUbVAbV0xK6DtaMr5rbxcZDcKdSsJal3VJQzomcmzsxR1TtEvlrt3YNFqWkXurr+j9GbIBsJXUpqa93Ic9+IJ5HJulLZWXJqfBBhNO7PM5VmIRUXmDXEmMoFC5a0mrjU5hfSGA1R6vu93rr2AfY+2Gp5CcgSx48YxCzOxTRNo4+y8GPiNYUtc3oTyTQXLlqy6TYTtUc6sYreR3es6VLkga79yZkgOHtPmqJhFBrK29ulOKIe0XbU/8Ptk5M8UA7K1ITvU82mPzLQd9yOOE1ljVmiS35ACNHSgYBlSgHXsUgCHA99usHX9DbM3YPCq4Vnk7BeDA00wrz5dXn26vp0bs0qDfTFXuZUVrd0tGrWWUrEiWq9hJwsYaXL89gr3MSEWj4tkbtymq42OzvCDn668gvN7bMuqz5e0tfHBCI/CaPYhPZUdcMcUIP1wTMHGDKaGk3yE19gt8g0k09f58J7lW2wpquI63szxKUaooUbKjWX5kByhpv38W1lKISNRvo6uUN5RyPROVtV/v3oPQPDXrOwmxZIPe6v2v+xRs1pwZRCFoxxxdKVmo/qCM62gLXqq5w3ASnJVsZCDm9MKjmocaVUVg7Q3WX3m3uJdUChIM+n4OW3R/OizO/YYxi47nfZZq5T244sRJ8GUNtCfy9nird4aSyJDFzCPZJbUsv2vtsKSuuylVd8RId3YQJn/u4u9bD4lmMqOg4K8Zz5kYzvawFs4pEkKZSl40hFpp/YzljJ/7i7SdU8nC0izSZX3lN7dpE5Cb1bYVqU3S56xkUrdkzolsoLcks3roDgxtiSfblvzjB1H6p7kcf/+YINtdOqezjd39XwEpdDYdiYr05I73d8EmYL9Bu6DqCmgtmhMXcjbG3NEihbbq13PZ1ugtbbVJX267MW/r+lrVQ+56cA25pd/O+jLEP3vwTk0UXLeVS5uziu3Fah7UZ/Ytsl2BKu/tiV9opeYPWnxgj7hCiKBHIGgG1jOp8shgff1fK+F3jXHsAWaEpMaNdE7eZIgzrOBhd2V5bnxAspG6T3Lyxb7PsbMchLLQ0QHKV1VwfzycXjIFkcNWDA2DmlBJ0+5vbUGa5olilQHKsTVgEJc1FQCxZB978w8fkdqJRNZsUWkIiB2vV2kAjFOITJBHxvcyfba7pFVB6PZitgAY14Ev6pvax1hrqf48Xr64+EPunBubi87ivX552jJBs+Lkk7rbZJaTCqxLonZtr1F3lQm/sRnG3rGrfXyUU1NNW25mLYITOe37nq4pmbGGit2Eybp+vkED7yQSUCqnz0c4GTtzF5T89VZI0gGv8J85zHJvZikkU2SbypbCJkeeHjgTgMKWMqYNILFtoKncB6Qs719AbR5tMgEqju3+4KELlJtsXzWVqFokQEIXaeGKAXoTGiS0G2nsSKFrQqmEQslnvaHXPKkRLmn6JFwAWBsIXoss6wSdR3QhzqAGq0p1EA9mPNGsbRvLFvf7TLXLlVkrD5ENkwLW1TzbPy8rG8NZG6njqIGVKqKVcYlMqCwLNj3vI6dX0FsbpLDq7qruSixOfuVDnu4H/IuHOwcmrSP+Kb3i3aXz9NOOSE1k7Vb8c7z3dptKG1B+zYbJzxFZ5zNc5fWScs6q5TGgMNLAgPIGQ3qP/Fwk6ot86eWxNQ3zWNsxjHiuoEFeQygFa+h1lHsALJM5mns/xani3bYmEcRW8+ImgsRvpVhRRCIQOEqDGwbppBNw4ZiKBqZxpGCdN3WLMM2NUXwJutbXbkMGgUpzDtxU9XKKhJDIEKzVmnTggzXZvzpKAIUUcVapDoKz0FIyknG0rJ093VaiaqJxNWpkKR0tmknynnGM9i0W5dwn1tgvxGEpY5mxNnVq9F6Y7Zb1uRk/xXRIoWjVdkFdPEeUbVoIruiJjI2VEQboVpOYbYntMtG5Abr8tMikDaiIExBQWiwywOzTXMWnP7SGWcP+MZ5CXwjZ1TALfD2mWUQcWWqlnhrVnM8s9MFuHXwjLUHTAN1sd0/rgF61p60j791r48vvrwo3tGBBvow4zTGNrLm5vuU0cUQuXu3z1Clbl5ZIunKkalVA6xuH1nrw5b8jEIaRZs72QQEw6+hh+kZ/wc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 9d68a95a..2eb35f14 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -117,11 +117,6 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) - // skip pipelines which are not initialized to consume - if _, ok := p.consume[name]; !ok { - return true - } - // pipeline associated with the name pipe := value.(*pipeline.Pipeline) // driver for the pipeline (ie amqp, ephemeral, etc) @@ -149,6 +144,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) return false } + + // if pipeline initialized to be consumed, call Consume on it + if _, ok := p.consume[name]; ok { + err = initializedDriver.Consume(pipe) + if err != nil { + errCh <- errors.E(op, err) + return false + } + return true + } } return true diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index 1648fa6c..37fce8e2 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -41,16 +41,26 @@ jobs: test-local: driver: ephemeral priority: 10 - pipeline_size: 10 + pipeline_size: 10000 + + test-local-2: + driver: ephemeral + priority: 1 + pipeline_size: 10000 + + test-local-3: + driver: ephemeral + priority: 2 + pipeline_size: 10000 test-1: driver: amqp priority: 1 + pipeline_size: 1000000 queue: test-1-queue exchange: default exchange_type: direct routing_key: test - pipeline_size: 1000000 test-2: driver: beanstalk @@ -66,5 +76,5 @@ jobs: queue: default # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local" ] + consume: [ "test-local", "test-local-2", "test-local-3", "test-1" ] diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index 754f60bc..b2f05f0f 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -11,6 +11,7 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/amqp" "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -34,6 +35,7 @@ func TestJobsInit(t *testing.T) { &logger.ZapLogger{}, &jobs.Plugin{}, &ephemeral.Plugin{}, + &amqp.Plugin{}, ) assert.NoError(t, err) @@ -82,7 +84,7 @@ func TestJobsInit(t *testing.T) { } }() - time.Sleep(time.Second * 60) + time.Sleep(time.Second * 120) stopCh <- struct{}{} |