diff options
Diffstat (limited to 'plugins/factory/factory.go')
-rw-r--r-- | plugins/factory/factory.go | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go new file mode 100644 index 00000000..74fd241f --- /dev/null +++ b/plugins/factory/factory.go @@ -0,0 +1,67 @@ +package factory + +import ( + "context" + "github.com/temporalio/roadrunner-temporal/events" + + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type WorkerFactory interface { + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) +} + +type WFactory struct { + spw Spawner + eb *events.EventBroadcaster +} + +func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + factory, err := wf.spw.NewFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, cmd, factory, opt) + if err != nil { + return nil, err + } + + // TODO event to stop + go func() { + for e := range p.Events() { + wf.eb.Push(e) + } + }() + + return p, nil +} + +func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + + wb, err := roadrunner.InitBaseWorker(cmd()) + if err != nil { + return nil, err + } + + return wb, nil +} + +func (wf *WFactory) Init(app Spawner) error { + wf.spw = app + wf.eb = events.NewEventBroadcaster() + return nil +} + +func (wf *WFactory) AddListener(l events.EventListener) { + wf.eb.AddListener(l) +} |