summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
committerValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
commit4566f88004e81d3229222d82614c15346ac2e47d (patch)
tree05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a /plugins/jobs/brokers/amqp/consumer.go
parent5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff)
AMQP update...
Add redialer, consumer, rabbit queues initializer. Update configuration (.rr.yaml). Add ack/nack for the jobs main plugin with error handling. Add Qos, queues bining and declaration. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go79
1 files changed, 69 insertions, 10 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 0b8a5a5b..f91b71e7 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -1,12 +1,17 @@
package amqp
import (
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/streadway/amqp"
)
type Config struct {
@@ -15,35 +20,89 @@ type Config struct {
}
type JobsConsumer struct {
- log logger.Logger
- pq priorityqueue.Queue
+ sync.RWMutex
+ logger logger.Logger
+ pq priorityqueue.Queue
+
+ pipelines sync.Map
+
+ // amqp connection
+ conn *amqp.Connection
+ retryTimeout time.Duration
+ prefetchCount int
+ exchangeName string
+ connStr string
+ exchangeType string
+ routingKey string
+
+ stop chan struct{}
}
func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ // we need to obtain two parts of the amqp information here.
+ // firs part - address to connect, it is located in the global section under the amqp name
+ // second part - queues and other pipeline information
jb := &JobsConsumer{
- log: log,
- pq: pq,
+ logger: log,
+ pq: pq,
}
+ d, err := jb.initRabbitMQ()
+ if err != nil {
+ return nil, err
+ }
+
+ // run listener
+ jb.listener(d)
+
+ // run redialer
+ jb.redialer()
+
return jb, nil
}
-func (j JobsConsumer) Push(job *structs.Job) error {
- panic("implement me")
+func (j *JobsConsumer) Push(job *structs.Job) error {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ if b, ok := j.pipelines.Load(job.Options.Pipeline); ok {
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ }
+
+ // handle timeouts
+ if job.Options.Timeout > 0 {
+ go func(jj *structs.Job) {
+ time.Sleep(jj.Options.TimeoutDuration())
+
+ // TODO push
+
+ // send the item after timeout expired
+ }(job)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
-func (j JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
panic("implement me")
}
-func (j JobsConsumer) List() []*pipeline.Pipeline {
+func (j *JobsConsumer) List() []*pipeline.Pipeline {
panic("implement me")
}
-func (j JobsConsumer) Pause(pipeline string) {
+func (j *JobsConsumer) Pause(pipeline string) {
panic("implement me")
}
-func (j JobsConsumer) Resume(pipeline string) {
+func (j *JobsConsumer) Resume(pipeline string) {
panic("implement me")
}