summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go30
1 files changed, 30 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index bd5ff5bf..072f872a 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -6,8 +6,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -25,6 +27,7 @@ type Plugin struct {
workersPool pool.Pool
consumers map[string]Consumer
+ events events.Handler
}
func testListener(data interface{}) {
@@ -47,6 +50,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(testListener)
p.consumers = make(map[string]Consumer)
p.log = log
return nil
@@ -78,6 +83,31 @@ func (p *Plugin) Name() string {
return PluginName
}
+func (p *Plugin) Push(j *structs.Job) (string, error) {
+ pipe, pOpts, err := p.cfg.MatchPipeline(j)
+ if err != nil {
+ panic(err)
+ }
+
+ if pOpts != nil {
+ j.Options.Merge(pOpts)
+ }
+
+ broker, ok := p.consumers[pipe.Broker()]
+ if !ok {
+ panic("broker not found")
+ }
+
+ id, err := broker.Push(pipe, j)
+ if err != nil {
+ panic(err)
+ }
+
+ // p.events.Push()
+
+ return id, nil
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{log: p.log}
}