diff options
Diffstat (limited to 'plugins/temporal/client')
-rw-r--r-- | plugins/temporal/client/doc/doc.go | 1 | ||||
-rw-r--r-- | plugins/temporal/client/doc/temporal.drawio | 1 | ||||
-rw-r--r-- | plugins/temporal/client/plugin.go | 169 |
3 files changed, 171 insertions, 0 deletions
diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go new file mode 100644 index 00000000..10257070 --- /dev/null +++ b/plugins/temporal/client/doc/doc.go @@ -0,0 +1 @@ +package doc diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio new file mode 100644 index 00000000..f2350af8 --- /dev/null +++ b/plugins/temporal/client/doc/temporal.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go new file mode 100644 index 00000000..047a1815 --- /dev/null +++ b/plugins/temporal/client/plugin.go @@ -0,0 +1,169 @@ +package client + +import ( + "fmt" + "os" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// PluginName defines public service name. +const PluginName = "temporal" + +// indicates that the case size was set +var stickyCacheSet = false + +// Plugin implement Temporal contract. +type Plugin struct { + workerID int32 + cfg *Config + dc converter.DataConverter + log logger.Logger + client client.Client +} + +// Temporal define common interface for RoadRunner plugins. +type Temporal interface { + GetClient() client.Client + GetDataConverter() converter.DataConverter + GetConfig() Config + GetCodec() rrt.Codec + CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error) +} + +// Config of the temporal client and depended services. +type Config struct { + Address string + Namespace string + Activities *pool.Config + Codec string + DebugLevel int `mapstructure:"debug_level"` + CacheSize int `mapstructure:"cache_size"` +} + +// Init initiates temporal client plugin. +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("temporal_client_plugin_init") + p.log = log + p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + if p.cfg == nil { + return errors.E(op, errors.Disabled) + } + + return nil +} + +// GetConfig returns temporal configuration. +func (p *Plugin) GetConfig() Config { + if p.cfg != nil { + return *p.cfg + } + // empty + return Config{} +} + +// GetCodec returns communication codec. +func (p *Plugin) GetCodec() rrt.Codec { + if p.cfg.Codec == "json" { + return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log) + } + + // production ready protocol, no debug abilities + return rrt.NewProtoCodec() +} + +// GetDataConverter returns data active data converter. +func (p *Plugin) GetDataConverter() converter.DataConverter { + return p.dc +} + +// Serve starts temporal srv. +func (p *Plugin) Serve() chan error { + const op = errors.Op("temporal_client_plugin_serve") + errCh := make(chan error, 1) + var err error + + if stickyCacheSet == false && p.cfg.CacheSize != 0 { + worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize) + stickyCacheSet = true + } + + p.client, err = client.NewClient(client.Options{ + Logger: p.log, + HostPort: p.cfg.Address, + Namespace: p.cfg.Namespace, + DataConverter: p.dc, + }) + + if err != nil { + errCh <- errors.E(op, err) + } + + p.log.Debug("connected to temporal server", "address", p.cfg.Address) + + return errCh +} + +// Stop stops temporal srv connection. +func (p *Plugin) Stop() error { + if p.client != nil { + p.client.Close() + } + + return nil +} + +// GetClient returns active srv connection. +func (p *Plugin) GetClient() client.Client { + return p.client +} + +// CreateWorker allocates new temporal worker on an active connection. +func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { + const op = errors.Op("temporal_client_plugin_create_worker") + if p.client == nil { + return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client")) + } + + if options.Identity == "" { + if tq == "" { + tq = client.DefaultNamespace + } + + // ensures unique worker IDs + options.Identity = fmt.Sprintf( + "%d@%s@%s@%v", + os.Getpid(), + getHostName(), + tq, + atomic.AddInt32(&p.workerID, 1), + ) + } + + return worker.New(p.client, tq, options), nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +func getHostName() string { + hostName, err := os.Hostname() + if err != nil { + hostName = "Unknown" + } + return hostName +} |