diff options
author | Valery Piashchynski <[email protected]> | 2021-06-22 17:33:55 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-22 17:33:55 +0300 |
commit | 035e432af9a059e9e5187bd03f2e7864ed94c054 (patch) | |
tree | 16383fdb9ee7c635e14cd1898ec573f331ba8d30 /plugins/jobs/plugin.go | |
parent | 5627146e45afbb8f6566862c60a42a0b0aad2d0a (diff) |
- Folders struct
- Initial ephemeral broker commit
- Initial RPC
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index bd5ff5bf..072f872a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -6,8 +6,10 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -25,6 +27,7 @@ type Plugin struct { workersPool pool.Pool consumers map[string]Consumer + events events.Handler } func testListener(data interface{}) { @@ -47,6 +50,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) p.consumers = make(map[string]Consumer) p.log = log return nil @@ -78,6 +83,31 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + func (p *Plugin) RPC() interface{} { return &rpc{log: p.log} } |