summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/amqp/config.go22
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go30
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go10
-rw-r--r--plugins/jobs/config.go6
-rw-r--r--plugins/jobs/interface.go5
-rw-r--r--plugins/jobs/plugin.go41
-rw-r--r--plugins/server/plugin.go2
8 files changed, 82 insertions, 35 deletions
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
deleted file mode 100644
index a60cb486..00000000
--- a/plugins/jobs/brokers/amqp/config.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package amqp
-
-import "time"
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// TimeoutDuration returns number of seconds allowed to redial
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
deleted file mode 100644
index 0e8d02ac..00000000
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package amqp
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
new file mode 100644
index 00000000..905f5409
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -0,0 +1,30 @@
+package ephemeral
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+type JobBroker struct {
+}
+
+func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
+ return &JobBroker{}, nil
+}
+
+func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) {
+ panic("implement me")
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+ panic("implement me")
+}
+
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index df0d31be..84cc871b 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,6 +1,10 @@
package ephemeral
-import "github.com/spiral/roadrunner/v2/plugins/logger"
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
const (
PluginName string = "ephemeral"
@@ -18,3 +22,7 @@ func (p *Plugin) Init(log logger.Logger) error {
func (p *Plugin) Name() string {
return PluginName
}
+
+func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(q)
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 74e4a811..bb042ec9 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -36,10 +36,12 @@ func (c *Config) InitDefaults() error {
return errors.E(op, err)
}
- if c.poolCfg != nil {
- c.poolCfg.InitDefaults()
+ if c.poolCfg == nil {
+ c.poolCfg = &poolImpl.Config{}
}
+ c.poolCfg.InitDefaults()
+
return nil
}
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index b4862038..bb0e8c50 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -1,6 +1,7 @@
package jobs
import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
@@ -12,3 +13,7 @@ type Consumer interface {
Consume(*pipeline.Pipeline)
Register(*pipeline.Pipeline)
}
+
+type Broker interface {
+ InitJobBroker(queue priorityqueue.Queue) (Consumer, error)
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index e7466efb..90932edd 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -25,9 +26,15 @@ type Plugin struct {
log logger.Logger
workersPool pool.Pool
+ server server.Server
+ brokers map[string]Broker
consumers map[string]Consumer
- events events.Handler
+
+ events events.Handler
+
+ // priority queue implementation
+ queue priorityqueue.Queue
}
func testListener(data interface{}) {
@@ -50,21 +57,39 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
- if err != nil {
- return errors.E(op, err)
- }
-
+ p.server = server
p.events = events.NewEventsHandler()
p.events.AddListener(testListener)
+ p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
+
+ // initialize priority queue
+ p.queue = priorityqueue.NewPriorityQueue()
p.log = log
+
return nil
}
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ for name := range p.brokers {
+ jb, err := p.brokers[name].InitJobBroker(p.queue)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ p.consumers[name] = jb
+ }
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
// initialize sub-plugins
// provide a queue to them
// start consume loop
@@ -83,8 +108,8 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) {
- p.consumers[name.Name()] = c
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
+ p.brokers[name.Name()] = c
}
func (p *Plugin) Available() {}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 42273ed7..038d83d4 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -148,7 +148,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env E
return nil, errors.E(op, err)
}
- list := make([]events.Listener, 0, 22)
+ list := make([]events.Listener, 0, 2)
list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
if len(listeners) != 0 {
list = append(list, listeners...)