diff options
author | Valery Piashchynski <[email protected]> | 2021-02-02 00:56:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-02 00:56:49 +0300 |
commit | 6d48d31ac7fb6b7a9170f2f253e204521f244c9e (patch) | |
tree | ac64346997b307808d8a6624dcd1a5a7b2e887f1 | |
parent | 7149a8ee1c0935bb5c8d13312ba66b78b9c4d174 (diff) |
Update RPC plugin, use hashmap instead of slice to store pluggable
plugins
Fix issue with log channels
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | plugins/logger/config.go | 12 | ||||
-rw-r--r-- | plugins/logger/plugin.go | 2 | ||||
-rw-r--r-- | plugins/rpc/plugin.go | 31 | ||||
-rw-r--r-- | tests/worker.php | 34 |
5 files changed, 58 insertions, 23 deletions
@@ -31,4 +31,4 @@ require ( golang.org/x/net v0.0.0-20201224014010-6772e930b67b golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 -) +)
\ No newline at end of file diff --git a/plugins/logger/config.go b/plugins/logger/config.go index 8cc88d02..63e2aa41 100644 --- a/plugins/logger/config.go +++ b/plugins/logger/config.go @@ -10,26 +10,26 @@ import ( // ChannelConfig configures loggers per channel. type ChannelConfig struct { // Dedicated channels per logger. By default logger allocated via named logger. - Channels map[string]Config `json:"channels" mapstructure:"channels"` + Channels map[string]Config `mapstructure:"channels" mapstructure:"channels"` } type Config struct { // Mode configures logger based on some default template (development, production, off). - Mode string `json:"mode" mapstructure:"mode"` + Mode string `mapstructure:"mode" mapstructure:"mode"` // Level is the minimum enabled logging level. Note that this is a dynamic // level, so calling ChannelConfig.Level.SetLevel will atomically change the log // level of all loggers descended from this config. - Level string `json:"level" mapstructure:"level"` + Level string `mapstructure:"level" mapstructure:"level"` // Encoding sets the logger's encoding. Valid values are "json" and // "console", as well as any third-party encodings registered via // RegisterEncoder. - Encoding string `json:"encoding" mapstructure:"encoding"` + Encoding string `mapstructure:"encoding" mapstructure:"encoding"` // Output is a list of URLs or file paths to write logging output to. // See Open for details. - Output []string `json:"output" mapstructure:"output"` + Output []string `mapstructure:"output" mapstructure:"output"` // ErrorOutput is a list of URLs to write internal logger errors to. // The default is standard error. @@ -37,7 +37,7 @@ type Config struct { // Note that this setting only affects internal errors; for sample code that // sends error-level logs to a different location from info- and debug-level // logs, see the package-level AdvancedConfiguration example. - ErrorOutput []string `json:"errorOutput" mapstructure:"errorOutput"` + ErrorOutput []string `mapstructure:"errorOutput" mapstructure:"errorOutput"` } // ZapConfig converts config into Zap configuration. diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go index 141ede64..7fc464b6 100644 --- a/plugins/logger/plugin.go +++ b/plugins/logger/plugin.go @@ -53,7 +53,7 @@ func (z *ZapLogger) NamedLogger(name string) (Logger, error) { if err != nil { return nil, err } - return NewZapAdapter(l), nil + return NewZapAdapter(l.Named(name)), nil } return NewZapAdapter(z.base.Named(name)), nil diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index e13768f0..94fec0b6 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -15,18 +15,13 @@ import ( // PluginName contains default plugin name. const PluginName = "RPC" -type pluggable struct { - service RPCer - name string -} - // Plugin is RPC service. type Plugin struct { cfg Config log logger.Logger rpc *rpc.Server // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC - plugins []pluggable + plugins map[string]RPCer listener net.Listener closed *uint32 } @@ -42,14 +37,23 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, errors.Disabled, err) } + // Init defaults s.cfg.InitDefaults() - + // Init pluggable plugins map + s.plugins = make(map[string]RPCer) + // init logs s.log = log + // set up state state := uint32(0) s.closed = &state atomic.StoreUint32(s.closed, 0) - return s.cfg.Valid() + // validate config + err = s.cfg.Valid() + if err != nil { + return errors.E(op, err) + } + return nil } // Serve serves the service. @@ -62,14 +66,14 @@ func (s *Plugin) Serve() chan error { services := make([]string, 0, len(s.plugins)) // Attach all services - for i := 0; i < len(s.plugins); i++ { - err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC()) + for name := range s.plugins { + err := s.Register(name, s.plugins[name].RPC()) if err != nil { errCh <- errors.E(op, err) return errCh } - services = append(services, s.plugins[i].name) + services = append(services, name) } var err error @@ -128,10 +132,7 @@ func (s *Plugin) Collects() []interface{} { // RegisterPlugin registers RPC service plugin. func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) { - s.plugins = append(s.plugins, pluggable{ - service: p, - name: name.Name(), - }) + s.plugins[name.Name()] = p } // Register publishes in the server the set of methods of the diff --git a/tests/worker.php b/tests/worker.php new file mode 100644 index 00000000..5c9c80e6 --- /dev/null +++ b/tests/worker.php @@ -0,0 +1,34 @@ +<?php + +declare(strict_types=1); + +require __DIR__ . '/vendor/autoload.php'; + +/** + * @param string $dir + * @return array<string> + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { + $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { + $class = 'Temporal\\Tests\\Activity\\' . $name; + $worker->registerActivityImplementations(new $class); +} + +$factory->run(); |