summaryrefslogtreecommitdiff
path: root/plugins/temporal/client/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/client/plugin.go')
-rw-r--r--plugins/temporal/client/plugin.go34
1 files changed, 24 insertions, 10 deletions
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
index 37c8d7be..047a1815 100644
--- a/plugins/temporal/client/plugin.go
+++ b/plugins/temporal/client/plugin.go
@@ -24,7 +24,7 @@ var stickyCacheSet = false
// Plugin implement Temporal contract.
type Plugin struct {
workerID int32
- cfg Config
+ cfg *Config
dc converter.DataConverter
log logger.Logger
client client.Client
@@ -32,7 +32,7 @@ type Plugin struct {
// Temporal define common interface for RoadRunner plugins.
type Temporal interface {
- GetClient() (client.Client, error)
+ GetClient() client.Client
GetDataConverter() converter.DataConverter
GetConfig() Config
GetCodec() rrt.Codec
@@ -51,15 +51,27 @@ type Config struct {
// Init initiates temporal client plugin.
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("temporal_client_plugin_init")
p.log = log
p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ if p.cfg == nil {
+ return errors.E(op, errors.Disabled)
+ }
- return cfg.UnmarshalKey(PluginName, &p.cfg)
+ return nil
}
// GetConfig returns temporal configuration.
func (p *Plugin) GetConfig() Config {
- return p.cfg
+ if p.cfg != nil {
+ return *p.cfg
+ }
+ // empty
+ return Config{}
}
// GetCodec returns communication codec.
@@ -79,6 +91,7 @@ func (p *Plugin) GetDataConverter() converter.DataConverter {
// Serve starts temporal srv.
func (p *Plugin) Serve() chan error {
+ const op = errors.Op("temporal_client_plugin_serve")
errCh := make(chan error, 1)
var err error
@@ -94,12 +107,12 @@ func (p *Plugin) Serve() chan error {
DataConverter: p.dc,
})
- p.log.Debug("Connected to temporal server", "Plugin", p.cfg.Address)
-
if err != nil {
- errCh <- errors.E(errors.Op("p connect"), err)
+ errCh <- errors.E(op, err)
}
+ p.log.Debug("connected to temporal server", "address", p.cfg.Address)
+
return errCh
}
@@ -113,14 +126,15 @@ func (p *Plugin) Stop() error {
}
// GetClient returns active srv connection.
-func (p *Plugin) GetClient() (client.Client, error) {
- return p.client, nil
+func (p *Plugin) GetClient() client.Client {
+ return p.client
}
// CreateWorker allocates new temporal worker on an active connection.
func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
+ const op = errors.Op("temporal_client_plugin_create_worker")
if p.client == nil {
- return nil, errors.E("unable to create worker, invalid temporal p")
+ return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
}
if options.Identity == "" {