summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-02 00:56:49 +0300
committerValery Piashchynski <[email protected]>2021-02-02 00:56:49 +0300
commit6d48d31ac7fb6b7a9170f2f253e204521f244c9e (patch)
treeac64346997b307808d8a6624dcd1a5a7b2e887f1
parent7149a8ee1c0935bb5c8d13312ba66b78b9c4d174 (diff)
Update RPC plugin, use hashmap instead of slice to store pluggable
plugins Fix issue with log channels
-rw-r--r--go.mod2
-rw-r--r--plugins/logger/config.go12
-rw-r--r--plugins/logger/plugin.go2
-rw-r--r--plugins/rpc/plugin.go31
-rw-r--r--tests/worker.php34
5 files changed, 58 insertions, 23 deletions
diff --git a/go.mod b/go.mod
index d77872d7..85884fc9 100644
--- a/go.mod
+++ b/go.mod
@@ -31,4 +31,4 @@ require (
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
-)
+) \ No newline at end of file
diff --git a/plugins/logger/config.go b/plugins/logger/config.go
index 8cc88d02..63e2aa41 100644
--- a/plugins/logger/config.go
+++ b/plugins/logger/config.go
@@ -10,26 +10,26 @@ import (
// ChannelConfig configures loggers per channel.
type ChannelConfig struct {
// Dedicated channels per logger. By default logger allocated via named logger.
- Channels map[string]Config `json:"channels" mapstructure:"channels"`
+ Channels map[string]Config `mapstructure:"channels" mapstructure:"channels"`
}
type Config struct {
// Mode configures logger based on some default template (development, production, off).
- Mode string `json:"mode" mapstructure:"mode"`
+ Mode string `mapstructure:"mode" mapstructure:"mode"`
// Level is the minimum enabled logging level. Note that this is a dynamic
// level, so calling ChannelConfig.Level.SetLevel will atomically change the log
// level of all loggers descended from this config.
- Level string `json:"level" mapstructure:"level"`
+ Level string `mapstructure:"level" mapstructure:"level"`
// Encoding sets the logger's encoding. Valid values are "json" and
// "console", as well as any third-party encodings registered via
// RegisterEncoder.
- Encoding string `json:"encoding" mapstructure:"encoding"`
+ Encoding string `mapstructure:"encoding" mapstructure:"encoding"`
// Output is a list of URLs or file paths to write logging output to.
// See Open for details.
- Output []string `json:"output" mapstructure:"output"`
+ Output []string `mapstructure:"output" mapstructure:"output"`
// ErrorOutput is a list of URLs to write internal logger errors to.
// The default is standard error.
@@ -37,7 +37,7 @@ type Config struct {
// Note that this setting only affects internal errors; for sample code that
// sends error-level logs to a different location from info- and debug-level
// logs, see the package-level AdvancedConfiguration example.
- ErrorOutput []string `json:"errorOutput" mapstructure:"errorOutput"`
+ ErrorOutput []string `mapstructure:"errorOutput" mapstructure:"errorOutput"`
}
// ZapConfig converts config into Zap configuration.
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go
index 141ede64..7fc464b6 100644
--- a/plugins/logger/plugin.go
+++ b/plugins/logger/plugin.go
@@ -53,7 +53,7 @@ func (z *ZapLogger) NamedLogger(name string) (Logger, error) {
if err != nil {
return nil, err
}
- return NewZapAdapter(l), nil
+ return NewZapAdapter(l.Named(name)), nil
}
return NewZapAdapter(z.base.Named(name)), nil
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index e13768f0..94fec0b6 100644
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -15,18 +15,13 @@ import (
// PluginName contains default plugin name.
const PluginName = "RPC"
-type pluggable struct {
- service RPCer
- name string
-}
-
// Plugin is RPC service.
type Plugin struct {
cfg Config
log logger.Logger
rpc *rpc.Server
// set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
- plugins []pluggable
+ plugins map[string]RPCer
listener net.Listener
closed *uint32
}
@@ -42,14 +37,23 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, errors.Disabled, err)
}
+ // Init defaults
s.cfg.InitDefaults()
-
+ // Init pluggable plugins map
+ s.plugins = make(map[string]RPCer)
+ // init logs
s.log = log
+ // set up state
state := uint32(0)
s.closed = &state
atomic.StoreUint32(s.closed, 0)
- return s.cfg.Valid()
+ // validate config
+ err = s.cfg.Valid()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
// Serve serves the service.
@@ -62,14 +66,14 @@ func (s *Plugin) Serve() chan error {
services := make([]string, 0, len(s.plugins))
// Attach all services
- for i := 0; i < len(s.plugins); i++ {
- err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC())
+ for name := range s.plugins {
+ err := s.Register(name, s.plugins[name].RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
}
- services = append(services, s.plugins[i].name)
+ services = append(services, name)
}
var err error
@@ -128,10 +132,7 @@ func (s *Plugin) Collects() []interface{} {
// RegisterPlugin registers RPC service plugin.
func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
- s.plugins = append(s.plugins, pluggable{
- service: p,
- name: name.Name(),
- })
+ s.plugins[name.Name()] = p
}
// Register publishes in the server the set of methods of the
diff --git a/tests/worker.php b/tests/worker.php
new file mode 100644
index 00000000..5c9c80e6
--- /dev/null
+++ b/tests/worker.php
@@ -0,0 +1,34 @@
+<?php
+
+declare(strict_types=1);
+
+require __DIR__ . '/vendor/autoload.php';
+
+/**
+ * @param string $dir
+ * @return array<string>
+ */
+$getClasses = static function (string $dir): iterable {
+ $files = glob($dir . '/*.php');
+
+ foreach ($files as $file) {
+ yield substr(basename($file), 0, -4);
+ }
+};
+
+$factory = \Temporal\WorkerFactory::create();
+
+$worker = $factory->newWorker('default');
+
+// register all workflows
+foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) {
+ $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
+}
+
+// register all activity
+foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
+ $class = 'Temporal\\Tests\\Activity\\' . $name;
+ $worker->registerActivityImplementations(new $class);
+}
+
+$factory->run();