Diffstat (limited to 'plugins/temporal')
-rw-r--r--  plugins/temporal/activity/activity_pool.go          197
-rw-r--r--  plugins/temporal/activity/plugin.go                  215
-rw-r--r--  plugins/temporal/activity/rpc.go                      66
-rw-r--r--  plugins/temporal/client/doc/doc.go                     1
-rw-r--r--  plugins/temporal/client/doc/temporal.drawio            1
-rw-r--r--  plugins/temporal/client/plugin.go                    169
-rw-r--r--  plugins/temporal/protocol/converter.go                76
-rw-r--r--  plugins/temporal/protocol/converter_test.go           23
-rw-r--r--  plugins/temporal/protocol/internal/protocol.pb.go    167
-rw-r--r--  plugins/temporal/protocol/json_codec.go              225
-rw-r--r--  plugins/temporal/protocol/message.go                 334
-rw-r--r--  plugins/temporal/protocol/proto_codec.go             145
-rw-r--r--  plugins/temporal/protocol/protocol.go                 77
-rw-r--r--  plugins/temporal/protocol/worker_info.go              72
-rw-r--r--  plugins/temporal/workflow/canceller.go                41
-rw-r--r--  plugins/temporal/workflow/canceller_test.go           33
-rw-r--r--  plugins/temporal/workflow/id_registry.go              51
-rw-r--r--  plugins/temporal/workflow/message_queue.go            47
-rw-r--r--  plugins/temporal/workflow/message_queue_test.go       53
-rw-r--r--  plugins/temporal/workflow/plugin.go                  203
-rw-r--r--  plugins/temporal/workflow/process.go                 436
-rw-r--r--  plugins/temporal/workflow/workflow_pool.go           190
22 files changed, 2822 insertions, 0 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
new file mode 100644
index 00000000..d09722ce
--- /dev/null
+++ b/plugins/temporal/activity/activity_pool.go
@@ -0,0 +1,197 @@
+package activity
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/worker"
+)
+
+// RR_MODE env variable
+const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
+// RR_CODEC env variable
+const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
+
+// doNotCompleteOnReturn is a sentinel failure message indicating that the activity will be completed asynchronously.
+const doNotCompleteOnReturn = "doNotCompleteOnReturn"
+
+type activityPool interface {
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.SyncWorker
+ ActivityNames() []string
+ GetActivityContext(taskToken []byte) (context.Context, error)
+}
+
+type activityPoolImpl struct {
+ dc converter.DataConverter
+ codec rrt.Codec
+ seqID uint64
+ activities []string
+ wp pool.Pool
+ tWorkers []worker.Worker
+ running sync.Map
+}
+
+// newActivityPool creates a new PHP worker pool for serving activities.
+func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
+ const op = errors.Op("new_activity_pool")
+ // env variables
+ env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
+ wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &activityPoolImpl{
+ codec: codec,
+ wp: wp,
+ running: sync.Map{},
+ }, nil
+}
+
+// Start requests worker info from the underlying PHP workers and starts temporal workers linked to the pool.
+func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_start")
+ pool.dc = temporal.GetDataConverter()
+
+ err := pool.initWorkers(ctx, temporal)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(pool.tWorkers); i++ {
+ err := pool.tWorkers[i].Start()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// Destroy stops all temporal workers and destroys the underlying worker pool.
+func (pool *activityPoolImpl) Destroy(ctx context.Context) error {
+ for i := 0; i < len(pool.tWorkers); i++ {
+ pool.tWorkers[i].Stop()
+ }
+
+ pool.wp.Destroy(ctx)
+ return nil
+}
+
+// Workers returns list of all allocated workers.
+func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker {
+ return pool.wp.Workers()
+}
+
+// ActivityNames returns list of all available activity names.
+func (pool *activityPoolImpl) ActivityNames() []string {
+ return pool.activities
+}
+
+// GetActivityContext returns the context of a running activity identified by its task token.
+func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
+ const op = errors.Op("activity_pool_get_activity_context")
+ c, ok := pool.running.Load(string(taskToken))
+ if !ok {
+ return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
+ }
+
+ return c.(context.Context), nil
+}
+
+// initWorkers requests worker info from the underlying PHP workers and configures temporal workers linked to the pool.
+func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_create_temporal_worker")
+
+ workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ pool.activities = make([]string, 0)
+ pool.tWorkers = make([]worker.Worker, 0)
+
+ for i := 0; i < len(workerInfo); i++ {
+ w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
+ if err != nil {
+ return errors.E(op, err, pool.Destroy(ctx))
+ }
+
+ pool.tWorkers = append(pool.tWorkers, w)
+ for j := 0; j < len(workerInfo[i].Activities); j++ {
+ w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
+ Name: workerInfo[i].Activities[j].Name,
+ DisableAlreadyRegisteredCheck: false,
+ })
+
+ pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
+ }
+ }
+
+ return nil
+}
+
+// executeActivity executes an activity using an underlying worker.
+func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
+ const op = errors.Op("activity_pool_execute_activity")
+
+ heartbeatDetails := &common.Payloads{}
+ if activity.HasHeartbeatDetails(ctx) {
+ err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ }
+
+ var info = activity.GetInfo(ctx)
+ var msg = rrt.Message{
+ ID: atomic.AddUint64(&pool.seqID, 1),
+ Command: rrt.InvokeActivity{
+ Name: info.ActivityType.Name,
+ Info: info,
+ HeartbeatDetails: len(heartbeatDetails.Payloads),
+ },
+ Payloads: args,
+ }
+
+ if len(heartbeatDetails.Payloads) != 0 {
+ msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
+ }
+
+ pool.running.Store(string(info.TaskToken), ctx)
+ defer pool.running.Delete(string(info.TaskToken))
+
+ result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return nil, errors.E(op, errors.Str("invalid activity worker response"))
+ }
+
+ out := result[0]
+ if out.Failure != nil {
+ if out.Failure.Message == doNotCompleteOnReturn {
+ return nil, activity.ErrResultPending
+ }
+
+ return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc)
+ }
+
+ return out.Payloads, nil
+}
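The running map above is what makes out-of-band heartbeats possible: activity contexts are keyed by task token so the RPC layer can find them later. A minimal, self-contained sketch of that pattern (not part of this diff; names are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	var running sync.Map

	token := []byte("task-token-1")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// executeActivity: register the context for the lifetime of the call.
	running.Store(string(token), ctx)
	defer running.Delete(string(token))

	// GetActivityContext: reverse lookup performed by the heartbeat RPC.
	if c, ok := running.Load(string(token)); ok {
		fmt.Println("found context, not canceled:", c.(context.Context).Err() == nil)
	}
}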
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
new file mode 100644
index 00000000..5e562a8d
--- /dev/null
+++ b/plugins/temporal/activity/plugin.go
@@ -0,0 +1,215 @@
+package activity
+
+import (
+ "context"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+)
+
+const (
+ // PluginName defines public service name.
+ PluginName = "activities"
+
+ // RRMode is set as the RR_MODE env variable to let the worker know which mode to run in.
+ RRMode = "temporal/activity"
+)
+
+// Plugin to manage activity execution.
+type Plugin struct {
+ temporal client.Temporal
+ events events.Handler
+ server server.Server
+ log logger.Logger
+ mu sync.Mutex
+ reset chan struct{}
+ pool activityPool
+ closing int64
+}
+
+// Init configures activity service.
+func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ const op = errors.Op("activity_plugin_init")
+ if temporal.GetConfig().Activities == nil {
+ // no need to serve activities
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.temporal = temporal
+ p.server = server
+ p.events = events.NewEventsHandler()
+ p.log = log
+ p.reset = make(chan struct{})
+
+ return nil
+}
+
+// Serve activities with underlying workers.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("activity_plugin_serve")
+
+ errCh := make(chan error, 1)
+ pool, err := p.startPool()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ p.pool = pool
+
+ go func() {
+ for {
+ select {
+ case <-p.reset:
+ if atomic.LoadInt64(&p.closing) == 1 {
+ return
+ }
+
+ err := p.replacePool()
+ if err == nil {
+ continue
+ }
+
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.InitialInterval = time.Second
+
+ err = backoff.Retry(p.replacePool, bkoff)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }
+ }
+ }()
+
+ return errCh
+}
+
+// Stop stops the serving plugin.
+func (p *Plugin) Stop() error {
+ atomic.StoreInt64(&p.closing, 1)
+ const op = errors.Op("activity_plugin_stop")
+
+ pool := p.getPool()
+ if pool != nil {
+ p.pool = nil
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ return nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// RPC returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, client: p.temporal.GetClient()}
+}
+
+// Workers returns pool workers.
+func (p *Plugin) Workers() []worker.SyncWorker {
+ return p.getPool().Workers()
+}
+
+// ActivityNames returns list of all available activities.
+func (p *Plugin) ActivityNames() []string {
+ return p.pool.ActivityNames()
+}
+
+// Reset replaces the underlying activity pool with a new copy.
+func (p *Plugin) Reset() error {
+ p.reset <- struct{}{}
+
+ return nil
+}
+
+// AddListener adds event listeners to the service.
+func (p *Plugin) AddListener(listener events.Listener) {
+ p.events.AddListener(listener)
+}
+
+// poolListener resets the pool on pool errors and re-publishes all events.
+func (p *Plugin) poolListener(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventPoolError {
+ p.log.Error("Activity pool error", "error", ev.Payload.(error))
+ p.reset <- struct{}{}
+ }
+ }
+
+ p.events.Push(event)
+}
+
+// startPool creates and starts a new activity pool.
+func (p *Plugin) startPool() (activityPool, error) {
+ pool, err := newActivityPool(
+ p.temporal.GetCodec().WithLogger(p.log),
+ p.poolListener,
+ *p.temporal.GetConfig().Activities,
+ p.server,
+ )
+
+ if err != nil {
+ return nil, errors.E(errors.Op("newActivityPool"), err)
+ }
+
+ err = pool.Start(context.Background(), p.temporal)
+ if err != nil {
+ return nil, errors.E(errors.Op("startActivityPool"), err)
+ }
+
+ p.log.Debug("Started activity processing", "activities", pool.ActivityNames())
+
+ return pool, nil
+}
+
+func (p *Plugin) replacePool() error {
+ pool, err := p.startPool()
+ if err != nil {
+ p.log.Error("Replace activity pool failed", "error", err)
+ return errors.E(errors.Op("newActivityPool"), err)
+ }
+
+ p.log.Debug("Replace activity pool")
+
+ var previous activityPool
+
+ p.mu.Lock()
+ previous, p.pool = p.pool, pool
+ p.mu.Unlock()
+
+ errD := previous.Destroy(context.Background())
+ if errD != nil {
+ p.log.Error(
+ "Unable to destroy expired activity pool",
+ "error",
+ errors.E(errors.Op("destroyActivityPool"), errD),
+ )
+ }
+
+ return nil
+}
+
+// getPool returns the currently active pool.
+func (p *Plugin) getPool() activityPool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.pool
+}
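For reference, the retry behavior in Serve comes straight from cenkalti/backoff/v4: one direct replacePool attempt, then exponential retries. A compact sketch of that call pattern (illustrative values, not from the diff):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	replacePool := func() error { // stand-in for Plugin.replacePool
		attempts++
		if attempts < 3 {
			return errors.New("pool not ready")
		}
		return nil
	}

	bkoff := backoff.NewExponentialBackOff()
	bkoff.InitialInterval = time.Millisecond // the plugin uses time.Second

	if err := backoff.Retry(replacePool, bkoff); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("pool replaced after", attempts, "attempts")
}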
diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go
new file mode 100644
index 00000000..49efcd4f
--- /dev/null
+++ b/plugins/temporal/activity/rpc.go
@@ -0,0 +1,66 @@
+package activity
+
+import (
+ v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ "go.temporal.io/sdk/client"
+ "google.golang.org/protobuf/proto"
+)
+
+/*
+For a method to be exposed over net/rpc:
+- the method's type is exported.
+- the method is exported.
+- the method has two arguments, both exported (or builtin) types.
+- the method's second argument is a pointer.
+- the method has return type error.
+*/
+type rpc struct {
+ srv *Plugin
+ client client.Client
+}
+
+// RecordHeartbeatRequest sent by activity to record current state.
+type RecordHeartbeatRequest struct {
+ TaskToken []byte `json:"taskToken"`
+ Details []byte `json:"details"`
+}
+
+// RecordHeartbeatResponse sent back to the worker to indicate whether the activity was canceled.
+type RecordHeartbeatResponse struct {
+ Canceled bool `json:"canceled"`
+}
+
+// RecordActivityHeartbeat records heartbeat for an activity.
+// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
+// details - is the progress you want to record along with the heartbeat for this activity.
+// The errors it can return:
+// - EntityNotExistsError
+// - InternalServiceError
+// - CanceledError
+func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error {
+ details := &commonpb.Payloads{}
+
+ if len(in.Details) != 0 {
+ if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
+ return err
+ }
+ }
+
+ // find running activity
+ ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken)
+ if err != nil {
+ return err
+ }
+
+ activity.RecordHeartbeat(ctx, details)
+
+ select {
+ case <-ctx.Done():
+ *out = RecordHeartbeatResponse{Canceled: true}
+ default:
+ *out = RecordHeartbeatResponse{Canceled: false}
+ }
+
+ return nil
+}
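The comment block at the top of rpc.go paraphrases the standard net/rpc visibility rules. A tiny hypothetical service satisfying all five of them, for illustration:

package main

import (
	"fmt"
	"net/rpc"
)

// EchoService and its argument types are exported, as net/rpc requires.
type EchoService struct{}

type EchoRequest struct{ Text string }
type EchoResponse struct{ Text string }

// Echo has two exported arguments, a pointer second argument and an error
// return, so it becomes visible over RPC.
func (s *EchoService) Echo(in EchoRequest, out *EchoResponse) error {
	out.Text = in.Text
	return nil
}

func main() {
	// Register fails if the receiver exposes no method of the right shape.
	if err := rpc.Register(&EchoService{}); err != nil {
		fmt.Println("register:", err)
		return
	}
	fmt.Println("EchoService registered")
}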
diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go
new file mode 100644
index 00000000..10257070
--- /dev/null
+++ b/plugins/temporal/client/doc/doc.go
@@ -0,0 +1 @@
+package doc
diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio
new file mode 100644
index 00000000..f2350af8
--- /dev/null
+++ b/plugins/temporal/client/doc/temporal.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
new file mode 100644
index 00000000..047a1815
--- /dev/null
+++ b/plugins/temporal/client/plugin.go
@@ -0,0 +1,169 @@
+package client
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/sdk/client"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/worker"
+)
+
+// PluginName defines public service name.
+const PluginName = "temporal"
+
+// indicates that the sticky workflow cache size was set
+var stickyCacheSet = false
+
+// Plugin implement Temporal contract.
+type Plugin struct {
+ workerID int32
+ cfg *Config
+ dc converter.DataConverter
+ log logger.Logger
+ client client.Client
+}
+
+// Temporal defines the common interface for RoadRunner plugins.
+type Temporal interface {
+ GetClient() client.Client
+ GetDataConverter() converter.DataConverter
+ GetConfig() Config
+ GetCodec() rrt.Codec
+ CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error)
+}
+
+// Config of the temporal client and dependent services.
+type Config struct {
+ Address string
+ Namespace string
+ Activities *pool.Config
+ Codec string
+ DebugLevel int `mapstructure:"debug_level"`
+ CacheSize int `mapstructure:"cache_size"`
+}
+
+// Init initializes the temporal client plugin.
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("temporal_client_plugin_init")
+ p.log = log
+ p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ if p.cfg == nil {
+ return errors.E(op, errors.Disabled)
+ }
+
+ return nil
+}
+
+// GetConfig returns temporal configuration.
+func (p *Plugin) GetConfig() Config {
+ if p.cfg != nil {
+ return *p.cfg
+ }
+ // empty
+ return Config{}
+}
+
+// GetCodec returns communication codec.
+func (p *Plugin) GetCodec() rrt.Codec {
+ if p.cfg.Codec == "json" {
+ return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log)
+ }
+
+ // production-ready protocol, no debug capabilities
+ return rrt.NewProtoCodec()
+}
+
+// GetDataConverter returns the active data converter.
+func (p *Plugin) GetDataConverter() converter.DataConverter {
+ return p.dc
+}
+
+// Serve establishes the connection to the temporal server.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("temporal_client_plugin_serve")
+ errCh := make(chan error, 1)
+ var err error
+
+ if !stickyCacheSet && p.cfg.CacheSize != 0 {
+ worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize)
+ stickyCacheSet = true
+ }
+
+ p.client, err = client.NewClient(client.Options{
+ Logger: p.log,
+ HostPort: p.cfg.Address,
+ Namespace: p.cfg.Namespace,
+ DataConverter: p.dc,
+ })
+
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ p.log.Debug("connected to temporal server", "address", p.cfg.Address)
+
+ return errCh
+}
+
+// Stop closes the temporal server connection.
+func (p *Plugin) Stop() error {
+ if p.client != nil {
+ p.client.Close()
+ }
+
+ return nil
+}
+
+// GetClient returns the active server connection.
+func (p *Plugin) GetClient() client.Client {
+ return p.client
+}
+
+// CreateWorker allocates new temporal worker on an active connection.
+func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
+ const op = errors.Op("temporal_client_plugin_create_worker")
+ if p.client == nil {
+ return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
+ }
+
+ if options.Identity == "" {
+ if tq == "" {
+ tq = client.DefaultNamespace
+ }
+
+ // ensures unique worker IDs
+ options.Identity = fmt.Sprintf(
+ "%d@%s@%s@%v",
+ os.Getpid(),
+ getHostName(),
+ tq,
+ atomic.AddInt32(&p.workerID, 1),
+ )
+ }
+
+ return worker.New(p.client, tq, options), nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func getHostName() string {
+ hostName, err := os.Hostname()
+ if err != nil {
+ hostName = "Unknown"
+ }
+ return hostName
+}
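CreateWorker derives a unique worker identity of the form "pid@hostname@taskqueue@sequence". A quick sketch of what that yields (illustrative only):

package main

import (
	"fmt"
	"os"
	"sync/atomic"
)

func main() {
	var workerID int32

	host, err := os.Hostname()
	if err != nil {
		host = "Unknown" // mirrors the getHostName fallback above
	}

	identity := fmt.Sprintf("%d@%s@%s@%v",
		os.Getpid(), host, "default", atomic.AddInt32(&workerID, 1))
	fmt.Println(identity) // e.g. 4242@build-host@default@1
}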
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
new file mode 100644
index 00000000..406e70f4
--- /dev/null
+++ b/plugins/temporal/protocol/converter.go
@@ -0,0 +1,76 @@
+package protocol
+
+import (
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+type (
+ // DataConverter wraps Temporal data converter to enable direct access to the payloads.
+ DataConverter struct {
+ fallback converter.DataConverter
+ }
+)
+
+// NewDataConverter creates new data converter.
+func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
+ return &DataConverter{fallback: fallback}
+}
+
+// ToPayloads converts a list of values.
+func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
+ for _, v := range values {
+ if aggregated, ok := v.(*commonpb.Payloads); ok {
+ // bypassing
+ return aggregated, nil
+ }
+ }
+
+ return r.fallback.ToPayloads(values...)
+}
+
+// ToPayload converts single value to payload.
+func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
+ return r.fallback.ToPayload(value)
+}
+
+// FromPayloads converts to a list of values of different types.
+// Useful for deserializing arguments of function invocations.
+func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
+ if payloads == nil {
+ return nil
+ }
+
+ if len(valuePtrs) == 1 {
+ // input proxying
+ if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
+ *input = &commonpb.Payloads{}
+ (*input).Payloads = payloads.Payloads
+ return nil
+ }
+ }
+
+ for i := 0; i < len(payloads.Payloads); i++ {
+ err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// FromPayload converts single value from payload.
+func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
+ return r.fallback.FromPayload(payload, valuePtr)
+}
+
+// ToString converts payload object into human readable string.
+func (r *DataConverter) ToString(input *commonpb.Payload) string {
+ return r.fallback.ToString(input)
+}
+
+// ToStrings converts payloads object into human readable strings.
+func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
+ return r.fallback.ToStrings(input)
+}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
new file mode 100644
index 00000000..6ce9fa0f
--- /dev/null
+++ b/plugins/temporal/protocol/converter_test.go
@@ -0,0 +1,23 @@
+package protocol
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+func Test_Passthrough(t *testing.T) {
+ codec := NewDataConverter(converter.GetDefaultDataConverter())
+
+ value, err := codec.ToPayloads("test")
+ assert.NoError(t, err)
+
+ out := &common.Payloads{}
+
+ assert.Len(t, out.Payloads, 0)
+ assert.NoError(t, codec.FromPayloads(value, &out))
+
+ assert.Len(t, out.Payloads, 1)
+}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
new file mode 100644
index 00000000..c554e28f
--- /dev/null
+++ b/plugins/temporal/protocol/internal/protocol.pb.go
@@ -0,0 +1,167 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: protocol.proto
+
+package internal
+
+import (
+ fmt "fmt"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+ v11 "go.temporal.io/api/common/v1"
+ v1 "go.temporal.io/api/failure/v1"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type Frame struct {
+ Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Frame) Reset() { *m = Frame{} }
+func (m *Frame) String() string { return proto.CompactTextString(m) }
+func (*Frame) ProtoMessage() {}
+func (*Frame) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{0}
+}
+
+func (m *Frame) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Frame.Unmarshal(m, b)
+}
+func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
+}
+func (m *Frame) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Frame.Merge(m, src)
+}
+func (m *Frame) XXX_Size() int {
+ return xxx_messageInfo_Frame.Size(m)
+}
+func (m *Frame) XXX_DiscardUnknown() {
+ xxx_messageInfo_Frame.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Frame proto.InternalMessageInfo
+
+func (m *Frame) GetMessages() []*Message {
+ if m != nil {
+ return m.Messages
+ }
+ return nil
+}
+
+// Single communication message.
+type Message struct {
+ Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
+ // command name (if any)
+ Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
+ // command options in json format.
+ Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
+ // error response.
+ Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
+ // invocation or result payloads.
+ Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Message) Reset() { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage() {}
+func (*Message) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{1}
+}
+
+func (m *Message) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Message.Unmarshal(m, b)
+}
+func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Message.Marshal(b, m, deterministic)
+}
+func (m *Message) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Message.Merge(m, src)
+}
+func (m *Message) XXX_Size() int {
+ return xxx_messageInfo_Message.Size(m)
+}
+func (m *Message) XXX_DiscardUnknown() {
+ xxx_messageInfo_Message.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Message proto.InternalMessageInfo
+
+func (m *Message) GetId() uint64 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
+func (m *Message) GetCommand() string {
+ if m != nil {
+ return m.Command
+ }
+ return ""
+}
+
+func (m *Message) GetOptions() []byte {
+ if m != nil {
+ return m.Options
+ }
+ return nil
+}
+
+func (m *Message) GetFailure() *v1.Failure {
+ if m != nil {
+ return m.Failure
+ }
+ return nil
+}
+
+func (m *Message) GetPayloads() *v11.Payloads {
+ if m != nil {
+ return m.Payloads
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
+ proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
+}
+
+func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
+
+var fileDescriptor_2bc2336598a3f7e0 = []byte{
+ // 257 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
+ 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
+ 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
+ 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
+ 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
+ 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
+ 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
+ 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
+ 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
+ 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
+ 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
+ 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
+ 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
+ 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
+ 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
+ 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
+ 0x00,
+}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
new file mode 100644
index 00000000..e7a77068
--- /dev/null
+++ b/plugins/temporal/protocol/json_codec.go
@@ -0,0 +1,225 @@
+package protocol
+
+import (
+ "github.com/fatih/color"
+ j "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+// JSONCodec can be used for debugging and log capturing reasons.
+type JSONCodec struct {
+ // level enables verbose logging of all incoming and outgoing messages.
+ level DebugLevel
+
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+}
+
+// jsonFrame contains a single message frame in JSON form.
+type jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
+
+ // Options to be unmarshalled to body (raw payload).
+ Options j.RawMessage `json:"options,omitempty"`
+
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+}
+
+// NewJSONCodec creates a new JSON communication codec.
+func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: level,
+ logger: logger,
+ }
+}
+
+// WithLogger creates a new codec instance with an attached logger.
+func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: c.level,
+ logger: logger,
+ }
+}
+
+// GetName returns codec name.
+func (c *JSONCodec) GetName() string {
+ return "json"
+}
+
+// Execute exchanges commands with worker.
+func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ const op = errors.Op("json_codec_execute")
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var response = make([]jsonFrame, 0, 5)
+ var result = make([]Message, 0, 5)
+ var err error
+
+ frames := make([]jsonFrame, 0, len(msg))
+ for _, m := range msg {
+ frame, err := c.packFrame(m)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ frames = append(frames, frame)
+ }
+
+ p := payload.Payload{}
+
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = json.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.Body, err = json.Marshal(frames)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(p.Body) + " " + string(p.Context)
+ if c.level >= DebugHumanized {
+ logMessage = color.GreenString(logMessage)
+ }
+
+ c.logger.Debug(logMessage)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(out.Body)
+ if c.level >= DebugHumanized {
+ logMessage = color.HiYellowString(logMessage)
+ }
+
+ c.logger.Debug(logMessage, "receive", true)
+ }
+
+ err = json.Unmarshal(out.Body, &response)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ for _, f := range response {
+ msg, err := c.parseFrame(f)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
+ var (
+ err error
+ frame jsonFrame
+ )
+
+ frame.ID = msg.ID
+
+ if msg.Payloads != nil {
+ frame.Payloads, err = msg.Payloads.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Failure != nil {
+ frame.Failure, err = msg.Failure.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Command == nil {
+ return frame, nil
+ }
+
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ frame.Options, err = json.Marshal(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ return frame, nil
+}
+
+func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
+ var (
+ err error
+ msg Message
+ )
+
+ msg.ID = frame.ID
+
+ if frame.Payloads != nil {
+ msg.Payloads = &common.Payloads{}
+
+ err = msg.Payloads.Unmarshal(frame.Payloads)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Failure != nil {
+ msg.Failure = &failure.Failure{}
+
+ err = msg.Failure.Unmarshal(frame.Failure)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Command != "" {
+ cmd, err := initCommand(frame.Command)
+ if err != nil {
+ return Message{}, err
+ }
+
+ err = json.Unmarshal(frame.Options, &cmd)
+ if err != nil {
+ return Message{}, err
+ }
+
+ msg.Command = cmd
+ }
+
+ return msg, nil
+}
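For debugging it helps to know what packFrame puts on the wire. A minimal sketch of the frame shape using the standard encoder (json-iterator is configured to be compatible); note that the binary Failure and Payloads fields would appear base64-encoded:

package main

import (
	"encoding/json"
	"fmt"
)

// frame mirrors the jsonFrame struct above (illustrative copy).
type frame struct {
	ID       uint64          `json:"id"`
	Command  string          `json:"command,omitempty"`
	Options  json.RawMessage `json:"options,omitempty"`
	Payloads []byte          `json:"payloads,omitempty"`
}

func main() {
	f := frame{
		ID:      1,
		Command: "NewTimer",
		Options: json.RawMessage(`{"ms":1500}`),
	}

	out, _ := json.Marshal(f)
	fmt.Println(string(out)) // {"id":1,"command":"NewTimer","options":{"ms":1500}}
}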
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
new file mode 100644
index 00000000..d5e0f49d
--- /dev/null
+++ b/plugins/temporal/protocol/message.go
@@ -0,0 +1,334 @@
+package protocol
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+const (
+ getWorkerInfoCommand = "GetWorkerInfo"
+
+ invokeActivityCommand = "InvokeActivity"
+ startWorkflowCommand = "StartWorkflow"
+ invokeSignalCommand = "InvokeSignal"
+ invokeQueryCommand = "InvokeQuery"
+ destroyWorkflowCommand = "DestroyWorkflow"
+ cancelWorkflowCommand = "CancelWorkflow"
+ getStackTraceCommand = "StackTrace"
+
+ executeActivityCommand = "ExecuteActivity"
+ executeChildWorkflowCommand = "ExecuteChildWorkflow"
+ getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
+
+ newTimerCommand = "NewTimer"
+ sideEffectCommand = "SideEffect"
+ getVersionCommand = "GetVersion"
+ completeWorkflowCommand = "CompleteWorkflow"
+ continueAsNewCommand = "ContinueAsNew"
+
+ signalExternalWorkflowCommand = "SignalExternalWorkflow"
+ cancelExternalWorkflowCommand = "CancelExternalWorkflow"
+
+ cancelCommand = "Cancel"
+ panicCommand = "Panic"
+)
+
+// GetWorkerInfo reads worker information.
+type GetWorkerInfo struct{}
+
+// InvokeActivity invokes activity.
+type InvokeActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+
+ // Info contains execution context.
+ Info activity.Info `json:"info"`
+
+ // HeartbeatDetails indicates that the payload also contains last heartbeat details.
+ HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
+}
+
+// StartWorkflow sends worker command to start workflow.
+type StartWorkflow struct {
+ // Info to define workflow context.
+ Info *workflow.Info `json:"info"`
+
+ // LastCompletion contains offset of last completion results.
+ LastCompletion int `json:"lastCompletion,omitempty"`
+}
+
+// InvokeSignal invokes signal with a set of arguments.
+type InvokeSignal struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+
+ // Name of the signal.
+ Name string `json:"name"`
+}
+
+// InvokeQuery invokes query with a set of arguments.
+type InvokeQuery struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+ // Name of the query.
+ Name string `json:"name"`
+}
+
+// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
+type CancelWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// DestroyWorkflow asks worker to offload workflow from memory.
+type DestroyWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// GetStackTrace asks worker to provide the stack trace of a running workflow.
+type GetStackTrace struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// ExecuteActivity command by workflow worker.
+type ExecuteActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+ // Options to run activity.
+ Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
+}
+
+// ExecuteChildWorkflow executes child workflow.
+type ExecuteChildWorkflow struct {
+ // Name defines workflow name.
+ Name string `json:"name"`
+ // Options to run the child workflow.
+ Options bindings.WorkflowOptions `json:"options,omitempty"`
+}
+
+// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
+type GetChildWorkflowExecution struct {
+ // ID of child workflow command.
+ ID uint64 `json:"id"`
+}
+
+// NewTimer starts new timer.
+type NewTimer struct {
+ // Milliseconds defines timer duration.
+ Milliseconds int `json:"ms"`
+}
+
+// SideEffect to be recorded into the history.
+type SideEffect struct{}
+
+// GetVersion requests version marker.
+type GetVersion struct {
+ ChangeID string `json:"changeID"`
+ MinSupported int `json:"minSupported"`
+ MaxSupported int `json:"maxSupported"`
+}
+
+// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
+type CompleteWorkflow struct{}
+
+// ContinueAsNew restarts workflow with new running instance.
+type ContinueAsNew struct {
+ // Name defines the workflow name to continue with.
+ Name string `json:"name"`
+
+ // Options for the continued-as-new workflow.
+ Options struct {
+ TaskQueueName string
+ WorkflowExecutionTimeout time.Duration
+ WorkflowRunTimeout time.Duration
+ WorkflowTaskTimeout time.Duration
+ } `json:"options"`
+}
+
+// SignalExternalWorkflow sends signal to external workflow.
+type SignalExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+ Signal string `json:"signal"`
+ ChildWorkflowOnly bool `json:"childWorkflowOnly"`
+}
+
+// CancelExternalWorkflow cancels external workflow.
+type CancelExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+}
+
+// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
+type Cancel struct {
+ // CommandIDs to be cancelled.
+ CommandIDs []uint64 `json:"ids"`
+}
+
+// Panic triggers panic in workflow process.
+type Panic struct {
+ // Message to include into the error.
+ Message string `json:"message"`
+}
+
+// ActivityParams maps activity command to activity params.
+func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
+ params := bindings.ExecuteActivityParams{
+ ExecuteActivityOptions: cmd.Options,
+ ActivityType: bindings.ActivityType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// WorkflowParams maps workflow command to workflow params.
+func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
+ params := bindings.ExecuteWorkflowParams{
+ WorkflowOptions: cmd.Options,
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// ToDuration converts timer command to time.Duration.
+func (cmd NewTimer) ToDuration() time.Duration {
+ return time.Millisecond * time.Duration(cmd.Milliseconds)
+}
+
+// returns command name (only for the commands sent to the worker)
+func commandName(cmd interface{}) (string, error) {
+ const op = errors.Op("command_name")
+ switch cmd.(type) {
+ case GetWorkerInfo, *GetWorkerInfo:
+ return getWorkerInfoCommand, nil
+ case StartWorkflow, *StartWorkflow:
+ return startWorkflowCommand, nil
+ case InvokeSignal, *InvokeSignal:
+ return invokeSignalCommand, nil
+ case InvokeQuery, *InvokeQuery:
+ return invokeQueryCommand, nil
+ case DestroyWorkflow, *DestroyWorkflow:
+ return destroyWorkflowCommand, nil
+ case CancelWorkflow, *CancelWorkflow:
+ return cancelWorkflowCommand, nil
+ case GetStackTrace, *GetStackTrace:
+ return getStackTraceCommand, nil
+ case InvokeActivity, *InvokeActivity:
+ return invokeActivityCommand, nil
+ case ExecuteActivity, *ExecuteActivity:
+ return executeActivityCommand, nil
+ case ExecuteChildWorkflow, *ExecuteChildWorkflow:
+ return executeChildWorkflowCommand, nil
+ case GetChildWorkflowExecution, *GetChildWorkflowExecution:
+ return getChildWorkflowExecutionCommand, nil
+ case NewTimer, *NewTimer:
+ return newTimerCommand, nil
+ case GetVersion, *GetVersion:
+ return getVersionCommand, nil
+ case SideEffect, *SideEffect:
+ return sideEffectCommand, nil
+ case CompleteWorkflow, *CompleteWorkflow:
+ return completeWorkflowCommand, nil
+ case ContinueAsNew, *ContinueAsNew:
+ return continueAsNewCommand, nil
+ case SignalExternalWorkflow, *SignalExternalWorkflow:
+ return signalExternalWorkflowCommand, nil
+ case CancelExternalWorkflow, *CancelExternalWorkflow:
+ return cancelExternalWorkflowCommand, nil
+ case Cancel, *Cancel:
+ return cancelCommand, nil
+ case Panic, *Panic:
+ return panicCommand, nil
+ default:
+ return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
+ }
+}
+
+// initCommand allocates an empty command struct for the given command name.
+func initCommand(name string) (interface{}, error) {
+ const op = errors.Op("init_command")
+ switch name {
+ case getWorkerInfoCommand:
+ return &GetWorkerInfo{}, nil
+
+ case startWorkflowCommand:
+ return &StartWorkflow{}, nil
+
+ case invokeSignalCommand:
+ return &InvokeSignal{}, nil
+
+ case invokeQueryCommand:
+ return &InvokeQuery{}, nil
+
+ case destroyWorkflowCommand:
+ return &DestroyWorkflow{}, nil
+
+ case cancelWorkflowCommand:
+ return &CancelWorkflow{}, nil
+
+ case getStackTraceCommand:
+ return &GetStackTrace{}, nil
+
+ case invokeActivityCommand:
+ return &InvokeActivity{}, nil
+
+ case executeActivityCommand:
+ return &ExecuteActivity{}, nil
+
+ case executeChildWorkflowCommand:
+ return &ExecuteChildWorkflow{}, nil
+
+ case getChildWorkflowExecutionCommand:
+ return &GetChildWorkflowExecution{}, nil
+
+ case newTimerCommand:
+ return &NewTimer{}, nil
+
+ case getVersionCommand:
+ return &GetVersion{}, nil
+
+ case sideEffectCommand:
+ return &SideEffect{}, nil
+
+ case completeWorkflowCommand:
+ return &CompleteWorkflow{}, nil
+
+ case continueAsNewCommand:
+ return &ContinueAsNew{}, nil
+
+ case signalExternalWorkflowCommand:
+ return &SignalExternalWorkflow{}, nil
+
+ case cancelExternalWorkflowCommand:
+ return &CancelExternalWorkflow{}, nil
+
+ case cancelCommand:
+ return &Cancel{}, nil
+
+ case panicCommand:
+ return &Panic{}, nil
+
+ default:
+ return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
+ }
+}
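commandName and initCommand form a simple name-to-type mapping: only the name travels on the wire, a factory rebuilds the typed struct, and JSON fills in its options. A stripped-down sketch of the same dispatch pattern (NewTimer copied from above; the factory map is illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// NewTimer is copied from the command definitions above.
type NewTimer struct {
	Milliseconds int `json:"ms"`
}

// factories plays the role of initCommand for a single command.
var factories = map[string]func() interface{}{
	"NewTimer": func() interface{} { return &NewTimer{} },
}

func main() {
	// The command name selects the type, options fill its fields.
	cmd := factories["NewTimer"]()
	if err := json.Unmarshal([]byte(`{"ms":1500}`), cmd); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cmd.(*NewTimer)) // &{Milliseconds:1500}
}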
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
new file mode 100644
index 00000000..607fe0fe
--- /dev/null
+++ b/plugins/temporal/protocol/proto_codec.go
@@ -0,0 +1,145 @@
+package protocol
+
+import (
+ v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
+ jsoniter "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
+ "google.golang.org/protobuf/proto"
+)
+
+type (
+ // ProtoCodec uses protobuf to exchange messages with underlying workers.
+ ProtoCodec struct {
+ }
+)
+
+// NewProtoCodec creates new Proto communication codec.
+func NewProtoCodec() Codec {
+ return &ProtoCodec{}
+}
+
+// WithLogger creates a new codec instance with an attached logger.
+func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
+ return &ProtoCodec{}
+}
+
+// GetName returns codec name.
+func (c *ProtoCodec) GetName() string {
+ return "protobuf"
+}
+
+// Execute exchanges commands with worker.
+func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var request = &internal.Frame{}
+ var response = &internal.Frame{}
+ var result = make([]Message, 0, 5)
+ var err error
+
+ for _, m := range msg {
+ frame, err := c.packMessage(m)
+ if err != nil {
+ return nil, err
+ }
+
+ request.Messages = append(request.Messages, frame)
+ }
+
+ p := payload.Payload{}
+
+ // context is always in json format
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = jsoniter.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(errors.Op("encodeContext"), err)
+ }
+
+ p.Body, err = proto.Marshal(v1.MessageV2(request))
+ if err != nil {
+ return nil, errors.E(errors.Op("encodePayload"), err)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(errors.Op("execute"), err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ err = proto.Unmarshal(out.Body, v1.MessageV2(response))
+ if err != nil {
+ return nil, errors.E(errors.Op("parseResponse"), err)
+ }
+
+ for _, f := range response.Messages {
+ msg, err := c.parseMessage(f)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
+ var err error
+
+ frame := &internal.Message{
+ Id: msg.ID,
+ Payloads: msg.Payloads,
+ Failure: msg.Failure,
+ }
+
+ if msg.Command != nil {
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+
+ frame.Options, err = jsoniter.Marshal(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return frame, nil
+}
+
+func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
+ const op = errors.Op("proto_codec_parse_message")
+ var err error
+
+ msg := Message{
+ ID: frame.Id,
+ Payloads: frame.Payloads,
+ Failure: frame.Failure,
+ }
+
+ if frame.Command != "" {
+ msg.Command, err = initCommand(frame.Command)
+ if err != nil {
+ return Message{}, errors.E(op, err)
+ }
+
+ err = jsoniter.Unmarshal(frame.Options, &msg.Command)
+ if err != nil {
+ return Message{}, errors.E(op, err)
+ }
+ }
+
+ return msg, nil
+}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
new file mode 100644
index 00000000..53076fdf
--- /dev/null
+++ b/plugins/temporal/protocol/protocol.go
@@ -0,0 +1,77 @@
+package protocol
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+const (
+ // DebugNone disables all debug messages.
+ DebugNone = iota
+
+ // DebugNormal renders all messages into console.
+ DebugNormal
+
+ // DebugHumanized enables color highlights for messages.
+ DebugHumanized
+)
+
+// Context provides the worker with information about the current execution context. Context can be empty for server-level commands.
+type Context struct {
+ // TaskQueue associates message batch with the specific task queue in underlying worker.
+ TaskQueue string `json:"taskQueue,omitempty"`
+
+ // TickTime associates current or historical time with the message batch.
+ TickTime string `json:"tickTime,omitempty"`
+
+ // Replay indicates that current message batch is historical.
+ Replay bool `json:"replay,omitempty"`
+}
+
+// Message is used to send commands to and receive responses from underlying workers.
+type Message struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command of the message in unmarshalled form. Pointer.
+ Command interface{} `json:"command,omitempty"`
+
+ // Failure associated with command id.
+ Failure *failure.Failure `json:"failure,omitempty"`
+
+ // Payloads contains message specific payloads in binary format.
+ Payloads *commonpb.Payloads `json:"payloads,omitempty"`
+}
+
+// Codec manages payload encoding and decoding while communicating with the underlying worker.
+type Codec interface {
+ // WithLogger creates a new codec instance with an attached logger.
+ WithLogger(logger.Logger) Codec
+
+ // GetName returns codec name.
+ GetName() string
+
+ // Execute sends message to worker and waits for the response.
+ Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
+}
+
+// Endpoint provides the ability to send and receive messages.
+type Endpoint interface {
+ // Exec sends a payload to the worker and waits for the response.
+ Exec(p payload.Payload) (payload.Payload, error)
+}
+
+// DebugLevel configures debug level.
+type DebugLevel int
+
+// IsEmpty only checks whether the task queue is set.
+func (ctx Context) IsEmpty() bool {
+ return ctx.TaskQueue == ""
+}
+
+// IsCommand returns true if message carries request.
+func (msg Message) IsCommand() bool {
+ return msg.Command != nil
+}
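Any type with a matching Exec method satisfies Endpoint, which keeps the codecs testable without real workers. A minimal echo endpoint as a sketch (assuming the payload.Payload fields used elsewhere in this diff):

package main

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/pkg/payload"
)

// echoEndpoint returns the request body unchanged.
type echoEndpoint struct{}

func (echoEndpoint) Exec(p payload.Payload) (payload.Payload, error) {
	return payload.Payload{Context: p.Context, Body: p.Body}, nil
}

func main() {
	out, err := echoEndpoint{}.Exec(payload.Payload{Body: []byte(`[]`)})
	fmt.Println(string(out.Body), err) // [] <nil>
}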
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
new file mode 100644
index 00000000..58a0ae66
--- /dev/null
+++ b/plugins/temporal/protocol/worker_info.go
@@ -0,0 +1,72 @@
+package protocol
+
+import (
+ "github.com/spiral/errors"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/worker"
+)
+
+// WorkerInfo outlines information about every available worker: its task queue,
+// workflows and activities.
+type WorkerInfo struct {
+ // TaskQueue assigned to the worker.
+ TaskQueue string `json:"taskQueue"`
+
+ // Options describe worker options.
+ Options worker.Options `json:"options,omitempty"`
+
+ // Workflows provided by the worker.
+ Workflows []WorkflowInfo
+
+ // Activities provided by the worker.
+ Activities []ActivityInfo
+}
+
+// WorkflowInfo describes single worker workflow.
+type WorkflowInfo struct {
+ // Name of the workflow.
+ Name string `json:"name"`
+
+ // Queries pre-defined for the workflow type.
+ Queries []string `json:"queries"`
+
+ // Signals pre-defined for the workflow type.
+ Signals []string `json:"signals"`
+}
+
+// ActivityInfo describes single worker activity.
+type ActivityInfo struct {
+ // Name describes public activity name.
+ Name string `json:"name"`
+}
+
+// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
+func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
+ const op = errors.Op("fetch_worker_info")
+
+ result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return nil, errors.E(op, errors.Str("unable to read worker info"))
+ }
+
+ if result[0].ID != 0 {
+ return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
+ }
+
+ var info []WorkerInfo
+ for i := range result[0].Payloads.Payloads {
+ wi := WorkerInfo{}
+ if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ info = append(info, wi)
+ }
+
+ return info, nil
+}
diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go
new file mode 100644
index 00000000..962c527f
--- /dev/null
+++ b/plugins/temporal/workflow/canceller.go
@@ -0,0 +1,41 @@
+package workflow
+
+import (
+ "sync"
+)
+
+type cancellable func() error
+
+type canceller struct {
+ ids sync.Map
+}
+
+func (c *canceller) register(id uint64, cancel cancellable) {
+ c.ids.Store(id, cancel)
+}
+
+func (c *canceller) discard(id uint64) {
+ c.ids.Delete(id)
+}
+
+func (c *canceller) cancel(ids ...uint64) error {
+ var err error
+ for _, id := range ids {
+ cancel, ok := c.ids.Load(id)
+ if !ok {
+ continue
+ }
+
+ // TODO: use LoadAndDelete once the minimum supported version is go1.15
+ // (go1.14 does not provide this method).
+ c.ids.Delete(id)
+
+ err = cancel.(cancellable)()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go
new file mode 100644
index 00000000..d6e846f8
--- /dev/null
+++ b/plugins/temporal/workflow/canceller_test.go
@@ -0,0 +1,33 @@
+package workflow
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_CancellerNoListeners(t *testing.T) {
+ c := &canceller{}
+
+ assert.NoError(t, c.cancel(1))
+}
+
+func Test_CancellerListenerError(t *testing.T) {
+ c := &canceller{}
+ c.register(1, func() error {
+ return errors.New("failed")
+ })
+
+ assert.Error(t, c.cancel(1))
+}
+
+func Test_CancellerListenerDiscarded(t *testing.T) {
+ c := &canceller{}
+ c.register(1, func() error {
+ return errors.New("failed")
+ })
+
+ c.discard(1)
+ assert.NoError(t, c.cancel(1))
+}
diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go
new file mode 100644
index 00000000..ac75cbda
--- /dev/null
+++ b/plugins/temporal/workflow/id_registry.go
@@ -0,0 +1,51 @@
+package workflow
+
+import (
+ "sync"
+
+ bindings "go.temporal.io/sdk/internalbindings"
+)
+
+// used to gain access to child workflow ids after they become available via callback result.
+type idRegistry struct {
+ mu sync.Mutex
+ ids map[uint64]entry
+ listeners map[uint64]listener
+}
+
+type listener func(w bindings.WorkflowExecution, err error)
+
+type entry struct {
+ w bindings.WorkflowExecution
+ err error
+}
+
+func newIDRegistry() *idRegistry {
+ return &idRegistry{
+ ids: map[uint64]entry{},
+ listeners: map[uint64]listener{},
+ }
+}
+
+func (c *idRegistry) listen(id uint64, cl listener) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.listeners[id] = cl
+
+ if e, ok := c.ids[id]; ok {
+ cl(e.w, e.err)
+ }
+}
+
+func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ e := entry{w: w, err: err}
+ c.ids[id] = e
+
+ if l, ok := c.listeners[id]; ok {
+ l(e.w, e.err)
+ }
+}
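+
+// Illustrative ordering (a sketch; exec is an assumed bindings.WorkflowExecution
+// value): the listener fires exactly once, whether it is registered before or
+// after the execution becomes available:
+//
+//	r := newIDRegistry()
+//	r.push(1, exec, nil) // result arrives first
+//	r.listen(1, func(w bindings.WorkflowExecution, err error) {
+//		// invoked immediately with the stored entry
+//	})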
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
new file mode 100644
index 00000000..8f4409d1
--- /dev/null
+++ b/plugins/temporal/workflow/message_queue.go
@@ -0,0 +1,47 @@
+package workflow
+
+import (
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+type messageQueue struct {
+ seqID func() uint64
+ queue []rrt.Message
+}
+
+func newMessageQueue(seqID func() uint64) *messageQueue {
+ return &messageQueue{
+ seqID: seqID,
+ queue: make([]rrt.Message, 0, 5),
+ }
+}
+
+func (mq *messageQueue) flush() {
+ mq.queue = mq.queue[0:0]
+}
+
+func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
+ msg := rrt.Message{
+ ID: mq.seqID(),
+ Command: cmd,
+ Payloads: payloads,
+ }
+
+ return msg.ID, msg
+}
+
+func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
+ id, msg := mq.allocateMessage(cmd, payloads)
+ mq.queue = append(mq.queue, msg)
+ return id
+}
+
+func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads})
+}
+
+func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure})
+}
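+
+// Illustrative usage (a sketch; pool, payloads and result are assumed values):
+// commands receive fresh sequence ids, while responses and errors reuse the id
+// of the message they answer:
+//
+//	mq := newMessageQueue(pool.SeqID)
+//	id := mq.pushCommand(rrt.StartWorkflow{}, payloads) // new id from seqID
+//	mq.pushResponse(id, result)                         // same id, seqID not consulted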
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
new file mode 100644
index 00000000..1fcd409f
--- /dev/null
+++ b/plugins/temporal/workflow/message_queue_test.go
@@ -0,0 +1,53 @@
+package workflow
+
+import (
+ "sync/atomic"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+func Test_MessageQueueFlushError(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ mq.pushError(1, &failure.Failure{})
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+ assert.Equal(t, uint64(0), index)
+}
+
+func Test_MessageQueueFlushResponse(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ mq.pushResponse(1, &common.Payloads{})
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+ assert.Equal(t, uint64(0), index)
+}
+
+func Test_MessageQueueCommandID(t *testing.T) {
+ var index uint64
+ mq := newMessageQueue(func() uint64 {
+ return atomic.AddUint64(&index, 1)
+ })
+
+ n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
+ assert.Equal(t, n, index)
+ assert.Len(t, mq.queue, 1)
+
+ mq.flush()
+ assert.Len(t, mq.queue, 0)
+}
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
new file mode 100644
index 00000000..572d9a3b
--- /dev/null
+++ b/plugins/temporal/workflow/plugin.go
@@ -0,0 +1,203 @@
+package workflow
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+)
+
+const (
+ // PluginName defines public service name.
+ PluginName = "workflows"
+
+ // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
+ RRMode = "temporal/workflow"
+)
+
+// Plugin manages workflows and workers.
+type Plugin struct {
+ temporal client.Temporal
+ events events.Handler
+ server server.Server
+ log logger.Logger
+ mu sync.Mutex
+ reset chan struct{}
+ pool workflowPool
+ closing int64
+}
+
+// Init workflow plugin.
+func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ p.temporal = temporal
+ p.server = server
+ p.events = events.NewEventsHandler()
+ p.log = log
+ p.reset = make(chan struct{}, 1)
+
+ return nil
+}
+
+// Serve starts workflow service.
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("workflow_plugin_serve")
+ errCh := make(chan error, 1)
+
+ pool, err := p.startPool()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ p.pool = pool
+
+ go func() {
+ for {
+ select {
+ case <-p.reset:
+ if atomic.LoadInt64(&p.closing) == 1 {
+ return
+ }
+
+ err := p.replacePool()
+ if err == nil {
+ continue
+ }
+
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.InitialInterval = time.Second
+
+ err = backoff.Retry(p.replacePool, bkoff)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }
+ }
+ }()
+
+ return errCh
+}
+
+// Stop workflow service.
+func (p *Plugin) Stop() error {
+ const op = errors.Op("workflow_plugin_stop")
+ atomic.StoreInt64(&p.closing, 1)
+
+ p.mu.Lock()
+ pool := p.pool
+ p.pool = nil
+ p.mu.Unlock()
+
+ if pool != nil {
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// Name of the service.
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Workers returns list of available workflow workers.
+func (p *Plugin) Workers() []worker.BaseProcess {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.pool.Workers()
+}
+
+// WorkflowNames returns list of all available workflows.
+func (p *Plugin) WorkflowNames() []string {
+ return p.pool.WorkflowNames()
+}
+
+// Reset resets underlying workflow pool with new copy.
+func (p *Plugin) Reset() error {
+ p.reset <- struct{}{}
+
+ return nil
+}
+
+// poolListener resets the pool when a worker exits and forwards all events to the events handler.
+func (p *Plugin) poolListener(event interface{}) {
+ if ev, ok := event.(PoolEvent); ok {
+ if ev.Event == eventWorkerExit {
+ if ev.Caused != nil {
+ p.log.Error("Workflow pool error", "error", ev.Caused)
+ }
+ p.reset <- struct{}{}
+ }
+ }
+
+ p.events.Push(event)
+}
+
+// startPool creates a new workflow pool and starts its workers.
+func (p *Plugin) startPool() (workflowPool, error) {
+ const op = errors.Op("workflow_plugin_start_pool")
+ pool, err := newWorkflowPool(
+ p.temporal.GetCodec().WithLogger(p.log),
+ p.poolListener,
+ p.server,
+ )
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = pool.Start(context.Background(), p.temporal)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
+
+ return pool, nil
+}
+
+func (p *Plugin) replacePool() error {
+ p.mu.Lock()
+ const op = errors.Op("workflow_plugin_replace_pool")
+ defer p.mu.Unlock()
+
+ if p.pool != nil {
+ err := p.pool.Destroy(context.Background())
+ p.pool = nil
+ if err != nil {
+ p.log.Error(
+ "Unable to destroy expired workflow pool",
+ "error",
+ errors.E(op, err),
+ )
+ return errors.E(op, err)
+ }
+ }
+
+ pool, err := p.startPool()
+ if err != nil {
+ p.log.Error("Replace workflow pool failed", "error", err)
+ return errors.E(op, err)
+ }
+
+ p.pool = pool
+ p.log.Debug("workflow pool successfully replaced")
+
+ return nil
+}
+
+// getPool returns the currently active pool.
+func (p *Plugin) getPool() workflowPool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.pool
+}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
new file mode 100644
index 00000000..45e6885c
--- /dev/null
+++ b/plugins/temporal/workflow/process.go
@@ -0,0 +1,436 @@
+package workflow
+
+import (
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ commonpb "go.temporal.io/api/common/v1"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+// workflowProcess wraps a single workflow execution.
+type workflowProcess struct {
+ codec rrt.Codec
+ pool workflowPool
+ env bindings.WorkflowEnvironment
+ header *commonpb.Header
+ mq *messageQueue
+ ids *idRegistry
+ seqID uint64
+ runID string
+ pipeline []rrt.Message
+ callbacks []func() error
+ canceller *canceller
+ inLoop bool
+}
+
+// Execute bootstraps the workflow process and queues the start command.
+func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
+ wf.env = env
+ wf.header = header
+ wf.seqID = 0
+ wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
+ wf.canceller = &canceller{}
+
+ // the sequence id is shared across all workflow processes of the worker
+ wf.mq = newMessageQueue(wf.pool.SeqID)
+ wf.ids = newIDRegistry()
+
+ env.RegisterCancelHandler(wf.handleCancel)
+ env.RegisterSignalHandler(wf.handleSignal)
+ env.RegisterQueryHandler(wf.handleQuery)
+
+ var (
+ lastCompletion = bindings.GetLastCompletionResult(env)
+ lastCompletionOffset = 0
+ )
+
+ if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
+ if input == nil {
+ input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
+ }
+
+ input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
+ lastCompletionOffset = len(lastCompletion.Payloads)
+ }
+
+ _ = wf.mq.pushCommand(
+ rrt.StartWorkflow{
+ Info: env.WorkflowInfo(),
+ LastCompletion: lastCompletionOffset,
+ },
+ input,
+ )
+}
+
+// OnWorkflowTaskStarted handles a single workflow tick and the batch of pipeline messages from the Temporal server.
+func (wf *workflowProcess) OnWorkflowTaskStarted() {
+ wf.inLoop = true
+ defer func() { wf.inLoop = false }()
+
+ var err error
+ for _, callback := range wf.callbacks {
+ err = callback()
+ if err != nil {
+ panic(err)
+ }
+ }
+ wf.callbacks = nil
+
+ if err := wf.flushQueue(); err != nil {
+ panic(err)
+ }
+
+ for len(wf.pipeline) > 0 {
+ msg := wf.pipeline[0]
+ wf.pipeline = wf.pipeline[1:]
+
+ if msg.IsCommand() {
+ err = wf.handleMessage(msg)
+ }
+
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+// StackTrace renders workflow stack trace.
+func (wf *workflowProcess) StackTrace() string {
+ result, err := wf.runCommand(
+ rrt.GetStackTrace{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ },
+ nil,
+ )
+
+ if err != nil {
+ return err.Error()
+ }
+
+ var stacktrace string
+ err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
+ if err != nil {
+ return err.Error()
+ }
+
+ return stacktrace
+}
+
+// Close the workflow.
+func (wf *workflowProcess) Close() {
+ // TODO: properly handle errors (e.g. panic) instead of discarding them
+
+ _ = wf.mq.pushCommand(
+ rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+
+ _, _ = wf.discardQueue()
+}
+
+// getContext returns the protocol context for the current tick.
+func (wf *workflowProcess) getContext() rrt.Context {
+ return rrt.Context{
+ TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
+ TickTime: wf.env.Now().Format(time.RFC3339),
+ Replay: wf.env.IsReplaying(),
+ }
+}
+
+// handleCancel schedules a cancel command.
+func (wf *workflowProcess) handleCancel() {
+ _ = wf.mq.pushCommand(
+ rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+}
+
+// handleSignal schedules signal processing.
+func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
+ _ = wf.mq.pushCommand(
+ rrt.InvokeSignal{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ Name: name,
+ },
+ input,
+ )
+}
+
+// handleQuery runs a query in blocking mode.
+func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
+ result, err := wf.runCommand(
+ rrt.InvokeQuery{
+ RunID: wf.runID,
+ Name: queryType,
+ },
+ queryArgs,
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ if result.Failure != nil {
+ return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
+ }
+
+ return result.Payloads, nil
+}
+
+// process incoming command
+func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
+ const op = errors.Op("handleMessage")
+ var err error
+
+ var (
+ id = msg.ID
+ cmd = msg.Command
+ payloads = msg.Payloads
+ )
+
+ switch cmd := cmd.(type) {
+ case *rrt.ExecuteActivity:
+ params := cmd.ActivityParams(wf.env, payloads)
+ activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelActivity(activityID)
+ return nil
+ })
+
+ case *rrt.ExecuteChildWorkflow:
+ params := cmd.WorkflowParams(wf.env, payloads)
+
+ // always use deterministic id
+ if params.WorkflowID == "" {
+ nextID := atomic.AddUint64(&wf.seqID, 1)
+ params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
+ }
+
+ wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
+ wf.ids.push(id, r, e)
+ })
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
+ return nil
+ })
+
+ case *rrt.GetChildWorkflowExecution:
+ wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
+ cl := wf.createCallback(id)
+
+ // TODO rewrite
+ if err != nil {
+ panic(err)
+ }
+
+ p, err := wf.env.GetDataConverter().ToPayloads(w)
+ if err != nil {
+ panic(err)
+ }
+
+ cl(p, err)
+ })
+
+ case *rrt.NewTimer:
+ timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
+ wf.canceller.register(id, func() error {
+ if timerID != nil {
+ wf.env.RequestCancelTimer(*timerID)
+ }
+ return nil
+ })
+
+ case *rrt.GetVersion:
+ version := wf.env.GetVersion(
+ cmd.ChangeID,
+ workflow.Version(cmd.MinSupported),
+ workflow.Version(cmd.MaxSupported),
+ )
+
+ result, err := wf.env.GetDataConverter().ToPayloads(version)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.SideEffect:
+ wf.env.SideEffect(
+ func() (*commonpb.Payloads, error) {
+ return payloads, nil
+ },
+ wf.createContinuableCallback(id),
+ )
+
+ case *rrt.CompleteWorkflow:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ if msg.Failure == nil {
+ wf.env.Complete(payloads, nil)
+ } else {
+ wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
+ }
+
+ case *rrt.ContinueAsNew:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ wf.env.Complete(nil, &workflow.ContinueAsNewError{
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ Header: wf.header,
+ TaskQueueName: cmd.Options.TaskQueueName,
+ WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
+ WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
+ WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
+ })
+
+ case *rrt.SignalExternalWorkflow:
+ wf.env.SignalExternalWorkflow(
+ cmd.Namespace,
+ cmd.WorkflowID,
+ cmd.RunID,
+ cmd.Signal,
+ payloads,
+ nil,
+ cmd.ChildWorkflowOnly,
+ wf.createCallback(id),
+ )
+
+ case *rrt.CancelExternalWorkflow:
+ wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
+
+ case *rrt.Cancel:
+ err = wf.canceller.cancel(cmd.CommandIDs...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.Panic:
+ panic(errors.E(cmd.Message))
+
+ default:
+ panic("undefined command")
+ }
+
+ return nil
+}
+
+func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) error {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return nil
+ }
+
+ // fetch original payload
+ wf.mq.pushResponse(id, result)
+ return nil
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ // timer cancel callback can happen inside the loop
+ if wf.inLoop {
+ err := callback(result, err)
+ if err != nil {
+ panic(err)
+ }
+
+ return
+ }
+
+ wf.callbacks = append(wf.callbacks, func() error {
+ return callback(result, err)
+ })
+ }
+}
+
+// createContinuableCallback returns a callback that can run inside queue processing and appends new messages to the end of the queue.
+func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ callback(result, err)
+ }
+}
+
+// Exchange messages between host and worker processes and add new commands to the queue.
+func (wf *workflowProcess) flushQueue() error {
+ const op = errors.Op("flush queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.pipeline = append(wf.pipeline, messages...)
+
+ return nil
+}
+
+// Exchange messages between host and worker processes without adding new commands to the queue.
+func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
+ const op = errors.Op("discard queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return messages, nil
+}
+
+// runCommand executes a single command and waits for a single result.
+func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
+
+ result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
+ if err != nil {
+ return rrt.Message{}, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
+ }
+
+ return result[0], nil
+}
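+
+// Illustrative round-trip (a sketch; "status" and args are assumed values):
+// a blocking query travels through runCommand, which sends exactly one message
+// and expects exactly one reply:
+//
+//	msg, err := wf.runCommand(rrt.InvokeQuery{RunID: wf.runID, Name: "status"}, args)
+//	// msg.Payloads holds the single reply on success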
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
new file mode 100644
index 00000000..b9ed46c8
--- /dev/null
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -0,0 +1,190 @@
+package workflow
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/client"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/worker"
+ "go.temporal.io/sdk/workflow"
+)
+
+const eventWorkerExit = 8390
+
+// RR_MODE env variable key
+const RR_MODE = "RR_MODE" //nolint
+
+// RR_CODEC env variable key
+const RR_CODEC = "RR_CODEC" //nolint
+
+type workflowPool interface {
+ SeqID() uint64
+ Exec(p payload.Payload) (payload.Payload, error)
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.BaseProcess
+ WorkflowNames() []string
+}
+
+// PoolEvent triggered on workflow pool worker events.
+type PoolEvent struct {
+ Event int
+ Context interface{}
+ Caused error
+}
+
+// workflowPoolImpl manages workflowProcess executions between worker restarts.
+type workflowPoolImpl struct {
+ codec rrt.Codec
+ seqID uint64
+ workflows map[string]rrt.WorkflowInfo
+ tWorkers []worker.Worker
+ mu sync.Mutex
+ worker rrWorker.SyncWorker
+ active bool
+}
+
+// newWorkflowPool creates a new workflow pool.
+func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
+ const op = errors.Op("new_workflow_pool")
+ w, err := factory.NewWorker(
+ context.Background(),
+ map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()},
+ listener,
+ )
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ go func() {
+ err := w.Wait()
+ listener(PoolEvent{Event: eventWorkerExit, Caused: err})
+ }()
+
+ return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil
+}
+
+// Start launches the pool in non-blocking mode.
+func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_start")
+ pool.mu.Lock()
+ pool.active = true
+ pool.mu.Unlock()
+
+ err := pool.initWorkers(ctx, temporal)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(pool.tWorkers); i++ {
+ err := pool.tWorkers[i].Start()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// Active reports whether the pool is running.
+func (pool *workflowPoolImpl) Active() bool {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ return pool.active
+}
+
+// Destroy stops all Temporal workers and the application worker.
+func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ const op = errors.Op("workflow_pool_destroy")
+
+ pool.active = false
+ for i := 0; i < len(pool.tWorkers); i++ {
+ pool.tWorkers[i].Stop()
+ }
+
+ worker.PurgeStickyWorkflowCache()
+
+ err := pool.worker.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+// NewWorkflowDefinition initiates a new workflow process.
+func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition {
+ return &workflowProcess{
+ codec: pool.codec,
+ pool: pool,
+ }
+}
+
+// SeqID returns the next sequence id, shared across all workflow processes in the pool.
+func (pool *workflowPoolImpl) SeqID() uint64 {
+ return atomic.AddUint64(&pool.seqID, 1)
+}
+
+// Exec executes a set of commands in a thread-safe manner.
+func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ if !pool.active {
+ return payload.Payload{}, nil
+ }
+
+ return pool.worker.Exec(p)
+}
+
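+// Workers returns the application worker wrapped as a base process.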
+func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess {
+ return []rrWorker.BaseProcess{pool.worker}
+}
+
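+// WorkflowNames returns the names of all workflows registered in the pool.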
+func (pool *workflowPoolImpl) WorkflowNames() []string {
+ names := make([]string, 0, len(pool.workflows))
+ for name := range pool.workflows {
+ names = append(names, name)
+ }
+
+ return names
+}
+
+// initWorkers requests available workflows from the underlying PHP worker and configures Temporal workers linked to the pool.
+func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_init_workers")
+ workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ pool.workflows = make(map[string]rrt.WorkflowInfo)
+ pool.tWorkers = make([]worker.Worker, 0)
+
+ for _, info := range workerInfo {
+ w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
+ if err != nil {
+ return errors.E(op, err, pool.Destroy(ctx))
+ }
+
+ pool.tWorkers = append(pool.tWorkers, w)
+ for _, workflowInfo := range info.Workflows {
+ w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{
+ Name: workflowInfo.Name,
+ DisableAlreadyRegisteredCheck: false,
+ })
+
+ pool.workflows[workflowInfo.Name] = workflowInfo
+ }
+ }
+
+ return nil
+}