diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index bd5ff5bf..072f872a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -6,8 +6,10 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -25,6 +27,7 @@ type Plugin struct { workersPool pool.Pool consumers map[string]Consumer + events events.Handler } func testListener(data interface{}) { @@ -47,6 +50,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) p.consumers = make(map[string]Consumer) p.log = log return nil @@ -78,6 +83,31 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + func (p *Plugin) RPC() interface{} { return &rpc{log: p.log} } |