summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-23 17:41:51 +0300
committerValery Piashchynski <[email protected]>2021-06-23 17:41:51 +0300
commit521aeb823bc8fa1f0a91b540cbbac96328185f51 (patch)
tree9ec5f3fba0a4aa0469dd106040142c5e6b8cf144 /plugins/jobs/brokers
parent7fc09959619e9e400ecafcffcd63e38812f397a6 (diff)
- Add PQ (priority_queue) mock
- Add binary heap mock - Connect first sub-plugin (ephemeral) with root jobs plugin Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers')
-rw-r--r--plugins/jobs/brokers/amqp/config.go22
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go30
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go10
4 files changed, 39 insertions, 24 deletions
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
deleted file mode 100644
index a60cb486..00000000
--- a/plugins/jobs/brokers/amqp/config.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package amqp
-
-import "time"
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// TimeoutDuration returns number of seconds allowed to redial
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
deleted file mode 100644
index 0e8d02ac..00000000
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package amqp
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
new file mode 100644
index 00000000..905f5409
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -0,0 +1,30 @@
+package ephemeral
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+type JobBroker struct {
+}
+
+func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
+ return &JobBroker{}, nil
+}
+
+func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) {
+ panic("implement me")
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+ panic("implement me")
+}
+
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index df0d31be..84cc871b 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,6 +1,10 @@
package ephemeral
-import "github.com/spiral/roadrunner/v2/plugins/logger"
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
const (
PluginName string = "ephemeral"
@@ -18,3 +22,7 @@ func (p *Plugin) Init(log logger.Logger) error {
func (p *Plugin) Name() string {
return PluginName
}
+
+func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(q)
+}