summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/jobs/interface.go4
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rw-r--r--pkg/priority_queue/binary_heap.go2
-rw-r--r--pkg/priority_queue/binary_heap_test.go6
-rw-r--r--pkg/priority_queue/interface.go2
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go127
-rw-r--r--plugins/jobs/brokers/amqp/item.go66
-rw-r--r--plugins/jobs/brokers/amqp/listener.go25
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go27
-rw-r--r--plugins/jobs/brokers/amqp/rabbit_init.go (renamed from plugins/jobs/brokers/amqp/rabbit.go)30
-rw-r--r--plugins/jobs/brokers/amqp/redial.go37
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go42
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go9
-rw-r--r--plugins/jobs/plugin.go150
14 files changed, 328 insertions, 201 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index 9c7ffef8..426d5606 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -10,7 +10,8 @@ import (
type Consumer interface {
Push(job *structs.Job) error
Register(pipeline *pipeline.Pipeline) error
- Consume(pipeline *pipeline.Pipeline) error
+ Run(pipeline *pipeline.Pipeline) error
+ Stop() error
// List of the pipelines
List() []string
@@ -20,4 +21,5 @@ type Consumer interface {
type Constructor interface {
JobsConstruct(configKey string, queue priorityqueue.Queue) (Consumer, error)
+ FromPipeline(pipe *pipeline.Pipeline, queue priorityqueue.Queue) (Consumer, error)
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index f264c6dc..3df773ab 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -204,7 +204,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Consume pool events
+ // Run pool events
ev := make(chan struct{}, 1)
listener := func(event interface{}) {
if pe, ok := event.(events.PoolEvent); ok {
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index e47dd2c8..514ca460 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -104,7 +104,7 @@ func (bh *BinHeap) Insert(item Item) {
bh.cond.Signal()
}
-func (bh *BinHeap) GetMax() Item {
+func (bh *BinHeap) ExtractMin() Item {
bh.cond.L.Lock()
// if len == 0, wait for the signal
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index 06d0735c..f30cf8d8 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -50,7 +50,7 @@ func TestBinHeap_Init(t *testing.T) {
res := make([]Item, 0, 12)
for i := 0; i < 11; i++ {
- item := bh.GetMax()
+ item := bh.ExtractMin()
res = append(res, item)
}
@@ -83,7 +83,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-tt.C:
fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
atomic.StoreUint64(&insertsPerSec, 0)
- fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec)))
+ fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
atomic.StoreUint64(&getPerSec, 0)
case <-stopCh:
tt.Stop()
@@ -98,7 +98,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-stopCh:
return
default:
- pq.GetMax()
+ pq.ExtractMin()
atomic.AddUint64(&getPerSec, 1)
}
}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 8278dc8d..1efebf1c 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -2,7 +2,7 @@ package priorityqueue
type Queue interface {
Insert(item Item)
- GetMax() Item
+ ExtractMin() Item
Len() uint64
}
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 5b549874..22eee2dc 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -7,7 +7,6 @@ import (
"github.com/google/uuid"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -46,8 +45,8 @@ type Config struct {
type JobsConsumer struct {
sync.RWMutex
- logger logger.Logger
- pq priorityqueue.Queue
+ log logger.Logger
+ pq priorityqueue.Queue
pipelines sync.Map
@@ -67,19 +66,20 @@ type JobsConsumer struct {
delayCache map[string]struct{}
- stop chan struct{}
+ stopCh chan struct{}
}
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) {
+// NewAMQPConsumer initializes rabbitmq pipeline
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
jb := &JobsConsumer{
- logger: log,
+ log: log,
pq: pq,
consumeID: uuid.NewString(),
- stop: stopCh,
+ stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
delayCache: make(map[string]struct{}, 100),
}
@@ -144,6 +144,14 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
+func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) {
+ _ = exchangeType
+ _ = exchangeKey
+ _ = queue
+ _ = routingKey
+ panic("not implemented")
+}
+
func (j *JobsConsumer) Push(job *structs.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -157,7 +165,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
defer j.Unlock()
// convert
- msg := FromJob(job)
+ msg := fromJob(job)
p, err := pack(job.Ident, msg)
if err != nil {
return errors.E(op, err)
@@ -245,12 +253,12 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.pipelines.Store(pipeline.Name(), true)
+ j.pipelines.Store(pipeline.Name(), struct{}{})
return nil
}
-func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
+func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
if _, ok := j.pipelines.Load(pipeline.Name()); !ok {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name()))
@@ -304,77 +312,74 @@ func (j *JobsConsumer) List() []string {
}
func (j *JobsConsumer) Pause(pipeline string) {
- if q, ok := j.pipelines.Load(pipeline); ok {
- if q == true {
- // mark pipeline as turned off
- j.pipelines.Store(pipeline, false)
- }
+ if _, ok := j.pipelines.Load(pipeline); !ok {
+ j.log.Error("no such pipeline", "requested pause on", pipeline)
+ return
}
// protect connection (redial)
j.Lock()
defer j.Unlock()
- err := j.publishChan.Cancel(j.consumeID, true)
+ err := j.consumeChan.Cancel(j.consumeID, true)
if err != nil {
- j.logger.Error("cancel publish channel, forcing close", "error", err)
- errCl := j.publishChan.Close()
+ j.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := j.consumeChan.Close()
if errCl != nil {
- j.logger.Error("force close failed", "error", err)
+ j.log.Error("force close failed", "error", err)
}
}
}
func (j *JobsConsumer) Resume(pipeline string) {
- if q, ok := j.pipelines.Load(pipeline); ok {
- if q == false {
- // mark pipeline as turned off
- j.pipelines.Store(pipeline, true)
- }
-
- // protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ if _, ok := j.pipelines.Load(pipeline); !ok {
+ j.log.Error("no such pipeline", "requested pause on", pipeline)
+ return
+ }
- var err error
- j.consumeChan, err = j.conn.Channel()
- if err != nil {
- j.logger.Error("create channel on rabbitmq connection", "error", err)
- return
- }
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
- if err != nil {
- j.logger.Error("qos set failed", "error", err)
- return
- }
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ j.log.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
- // start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- j.logger.Error("consume operation failed", "error", err)
- return
- }
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ j.log.Error("qos set failed", "error", err)
+ return
+ }
- // run listener
- j.listener(deliv)
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ j.log.Error("consume operation failed", "error", err)
+ return
}
+
+ // run listener
+ j.listener(deliv)
}
-// Declare used to dynamically declare a pipeline
-func (j *JobsConsumer) Declare(pipeline *pipeline.Pipeline) error {
- pipeline.String(exchangeKey, "")
- pipeline.String(queue, "")
- pipeline.String(routingKey, "")
- pipeline.String(exchangeType, "direct")
+func (j *JobsConsumer) Stop() error {
+ j.stopCh <- struct{}{}
+ j.pipelines.Range(func(key, _ interface{}) bool {
+ j.pipelines.Delete(key)
+ return true
+ })
+
return nil
}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 2e8a30af..a46f1ca2 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -20,39 +20,6 @@ const (
rrRetryDelay string = "rr_retry_delay"
)
-func FromDelivery(d amqp.Delivery) (*Item, error) {
- const op = errors.Op("from_delivery_convert")
- item, err := unpack(d)
- if err != nil {
- return nil, errors.E(op, err)
- }
- return &Item{
- Job: item.Job,
- Ident: item.Ident,
- Payload: item.Payload,
- Headers: item.Headers,
- Options: item.Options,
- AckFunc: d.Ack,
- NackFunc: d.Nack,
- }, nil
-}
-
-func FromJob(job *structs.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: &Options{
- Priority: uint32(job.Options.Priority),
- Pipeline: job.Options.Pipeline,
- Delay: int32(job.Options.Delay),
- Attempts: int32(job.Options.Attempts),
- RetryDelay: int32(job.Options.RetryDelay),
- Timeout: int32(job.Options.Timeout),
- },
- }
-}
-
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
@@ -154,6 +121,39 @@ func (j *Item) Nack() error {
return j.NackFunc(false, false)
}
+func fromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ item, err := unpack(d)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ return &Item{
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ AckFunc: d.Ack,
+ NackFunc: d.Nack,
+ }, nil
+}
+
+func fromJob(job *structs.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: uint32(job.Options.Priority),
+ Pipeline: job.Options.Pipeline,
+ Delay: int32(job.Options.Delay),
+ Attempts: int32(job.Options.Attempts),
+ RetryDelay: int32(job.Options.RetryDelay),
+ Timeout: int32(job.Options.Timeout),
+ },
+ }
+}
+
// pack job metadata into headers
func pack(id string, j *Item) (amqp.Table, error) {
headers, err := json.Marshal(j.Headers)
diff --git a/plugins/jobs/brokers/amqp/listener.go b/plugins/jobs/brokers/amqp/listener.go
new file mode 100644
index 00000000..2b994fc5
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/listener.go
@@ -0,0 +1,25 @@
+package amqp
+
+import "github.com/streadway/amqp"
+
+func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case msg, ok := <-deliv:
+ if !ok {
+ j.log.Info("delivery channel closed, leaving the rabbit listener")
+ return
+ }
+
+ d, err := fromDelivery(msg)
+ if err != nil {
+ j.log.Error("amqp delivery convert", "error", err)
+ continue
+ }
+ // insert job into the main priority queue
+ j.pq.Insert(d)
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 7b6562c7..6743dc2f 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -1,11 +1,10 @@
package amqp
import (
- "sync/atomic"
-
"github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -16,27 +15,11 @@ const (
type Plugin struct {
log logger.Logger
cfg config.Configurer
-
- numConsumers uint32
- stopCh chan struct{}
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
p.cfg = cfg
- p.stopCh = make(chan struct{})
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- return make(chan error)
-}
-
-func (p *Plugin) Stop() error {
- // send stop to the all consumers delivery
- for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ {
- p.stopCh <- struct{}{}
- }
return nil
}
@@ -47,6 +30,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
- atomic.AddUint32(&p.numConsumers, 1)
- return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq)
+ return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
+}
+
+// FromPipeline constructs AMQP driver from pipeline
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, pq)
}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit_init.go
index b5e73a9d..cb9f2dc4 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit_init.go
@@ -2,7 +2,6 @@ package amqp
import (
"github.com/spiral/errors"
- "github.com/streadway/amqp"
)
func (j *JobsConsumer) initRabbitMQ() error {
@@ -15,11 +14,6 @@ func (j *JobsConsumer) initRabbitMQ() error {
return errors.E(op, err)
}
- err = channel.Qos(j.prefetchCount, 0, false)
- if err != nil {
- return errors.E(op, err)
- }
-
// declare an exchange (idempotent operation)
err = channel.ExchangeDeclare(
j.exchangeName,
@@ -61,27 +55,3 @@ func (j *JobsConsumer) initRabbitMQ() error {
return channel.Close()
}
-
-func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
- go func() {
- for {
- select {
- case msg, ok := <-deliv:
- if !ok {
- j.logger.Info("delivery channel closed, leaving the rabbit listener")
- return
- }
-
- d, err := FromDelivery(msg)
- if err != nil {
- j.logger.Error("amqp delivery convert", "error", err)
- continue
- }
- // insert job into the main priority queue
- j.pq.Insert(d)
- case <-j.stop:
- return
- }
- }
- }()
-}
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
index 874e68c4..16071b78 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -12,29 +12,34 @@ import (
func (j *JobsConsumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
- for err := range j.conn.NotifyClose(make(chan *amqp.Error)) {
- if err != nil {
- j.Lock()
- j.logger.Error("connection closed, reconnecting", "error", err)
+ for {
+ select {
+ case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ if err == nil {
+ return
+ }
+
+ j.Lock()
+ j.log.Error("connection closed, reconnecting", "error", err)
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
expb.MaxElapsedTime = j.retryTimeout
op := func() error {
- j.logger.Warn("rabbitmq reconnecting, caused by", "error", err)
+ j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
j.conn, dialErr = amqp.Dial(j.connStr)
if dialErr != nil {
return fmt.Errorf("fail to dial server endpoint: %v", dialErr)
}
- j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+ j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
errInit := j.initRabbitMQ()
if errInit != nil {
- j.logger.Error("error while redialing", "error", errInit)
+ j.log.Error("error while redialing", "error", errInit)
return errInit
}
@@ -69,18 +74,32 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
// restart listener
j.listener(deliv)
- j.logger.Info("queues and subscribers redeclare succeed")
+ j.log.Info("queues and subscribers redeclare succeed")
return nil
}
retryErr := backoff.Retry(op, expb)
if retryErr != nil {
j.Unlock()
- j.logger.Error("backoff failed", "error", retryErr)
+ j.log.Error("backoff failed", "error", retryErr)
return
}
j.Unlock()
+
+ case <-j.stopCh:
+ err := j.publishChan.Close()
+ if err != nil {
+ j.log.Error("publish channel close", "error", err)
+ }
+ err = j.consumeChan.Close()
+ if err != nil {
+ j.log.Error("consume channel close", "error", err)
+ }
+ err = j.conn.Close()
+ if err != nil {
+ j.log.Error("amqp connection close", "error", err)
+ }
}
}
}()
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 8f6f4b5f..9d79221c 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -22,14 +22,17 @@ type JobBroker struct {
queues sync.Map
pq priorityqueue.Queue
localQueue chan *Item
+
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) {
const op = errors.Op("new_ephemeral_pipeline")
jb := &JobBroker{
- log: log,
- pq: q,
+ log: log,
+ pq: q,
+ stopCh: make(chan struct{}, 1),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -50,6 +53,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q
return jb, nil
}
+func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) {
+ panic("not implemented")
+}
+
func (j *JobBroker) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
@@ -82,8 +89,13 @@ func (j *JobBroker) Push(job *structs.Job) error {
func (j *JobBroker) consume() {
// redirect
- for item := range j.localQueue {
- j.pq.Insert(item)
+ for {
+ select {
+ case item := <-j.localQueue:
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
}
}
@@ -98,11 +110,6 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
return nil
}
-// Consume is no-op for the ephemeral
-func (j *JobBroker) Consume(_ *pipeline.Pipeline) error {
- return nil
-}
-
func (j *JobBroker) Pause(pipeline string) {
if q, ok := j.queues.Load(pipeline); ok {
if q == true {
@@ -132,3 +139,20 @@ func (j *JobBroker) List() []string {
return out
}
+
+// Run is no-op for the ephemeral
+func (j *JobBroker) Run(_ *pipeline.Pipeline) error {
+ return nil
+}
+
+func (j *JobBroker) Stop() error {
+ j.queues.Range(func(key, _ interface{}) bool {
+ j.queues.Delete(key)
+ return true
+ })
+
+ // return from the consumer
+ j.stopCh <- struct{}{}
+
+ return nil
+}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 9910d857..75012873 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -4,6 +4,7 @@ import (
"github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -28,6 +29,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
-func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(configKey, p.log, p.cfg, q)
+func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(configKey, p.log, p.cfg, pq)
+}
+
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipeline, pq)
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index b7e41710..6dd55782 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -52,6 +52,8 @@ type Plugin struct {
// initial set of the pipelines to consume
consume map[string]struct{}
+
+ stopCh chan struct{}
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -72,6 +74,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.jobConstructors = make(map[string]jobs.Constructor)
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
+ p.stopCh = make(chan struct{}, 1)
// initial set of pipelines
for i := range p.cfg.Pipelines {
@@ -145,9 +148,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return false
}
- // if pipeline initialized to be consumed, call Consume on it
+ // if pipeline initialized to be consumed, call Run on it
if _, ok := p.consume[name]; ok {
- err = initializedDriver.Consume(pipe)
+ err = initializedDriver.Run(pipe)
if err != nil {
errCh <- errors.E(op, err)
return false
@@ -171,40 +174,46 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for i := uint8(0); i < p.cfg.NumPollers; i++ {
go func() {
for {
- // get data JOB from the queue
- job := p.queue.GetMax()
-
- ctx, err := job.Context()
- if err != nil {
- errNack := job.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
+ select {
+ case <-p.stopCh:
+ p.log.Debug("------> job poller stopped <------")
+ return
+ default:
+ // get data JOB from the queue
+ job := p.queue.ExtractMin()
+
+ ctx, err := job.Context()
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+ p.log.Error("job marshal context", "error", err)
}
- p.log.Error("job marshal context", "error", err)
- }
-
- exec := payload.Payload{
- Context: ctx,
- Body: job.Body(),
- }
- _, err = p.workersPool.Exec(exec)
- if err != nil {
- errNack := job.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
+ exec := payload.Payload{
+ Context: ctx,
+ Body: job.Body(),
}
- p.log.Error("job execute", "error", err)
- continue
- }
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+
+ p.log.Error("job execute", "error", err)
+ continue
+ }
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
- errAck := job.Ack()
- if errAck != nil {
- p.log.Error("acknowledge failed", "error", errAck)
+ errAck := job.Ack()
+ if errAck != nil {
+ p.log.Error("acknowledge failed", "error", errAck)
+ }
}
}
}()
@@ -215,6 +224,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
+ for k, v := range p.consumers {
+ err := v.Stop()
+ if err != nil {
+ p.log.Error("stop job driver", "driver", k)
+ continue
+ }
+ }
+
+ // this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
+ // but if not, this is not a problem at all.
+ // The main target is to stop the drivers
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ // stop jobs plugin pollers
+ p.stopCh <- struct{}{}
+ }
+ }()
+
+ // just wait pollers for 2 seconds before exit
+ time.Sleep(time.Second * 5)
+
return nil
}
@@ -335,6 +365,66 @@ func (p *Plugin) Resume(pipelines []string) {
}
}
+// Declare a pipeline.
+func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("jobs_plugin_declare")
+ // driver for the pipeline (ie amqp, ephemeral, etc)
+ dr := pipeline.Driver()
+ if dr == "" {
+ return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name()))
+ }
+
+ // jobConstructors contains constructors for the drivers
+ // we need here to initialize these drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // init the driver from pipeline
+ initializedDriver, err := c.FromPipeline(pipeline, p.queue)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[pipeline.Name()] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(pipeline)
+ if err != nil {
+ return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ if _, ok := p.consume[pipeline.Name()]; ok {
+ err = initializedDriver.Run(pipeline)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ p.pipelines.Store(pipeline.Name(), pipeline)
+
+ return nil
+}
+
+// Destroy pipeline and release all associated resources.
+func (p *Plugin) Destroy(pp string) error {
+ const op = errors.Op("jobs_plugin_destroy")
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ return d.Stop()
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{
log: p.log,