diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go | 22 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 30 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 10 | ||||
-rw-r--r-- | plugins/jobs/config.go | 6 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 5 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 41 | ||||
-rw-r--r-- | plugins/server/plugin.go | 2 |
8 files changed, 82 insertions, 35 deletions
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go deleted file mode 100644 index a60cb486..00000000 --- a/plugins/jobs/brokers/amqp/config.go +++ /dev/null @@ -1,22 +0,0 @@ -package amqp - -import "time" - -// Config defines sqs broker configuration. -type Config struct { - // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). - Addr string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// TimeoutDuration returns number of seconds allowed to redial -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go deleted file mode 100644 index 0e8d02ac..00000000 --- a/plugins/jobs/brokers/amqp/plugin.go +++ /dev/null @@ -1 +0,0 @@ -package amqp diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go new file mode 100644 index 00000000..905f5409 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -0,0 +1,30 @@ +package ephemeral + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +type JobBroker struct { +} + +func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { + return &JobBroker{}, nil +} + +func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) { + panic("implement me") +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { + panic("implement me") +} + +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) { + panic("implement me") +} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index df0d31be..84cc871b 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,6 +1,10 @@ package ephemeral -import "github.com/spiral/roadrunner/v2/plugins/logger" +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) const ( PluginName string = "ephemeral" @@ -18,3 +22,7 @@ func (p *Plugin) Init(log logger.Logger) error { func (p *Plugin) Name() string { return PluginName } + +func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(q) +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 74e4a811..bb042ec9 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -36,10 +36,12 @@ func (c *Config) InitDefaults() error { return errors.E(op, err) } - if c.poolCfg != nil { - c.poolCfg.InitDefaults() + if c.poolCfg == nil { + c.poolCfg = &poolImpl.Config{} } + c.poolCfg.InitDefaults() + return nil } diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index b4862038..bb0e8c50 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -1,6 +1,7 @@ package jobs import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) @@ -12,3 +13,7 @@ type Consumer interface { Consume(*pipeline.Pipeline) Register(*pipeline.Pipeline) } + +type Broker interface { + InitJobBroker(queue priorityqueue.Queue) (Consumer, error) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index e7466efb..90932edd 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -25,9 +26,15 @@ type Plugin struct { log logger.Logger workersPool pool.Pool + server server.Server + brokers map[string]Broker consumers map[string]Consumer - events events.Handler + + events events.Handler + + // priority queue implementation + queue priorityqueue.Queue } func testListener(data interface{}) { @@ -50,21 +57,39 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) - if err != nil { - return errors.E(op, err) - } - + p.server = server p.events = events.NewEventsHandler() p.events.AddListener(testListener) + p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) + + // initialize priority queue + p.queue = priorityqueue.NewPriorityQueue() p.log = log + return nil } func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + for name := range p.brokers { + jb, err := p.brokers[name].InitJobBroker(p.queue) + if err != nil { + errCh <- err + return errCh + } + + p.consumers[name] = jb + } + + var err error + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + if err != nil { + errCh <- err + return errCh + } + // initialize sub-plugins // provide a queue to them // start consume loop @@ -83,8 +108,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) { - p.consumers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { + p.brokers[name.Name()] = c } func (p *Plugin) Available() {} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 42273ed7..038d83d4 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -148,7 +148,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env E return nil, errors.E(op, err) } - list := make([]events.Listener, 0, 22) + list := make([]events.Listener, 0, 2) list = append(list, server.collectPoolEvents, server.collectWorkerEvents) if len(listeners) != 0 { list = append(list, listeners...) |