summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-27 13:56:28 +0300
committerValery Piashchynski <[email protected]>2021-01-27 13:56:28 +0300
commit744c2b0c86b88f77e681f8660bf3a476e83711b8 (patch)
treef7af7d7d494d1f5ca272af1ad0b978fe44d685a9 /plugins
parente2266b80db47444ba5858c736833a8a81b1361ad (diff)
Move temporal plugin to the temporal repository
Diffstat (limited to 'plugins')
-rw-r--r--plugins/temporal/activity/activity_pool.go197
-rw-r--r--plugins/temporal/activity/plugin.go215
-rw-r--r--plugins/temporal/activity/rpc.go66
-rw-r--r--plugins/temporal/client/doc/doc.go1
-rw-r--r--plugins/temporal/client/doc/temporal.drawio1
-rw-r--r--plugins/temporal/client/plugin.go169
-rw-r--r--plugins/temporal/protocol/converter.go76
-rw-r--r--plugins/temporal/protocol/converter_test.go23
-rw-r--r--plugins/temporal/protocol/internal/protocol.pb.go167
-rw-r--r--plugins/temporal/protocol/json_codec.go225
-rw-r--r--plugins/temporal/protocol/message.go334
-rw-r--r--plugins/temporal/protocol/proto_codec.go145
-rw-r--r--plugins/temporal/protocol/protocol.go77
-rw-r--r--plugins/temporal/protocol/worker_info.go72
-rw-r--r--plugins/temporal/workflow/canceller.go41
-rw-r--r--plugins/temporal/workflow/canceller_test.go33
-rw-r--r--plugins/temporal/workflow/id_registry.go51
-rw-r--r--plugins/temporal/workflow/message_queue.go47
-rw-r--r--plugins/temporal/workflow/message_queue_test.go53
-rw-r--r--plugins/temporal/workflow/plugin.go203
-rw-r--r--plugins/temporal/workflow/process.go436
-rw-r--r--plugins/temporal/workflow/workflow_pool.go190
22 files changed, 0 insertions, 2822 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
deleted file mode 100644
index d09722ce..00000000
--- a/plugins/temporal/activity/activity_pool.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package activity
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/worker"
-)
-
-// RR_MODE env variable
-const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
-// RR_CODEC env variable
-const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
-
-//
-const doNotCompleteOnReturn = "doNotCompleteOnReturn"
-
-type activityPool interface {
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.SyncWorker
- ActivityNames() []string
- GetActivityContext(taskToken []byte) (context.Context, error)
-}
-
-type activityPoolImpl struct {
- dc converter.DataConverter
- codec rrt.Codec
- seqID uint64
- activities []string
- wp pool.Pool
- tWorkers []worker.Worker
- running sync.Map
-}
-
-// newActivityPool
-func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
- const op = errors.Op("new_activity_pool")
- // env variables
- env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
- wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return &activityPoolImpl{
- codec: codec,
- wp: wp,
- running: sync.Map{},
- }, nil
-}
-
-// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("activity_pool_start")
- pool.dc = temporal.GetDataConverter()
-
- err := pool.initWorkers(ctx, temporal)
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(pool.tWorkers); i++ {
- err := pool.tWorkers[i].Start()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) Destroy(ctx context.Context) error {
- for i := 0; i < len(pool.tWorkers); i++ {
- pool.tWorkers[i].Stop()
- }
-
- pool.wp.Destroy(ctx)
- return nil
-}
-
-// Workers returns list of all allocated workers.
-func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker {
- return pool.wp.Workers()
-}
-
-// ActivityNames returns list of all available activity names.
-func (pool *activityPoolImpl) ActivityNames() []string {
- return pool.activities
-}
-
-// ActivityNames returns list of all available activity names.
-func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
- const op = errors.Op("activity_pool_get_activity_context")
- c, ok := pool.running.Load(string(taskToken))
- if !ok {
- return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
- }
-
- return c.(context.Context), nil
-}
-
-// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
-func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("activity_pool_create_temporal_worker")
-
- workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
- if err != nil {
- return errors.E(op, err)
- }
-
- pool.activities = make([]string, 0)
- pool.tWorkers = make([]worker.Worker, 0)
-
- for i := 0; i < len(workerInfo); i++ {
- w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
- if err != nil {
- return errors.E(op, err, pool.Destroy(ctx))
- }
-
- pool.tWorkers = append(pool.tWorkers, w)
- for j := 0; j < len(workerInfo[i].Activities); j++ {
- w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
- Name: workerInfo[i].Activities[j].Name,
- DisableAlreadyRegisteredCheck: false,
- })
-
- pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
- }
- }
-
- return nil
-}
-
-// executes activity with underlying worker.
-func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
- const op = errors.Op("activity_pool_execute_activity")
-
- heartbeatDetails := &common.Payloads{}
- if activity.HasHeartbeatDetails(ctx) {
- err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails)
- if err != nil {
- return nil, errors.E(op, err)
- }
- }
-
- var info = activity.GetInfo(ctx)
- var msg = rrt.Message{
- ID: atomic.AddUint64(&pool.seqID, 1),
- Command: rrt.InvokeActivity{
- Name: info.ActivityType.Name,
- Info: info,
- HeartbeatDetails: len(heartbeatDetails.Payloads),
- },
- Payloads: args,
- }
-
- if len(heartbeatDetails.Payloads) != 0 {
- msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
- }
-
- pool.running.Store(string(info.TaskToken), ctx)
- defer pool.running.Delete(string(info.TaskToken))
-
- result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return nil, errors.E(op, errors.Str("invalid activity worker response"))
- }
-
- out := result[0]
- if out.Failure != nil {
- if out.Failure.Message == doNotCompleteOnReturn {
- return nil, activity.ErrResultPending
- }
-
- return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc)
- }
-
- return out.Payloads, nil
-}
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
deleted file mode 100644
index 5e562a8d..00000000
--- a/plugins/temporal/activity/plugin.go
+++ /dev/null
@@ -1,215 +0,0 @@
-package activity
-
-import (
- "context"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
-)
-
-const (
- // PluginName defines public service name.
- PluginName = "activities"
-
- // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
- RRMode = "temporal/activity"
-)
-
-// Plugin to manage activity execution.
-type Plugin struct {
- temporal client.Temporal
- events events.Handler
- server server.Server
- log logger.Logger
- mu sync.Mutex
- reset chan struct{}
- pool activityPool
- closing int64
-}
-
-// Init configures activity service.
-func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
- const op = errors.Op("activity_plugin_init")
- if temporal.GetConfig().Activities == nil {
- // no need to serve activities
- return errors.E(op, errors.Disabled)
- }
-
- p.temporal = temporal
- p.server = server
- p.events = events.NewEventsHandler()
- p.log = log
- p.reset = make(chan struct{})
-
- return nil
-}
-
-// Serve activities with underlying workers.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("activity_plugin_serve")
-
- errCh := make(chan error, 1)
- pool, err := p.startPool()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- p.pool = pool
-
- go func() {
- for {
- select {
- case <-p.reset:
- if atomic.LoadInt64(&p.closing) == 1 {
- return
- }
-
- err := p.replacePool()
- if err == nil {
- continue
- }
-
- bkoff := backoff.NewExponentialBackOff()
- bkoff.InitialInterval = time.Second
-
- err = backoff.Retry(p.replacePool, bkoff)
- if err != nil {
- errCh <- errors.E(op, err)
- }
- }
- }
- }()
-
- return errCh
-}
-
-// Stop stops the serving plugin.
-func (p *Plugin) Stop() error {
- atomic.StoreInt64(&p.closing, 1)
- const op = errors.Op("activity_plugin_stop")
-
- pool := p.getPool()
- if pool != nil {
- p.pool = nil
- err := pool.Destroy(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// RPC returns associated rpc service.
-func (p *Plugin) RPC() interface{} {
- return &rpc{srv: p, client: p.temporal.GetClient()}
-}
-
-// Workers returns pool workers.
-func (p *Plugin) Workers() []worker.SyncWorker {
- return p.getPool().Workers()
-}
-
-// ActivityNames returns list of all available activities.
-func (p *Plugin) ActivityNames() []string {
- return p.pool.ActivityNames()
-}
-
-// Reset resets underlying workflow pool with new copy.
-func (p *Plugin) Reset() error {
- p.reset <- struct{}{}
-
- return nil
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) AddListener(listener events.Listener) {
- p.events.AddListener(listener)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) poolListener(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventPoolError {
- p.log.Error("Activity pool error", "error", ev.Payload.(error))
- p.reset <- struct{}{}
- }
- }
-
- p.events.Push(event)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) startPool() (activityPool, error) {
- pool, err := newActivityPool(
- p.temporal.GetCodec().WithLogger(p.log),
- p.poolListener,
- *p.temporal.GetConfig().Activities,
- p.server,
- )
-
- if err != nil {
- return nil, errors.E(errors.Op("newActivityPool"), err)
- }
-
- err = pool.Start(context.Background(), p.temporal)
- if err != nil {
- return nil, errors.E(errors.Op("startActivityPool"), err)
- }
-
- p.log.Debug("Started activity processing", "activities", pool.ActivityNames())
-
- return pool, nil
-}
-
-func (p *Plugin) replacePool() error {
- pool, err := p.startPool()
- if err != nil {
- p.log.Error("Replace activity pool failed", "error", err)
- return errors.E(errors.Op("newActivityPool"), err)
- }
-
- p.log.Debug("Replace activity pool")
-
- var previous activityPool
-
- p.mu.Lock()
- previous, p.pool = p.pool, pool
- p.mu.Unlock()
-
- errD := previous.Destroy(context.Background())
- if errD != nil {
- p.log.Error(
- "Unable to destroy expired activity pool",
- "error",
- errors.E(errors.Op("destroyActivityPool"), errD),
- )
- }
-
- return nil
-}
-
-// getPool returns currently pool.
-func (p *Plugin) getPool() activityPool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.pool
-}
diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go
deleted file mode 100644
index 49efcd4f..00000000
--- a/plugins/temporal/activity/rpc.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package activity
-
-import (
- v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- "go.temporal.io/sdk/client"
- "google.golang.org/protobuf/proto"
-)
-
-/*
-- the method's type is exported.
-- the method is exported.
-- the method has two arguments, both exported (or builtin) types.
-- the method's second argument is a pointer.
-- the method has return type error.
-*/
-type rpc struct {
- srv *Plugin
- client client.Client
-}
-
-// RecordHeartbeatRequest sent by activity to record current state.
-type RecordHeartbeatRequest struct {
- TaskToken []byte `json:"taskToken"`
- Details []byte `json:"details"`
-}
-
-// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled.
-type RecordHeartbeatResponse struct {
- Canceled bool `json:"canceled"`
-}
-
-// RecordActivityHeartbeat records heartbeat for an activity.
-// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
-// details - is the progress you want to record along with heart beat for this activity.
-// The errors it can return:
-// - EntityNotExistsError
-// - InternalServiceError
-// - CanceledError
-func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error {
- details := &commonpb.Payloads{}
-
- if len(in.Details) != 0 {
- if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
- return err
- }
- }
-
- // find running activity
- ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken)
- if err != nil {
- return err
- }
-
- activity.RecordHeartbeat(ctx, details)
-
- select {
- case <-ctx.Done():
- *out = RecordHeartbeatResponse{Canceled: true}
- default:
- *out = RecordHeartbeatResponse{Canceled: false}
- }
-
- return nil
-}
diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go
deleted file mode 100644
index 10257070..00000000
--- a/plugins/temporal/client/doc/doc.go
+++ /dev/null
@@ -1 +0,0 @@
-package doc
diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio
deleted file mode 100644
index f2350af8..00000000
--- a/plugins/temporal/client/doc/temporal.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
deleted file mode 100644
index 047a1815..00000000
--- a/plugins/temporal/client/plugin.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package client
-
-import (
- "fmt"
- "os"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/sdk/client"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/worker"
-)
-
-// PluginName defines public service name.
-const PluginName = "temporal"
-
-// indicates that the case size was set
-var stickyCacheSet = false
-
-// Plugin implement Temporal contract.
-type Plugin struct {
- workerID int32
- cfg *Config
- dc converter.DataConverter
- log logger.Logger
- client client.Client
-}
-
-// Temporal define common interface for RoadRunner plugins.
-type Temporal interface {
- GetClient() client.Client
- GetDataConverter() converter.DataConverter
- GetConfig() Config
- GetCodec() rrt.Codec
- CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error)
-}
-
-// Config of the temporal client and depended services.
-type Config struct {
- Address string
- Namespace string
- Activities *pool.Config
- Codec string
- DebugLevel int `mapstructure:"debug_level"`
- CacheSize int `mapstructure:"cache_size"`
-}
-
-// Init initiates temporal client plugin.
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("temporal_client_plugin_init")
- p.log = log
- p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, err)
- }
- if p.cfg == nil {
- return errors.E(op, errors.Disabled)
- }
-
- return nil
-}
-
-// GetConfig returns temporal configuration.
-func (p *Plugin) GetConfig() Config {
- if p.cfg != nil {
- return *p.cfg
- }
- // empty
- return Config{}
-}
-
-// GetCodec returns communication codec.
-func (p *Plugin) GetCodec() rrt.Codec {
- if p.cfg.Codec == "json" {
- return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log)
- }
-
- // production ready protocol, no debug abilities
- return rrt.NewProtoCodec()
-}
-
-// GetDataConverter returns data active data converter.
-func (p *Plugin) GetDataConverter() converter.DataConverter {
- return p.dc
-}
-
-// Serve starts temporal srv.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("temporal_client_plugin_serve")
- errCh := make(chan error, 1)
- var err error
-
- if stickyCacheSet == false && p.cfg.CacheSize != 0 {
- worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize)
- stickyCacheSet = true
- }
-
- p.client, err = client.NewClient(client.Options{
- Logger: p.log,
- HostPort: p.cfg.Address,
- Namespace: p.cfg.Namespace,
- DataConverter: p.dc,
- })
-
- if err != nil {
- errCh <- errors.E(op, err)
- }
-
- p.log.Debug("connected to temporal server", "address", p.cfg.Address)
-
- return errCh
-}
-
-// Stop stops temporal srv connection.
-func (p *Plugin) Stop() error {
- if p.client != nil {
- p.client.Close()
- }
-
- return nil
-}
-
-// GetClient returns active srv connection.
-func (p *Plugin) GetClient() client.Client {
- return p.client
-}
-
-// CreateWorker allocates new temporal worker on an active connection.
-func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
- const op = errors.Op("temporal_client_plugin_create_worker")
- if p.client == nil {
- return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
- }
-
- if options.Identity == "" {
- if tq == "" {
- tq = client.DefaultNamespace
- }
-
- // ensures unique worker IDs
- options.Identity = fmt.Sprintf(
- "%d@%s@%s@%v",
- os.Getpid(),
- getHostName(),
- tq,
- atomic.AddInt32(&p.workerID, 1),
- )
- }
-
- return worker.New(p.client, tq, options), nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func getHostName() string {
- hostName, err := os.Hostname()
- if err != nil {
- hostName = "Unknown"
- }
- return hostName
-}
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
deleted file mode 100644
index 406e70f4..00000000
--- a/plugins/temporal/protocol/converter.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package protocol
-
-import (
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-type (
- // DataConverter wraps Temporal data converter to enable direct access to the payloads.
- DataConverter struct {
- fallback converter.DataConverter
- }
-)
-
-// NewDataConverter creates new data converter.
-func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
- return &DataConverter{fallback: fallback}
-}
-
-// ToPayloads converts a list of values.
-func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
- for _, v := range values {
- if aggregated, ok := v.(*commonpb.Payloads); ok {
- // bypassing
- return aggregated, nil
- }
- }
-
- return r.fallback.ToPayloads(values...)
-}
-
-// ToPayload converts single value to payload.
-func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
- return r.fallback.ToPayload(value)
-}
-
-// FromPayloads converts to a list of values of different types.
-// Useful for deserializing arguments of function invocations.
-func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
- if payloads == nil {
- return nil
- }
-
- if len(valuePtrs) == 1 {
- // input proxying
- if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
- *input = &commonpb.Payloads{}
- (*input).Payloads = payloads.Payloads
- return nil
- }
- }
-
- for i := 0; i < len(payloads.Payloads); i++ {
- err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// FromPayload converts single value from payload.
-func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
- return r.fallback.FromPayload(payload, valuePtr)
-}
-
-// ToString converts payload object into human readable string.
-func (r *DataConverter) ToString(input *commonpb.Payload) string {
- return r.fallback.ToString(input)
-}
-
-// ToStrings converts payloads object into human readable strings.
-func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
- return r.fallback.ToStrings(input)
-}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
deleted file mode 100644
index 6ce9fa0f..00000000
--- a/plugins/temporal/protocol/converter_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package protocol
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-func Test_Passthough(t *testing.T) {
- codec := NewDataConverter(converter.GetDefaultDataConverter())
-
- value, err := codec.ToPayloads("test")
- assert.NoError(t, err)
-
- out := &common.Payloads{}
-
- assert.Len(t, out.Payloads, 0)
- assert.NoError(t, codec.FromPayloads(value, &out))
-
- assert.Len(t, out.Payloads, 1)
-}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
deleted file mode 100644
index c554e28f..00000000
--- a/plugins/temporal/protocol/internal/protocol.pb.go
+++ /dev/null
@@ -1,167 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// source: protocol.proto
-
-package internal
-
-import (
- fmt "fmt"
- math "math"
-
- proto "github.com/golang/protobuf/proto"
- v11 "go.temporal.io/api/common/v1"
- v1 "go.temporal.io/api/failure/v1"
-)
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = fmt.Errorf
-var _ = math.Inf
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the proto package it is being compiled against.
-// A compilation error at this line likely means your copy of the
-// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
-
-type Frame struct {
- Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Frame) Reset() { *m = Frame{} }
-func (m *Frame) String() string { return proto.CompactTextString(m) }
-func (*Frame) ProtoMessage() {}
-func (*Frame) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{0}
-}
-
-func (m *Frame) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Frame.Unmarshal(m, b)
-}
-func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
-}
-func (m *Frame) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Frame.Merge(m, src)
-}
-func (m *Frame) XXX_Size() int {
- return xxx_messageInfo_Frame.Size(m)
-}
-func (m *Frame) XXX_DiscardUnknown() {
- xxx_messageInfo_Frame.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Frame proto.InternalMessageInfo
-
-func (m *Frame) GetMessages() []*Message {
- if m != nil {
- return m.Messages
- }
- return nil
-}
-
-// Single communication message.
-type Message struct {
- Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
- // command name (if any)
- Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
- // command options in json format.
- Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
- // error response.
- Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
- // invocation or result payloads.
- Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Message) Reset() { *m = Message{} }
-func (m *Message) String() string { return proto.CompactTextString(m) }
-func (*Message) ProtoMessage() {}
-func (*Message) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{1}
-}
-
-func (m *Message) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Message.Unmarshal(m, b)
-}
-func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Message.Marshal(b, m, deterministic)
-}
-func (m *Message) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Message.Merge(m, src)
-}
-func (m *Message) XXX_Size() int {
- return xxx_messageInfo_Message.Size(m)
-}
-func (m *Message) XXX_DiscardUnknown() {
- xxx_messageInfo_Message.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Message proto.InternalMessageInfo
-
-func (m *Message) GetId() uint64 {
- if m != nil {
- return m.Id
- }
- return 0
-}
-
-func (m *Message) GetCommand() string {
- if m != nil {
- return m.Command
- }
- return ""
-}
-
-func (m *Message) GetOptions() []byte {
- if m != nil {
- return m.Options
- }
- return nil
-}
-
-func (m *Message) GetFailure() *v1.Failure {
- if m != nil {
- return m.Failure
- }
- return nil
-}
-
-func (m *Message) GetPayloads() *v11.Payloads {
- if m != nil {
- return m.Payloads
- }
- return nil
-}
-
-func init() {
- proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
- proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
-}
-
-func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
-
-var fileDescriptor_2bc2336598a3f7e0 = []byte{
- // 257 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
- 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
- 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
- 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
- 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
- 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
- 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
- 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
- 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
- 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
- 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
- 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
- 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
- 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
- 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
- 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
- 0x00,
-}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
deleted file mode 100644
index e7a77068..00000000
--- a/plugins/temporal/protocol/json_codec.go
+++ /dev/null
@@ -1,225 +0,0 @@
-package protocol
-
-import (
- "github.com/fatih/color"
- j "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-// JSONCodec can be used for debugging and log capturing reasons.
-type JSONCodec struct {
- // level enables verbose logging or all incoming and outcoming messages.
- level DebugLevel
-
- // logger renders messages when debug enabled.
- logger logger.Logger
-}
-
-// jsonFrame contains message command in binary form.
-type jsonFrame struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command name. Optional.
- Command string `json:"command,omitempty"`
-
- // Options to be unmarshalled to body (raw payload).
- Options j.RawMessage `json:"options,omitempty"`
-
- // Failure associated with command id.
- Failure []byte `json:"failure,omitempty"`
-
- // Payloads specific to the command or result.
- Payloads []byte `json:"payloads,omitempty"`
-}
-
-// NewJSONCodec creates new Json communication codec.
-func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
- return &JSONCodec{
- level: level,
- logger: logger,
- }
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
- return &JSONCodec{
- level: c.level,
- logger: logger,
- }
-}
-
-// GetName returns codec name.
-func (c *JSONCodec) GetName() string {
- return "json"
-}
-
-// Execute exchanges commands with worker.
-func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- const op = errors.Op("json_codec_execute")
- if len(msg) == 0 {
- return nil, nil
- }
-
- var response = make([]jsonFrame, 0, 5)
- var result = make([]Message, 0, 5)
- var err error
-
- frames := make([]jsonFrame, 0, len(msg))
- for _, m := range msg {
- frame, err := c.packFrame(m)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- frames = append(frames, frame)
- }
-
- p := payload.Payload{}
-
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = json.Marshal(ctx)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.Body, err = json.Marshal(frames)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if c.level >= DebugNormal {
- logMessage := string(p.Body) + " " + string(p.Context)
- if c.level >= DebugHumanized {
- logMessage = color.GreenString(logMessage)
- }
-
- c.logger.Debug(logMessage)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- if c.level >= DebugNormal {
- logMessage := string(out.Body)
- if c.level >= DebugHumanized {
- logMessage = color.HiYellowString(logMessage)
- }
-
- c.logger.Debug(logMessage, "receive", true)
- }
-
- err = json.Unmarshal(out.Body, &response)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- for _, f := range response {
- msg, err := c.parseFrame(f)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
- var (
- err error
- frame jsonFrame
- )
-
- frame.ID = msg.ID
-
- if msg.Payloads != nil {
- frame.Payloads, err = msg.Payloads.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Failure != nil {
- frame.Failure, err = msg.Failure.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Command == nil {
- return frame, nil
- }
-
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- frame.Options, err = json.Marshal(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- return frame, nil
-}
-
-func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
- var (
- err error
- msg Message
- )
-
- msg.ID = frame.ID
-
- if frame.Payloads != nil {
- msg.Payloads = &common.Payloads{}
-
- err = msg.Payloads.Unmarshal(frame.Payloads)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Failure != nil {
- msg.Failure = &failure.Failure{}
-
- err = msg.Failure.Unmarshal(frame.Failure)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Command != "" {
- cmd, err := initCommand(frame.Command)
- if err != nil {
- return Message{}, err
- }
-
- err = json.Unmarshal(frame.Options, &cmd)
- if err != nil {
- return Message{}, err
- }
-
- msg.Command = cmd
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
deleted file mode 100644
index d5e0f49d..00000000
--- a/plugins/temporal/protocol/message.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package protocol
-
-import (
- "time"
-
- "github.com/spiral/errors"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-const (
- getWorkerInfoCommand = "GetWorkerInfo"
-
- invokeActivityCommand = "InvokeActivity"
- startWorkflowCommand = "StartWorkflow"
- invokeSignalCommand = "InvokeSignal"
- invokeQueryCommand = "InvokeQuery"
- destroyWorkflowCommand = "DestroyWorkflow"
- cancelWorkflowCommand = "CancelWorkflow"
- getStackTraceCommand = "StackTrace"
-
- executeActivityCommand = "ExecuteActivity"
- executeChildWorkflowCommand = "ExecuteChildWorkflow"
- getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
-
- newTimerCommand = "NewTimer"
- sideEffectCommand = "SideEffect"
- getVersionCommand = "GetVersion"
- completeWorkflowCommand = "CompleteWorkflow"
- continueAsNewCommand = "ContinueAsNew"
-
- signalExternalWorkflowCommand = "SignalExternalWorkflow"
- cancelExternalWorkflowCommand = "CancelExternalWorkflow"
-
- cancelCommand = "Cancel"
- panicCommand = "Panic"
-)
-
-// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct{}
-
-// InvokeActivity invokes activity.
-type InvokeActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
-
- // Info contains execution context.
- Info activity.Info `json:"info"`
-
- // HeartbeatDetails indicates that the payload also contains last heartbeat details.
- HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
-}
-
-// StartWorkflow sends worker command to start workflow.
-type StartWorkflow struct {
- // Info to define workflow context.
- Info *workflow.Info `json:"info"`
-
- // LastCompletion contains offset of last completion results.
- LastCompletion int `json:"lastCompletion,omitempty"`
-}
-
-// InvokeSignal invokes signal with a set of arguments.
-type InvokeSignal struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-
- // Name of the signal.
- Name string `json:"name"`
-}
-
-// InvokeQuery invokes query with a set of arguments.
-type InvokeQuery struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
- // Name of the query.
- Name string `json:"name"`
-}
-
-// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
-type CancelWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// DestroyWorkflow asks worker to offload workflow from memory.
-type DestroyWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// GetStackTrace asks worker to offload workflow from memory.
-type GetStackTrace struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// ExecuteActivity command by workflow worker.
-type ExecuteActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
-}
-
-// ExecuteChildWorkflow executes child workflow.
-type ExecuteChildWorkflow struct {
- // Name defines workflow name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.WorkflowOptions `json:"options,omitempty"`
-}
-
-// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
-type GetChildWorkflowExecution struct {
- // ID of child workflow command.
- ID uint64 `json:"id"`
-}
-
-// NewTimer starts new timer.
-type NewTimer struct {
- // Milliseconds defines timer duration.
- Milliseconds int `json:"ms"`
-}
-
-// SideEffect to be recorded into the history.
-type SideEffect struct{}
-
-// GetVersion requests version marker.
-type GetVersion struct {
- ChangeID string `json:"changeID"`
- MinSupported int `json:"minSupported"`
- MaxSupported int `json:"maxSupported"`
-}
-
-// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
-type CompleteWorkflow struct{}
-
-// ContinueAsNew restarts workflow with new running instance.
-type ContinueAsNew struct {
- // Result defines workflow execution result.
- Name string `json:"name"`
-
- // Options for continued as new workflow.
- Options struct {
- TaskQueueName string
- WorkflowExecutionTimeout time.Duration
- WorkflowRunTimeout time.Duration
- WorkflowTaskTimeout time.Duration
- } `json:"options"`
-}
-
-// SignalExternalWorkflow sends signal to external workflow.
-type SignalExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
- Signal string `json:"signal"`
- ChildWorkflowOnly bool `json:"childWorkflowOnly"`
-}
-
-// CancelExternalWorkflow canceller external workflow.
-type CancelExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
-}
-
-// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
-type Cancel struct {
- // CommandIDs to be cancelled.
- CommandIDs []uint64 `json:"ids"`
-}
-
-// Panic triggers panic in workflow process.
-type Panic struct {
- // Message to include into the error.
- Message string `json:"message"`
-}
-
-// ActivityParams maps activity command to activity params.
-func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
- params := bindings.ExecuteActivityParams{
- ExecuteActivityOptions: cmd.Options,
- ActivityType: bindings.ActivityType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// WorkflowParams maps workflow command to workflow params.
-func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
- params := bindings.ExecuteWorkflowParams{
- WorkflowOptions: cmd.Options,
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// ToDuration converts timer command to time.Duration.
-func (cmd NewTimer) ToDuration() time.Duration {
- return time.Millisecond * time.Duration(cmd.Milliseconds)
-}
-
-// returns command name (only for the commands sent to the worker)
-func commandName(cmd interface{}) (string, error) {
- const op = errors.Op("command_name")
- switch cmd.(type) {
- case GetWorkerInfo, *GetWorkerInfo:
- return getWorkerInfoCommand, nil
- case StartWorkflow, *StartWorkflow:
- return startWorkflowCommand, nil
- case InvokeSignal, *InvokeSignal:
- return invokeSignalCommand, nil
- case InvokeQuery, *InvokeQuery:
- return invokeQueryCommand, nil
- case DestroyWorkflow, *DestroyWorkflow:
- return destroyWorkflowCommand, nil
- case CancelWorkflow, *CancelWorkflow:
- return cancelWorkflowCommand, nil
- case GetStackTrace, *GetStackTrace:
- return getStackTraceCommand, nil
- case InvokeActivity, *InvokeActivity:
- return invokeActivityCommand, nil
- case ExecuteActivity, *ExecuteActivity:
- return executeActivityCommand, nil
- case ExecuteChildWorkflow, *ExecuteChildWorkflow:
- return executeChildWorkflowCommand, nil
- case GetChildWorkflowExecution, *GetChildWorkflowExecution:
- return getChildWorkflowExecutionCommand, nil
- case NewTimer, *NewTimer:
- return newTimerCommand, nil
- case GetVersion, *GetVersion:
- return getVersionCommand, nil
- case SideEffect, *SideEffect:
- return sideEffectCommand, nil
- case CompleteWorkflow, *CompleteWorkflow:
- return completeWorkflowCommand, nil
- case ContinueAsNew, *ContinueAsNew:
- return continueAsNewCommand, nil
- case SignalExternalWorkflow, *SignalExternalWorkflow:
- return signalExternalWorkflowCommand, nil
- case CancelExternalWorkflow, *CancelExternalWorkflow:
- return cancelExternalWorkflowCommand, nil
- case Cancel, *Cancel:
- return cancelCommand, nil
- case Panic, *Panic:
- return panicCommand, nil
- default:
- return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
- }
-}
-
-// reads command from binary payload
-func initCommand(name string) (interface{}, error) {
- const op = errors.Op("init_command")
- switch name {
- case getWorkerInfoCommand:
- return &GetWorkerInfo{}, nil
-
- case startWorkflowCommand:
- return &StartWorkflow{}, nil
-
- case invokeSignalCommand:
- return &InvokeSignal{}, nil
-
- case invokeQueryCommand:
- return &InvokeQuery{}, nil
-
- case destroyWorkflowCommand:
- return &DestroyWorkflow{}, nil
-
- case cancelWorkflowCommand:
- return &CancelWorkflow{}, nil
-
- case getStackTraceCommand:
- return &GetStackTrace{}, nil
-
- case invokeActivityCommand:
- return &InvokeActivity{}, nil
-
- case executeActivityCommand:
- return &ExecuteActivity{}, nil
-
- case executeChildWorkflowCommand:
- return &ExecuteChildWorkflow{}, nil
-
- case getChildWorkflowExecutionCommand:
- return &GetChildWorkflowExecution{}, nil
-
- case newTimerCommand:
- return &NewTimer{}, nil
-
- case getVersionCommand:
- return &GetVersion{}, nil
-
- case sideEffectCommand:
- return &SideEffect{}, nil
-
- case completeWorkflowCommand:
- return &CompleteWorkflow{}, nil
-
- case continueAsNewCommand:
- return &ContinueAsNew{}, nil
-
- case signalExternalWorkflowCommand:
- return &SignalExternalWorkflow{}, nil
-
- case cancelExternalWorkflowCommand:
- return &CancelExternalWorkflow{}, nil
-
- case cancelCommand:
- return &Cancel{}, nil
-
- case panicCommand:
- return &Panic{}, nil
-
- default:
- return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
- }
-}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
deleted file mode 100644
index 607fe0fe..00000000
--- a/plugins/temporal/protocol/proto_codec.go
+++ /dev/null
@@ -1,145 +0,0 @@
-package protocol
-
-import (
- v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
- jsoniter "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
- "google.golang.org/protobuf/proto"
-)
-
-type (
- // ProtoCodec uses protobuf to exchange messages with underlying workers.
- ProtoCodec struct {
- }
-)
-
-// NewProtoCodec creates new Proto communication codec.
-func NewProtoCodec() Codec {
- return &ProtoCodec{}
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
- return &ProtoCodec{}
-}
-
-// GetName returns codec name.
-func (c *ProtoCodec) GetName() string {
- return "protobuf"
-}
-
-// Execute exchanges commands with worker.
-func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- if len(msg) == 0 {
- return nil, nil
- }
-
- var request = &internal.Frame{}
- var response = &internal.Frame{}
- var result = make([]Message, 0, 5)
- var err error
-
- for _, m := range msg {
- frame, err := c.packMessage(m)
- if err != nil {
- return nil, err
- }
-
- request.Messages = append(request.Messages, frame)
- }
-
- p := payload.Payload{}
-
- // context is always in json format
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = jsoniter.Marshal(ctx)
- if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
- }
-
- p.Body, err = proto.Marshal(v1.MessageV2(request))
- if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- err = proto.Unmarshal(out.Body, v1.MessageV2(response))
- if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
- }
-
- for _, f := range response.Messages {
- msg, err := c.parseMessage(f)
- if err != nil {
- return nil, err
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
- var err error
-
- frame := &internal.Message{
- Id: msg.ID,
- Payloads: msg.Payloads,
- Failure: msg.Failure,
- }
-
- if msg.Command != nil {
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return nil, err
- }
-
- frame.Options, err = jsoniter.Marshal(msg.Command)
- if err != nil {
- return nil, err
- }
- }
-
- return frame, nil
-}
-
-func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
- const op = errors.Op("proto_codec_parse_message")
- var err error
-
- msg := Message{
- ID: frame.Id,
- Payloads: frame.Payloads,
- Failure: frame.Failure,
- }
-
- if frame.Command != "" {
- msg.Command, err = initCommand(frame.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
-
- err = jsoniter.Unmarshal(frame.Options, &msg.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
deleted file mode 100644
index 53076fdf..00000000
--- a/plugins/temporal/protocol/protocol.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-const (
- // DebugNone disables all debug messages.
- DebugNone = iota
-
- // DebugNormal renders all messages into console.
- DebugNormal
-
- // DebugHumanized enables color highlights for messages.
- DebugHumanized
-)
-
-// Context provides worker information about currently. Context can be empty for server level commands.
-type Context struct {
- // TaskQueue associates message batch with the specific task queue in underlying worker.
- TaskQueue string `json:"taskQueue,omitempty"`
-
- // TickTime associated current or historical time with message batch.
- TickTime string `json:"tickTime,omitempty"`
-
- // Replay indicates that current message batch is historical.
- Replay bool `json:"replay,omitempty"`
-}
-
-// Message used to exchange the send commands and receive responses from underlying workers.
-type Message struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command of the message in unmarshalled form. Pointer.
- Command interface{} `json:"command,omitempty"`
-
- // Failure associated with command id.
- Failure *failure.Failure `json:"failure,omitempty"`
-
- // Payloads contains message specific payloads in binary format.
- Payloads *commonpb.Payloads `json:"payloads,omitempty"`
-}
-
-// Codec manages payload encoding and decoding while communication with underlying worker.
-type Codec interface {
- // WithLogger creates new codes instance with attached logger.
- WithLogger(logger.Logger) Codec
-
- // GetName returns codec name.
- GetName() string
-
- // Execute sends message to worker and waits for the response.
- Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
-}
-
-// Endpoint provides the ability to send and receive messages.
-type Endpoint interface {
- // ExecWithContext allow to set ExecTTL
- Exec(p payload.Payload) (payload.Payload, error)
-}
-
-// DebugLevel configures debug level.
-type DebugLevel int
-
-// IsEmpty only check if task queue set.
-func (ctx Context) IsEmpty() bool {
- return ctx.TaskQueue == ""
-}
-
-// IsCommand returns true if message carries request.
-func (msg Message) IsCommand() bool {
- return msg.Command != nil
-}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
deleted file mode 100644
index 58a0ae66..00000000
--- a/plugins/temporal/protocol/worker_info.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/errors"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/worker"
-)
-
-// WorkerInfo outlines information about every available worker and it's TaskQueues.
-
-// WorkerInfo lists available task queues, workflows and activities.
-type WorkerInfo struct {
- // TaskQueue assigned to the worker.
- TaskQueue string `json:"taskQueue"`
-
- // Options describe worker options.
- Options worker.Options `json:"options,omitempty"`
-
- // Workflows provided by the worker.
- Workflows []WorkflowInfo
-
- // Activities provided by the worker.
- Activities []ActivityInfo
-}
-
-// WorkflowInfo describes single worker workflow.
-type WorkflowInfo struct {
- // Name of the workflow.
- Name string `json:"name"`
-
- // Queries pre-defined for the workflow type.
- Queries []string `json:"queries"`
-
- // Signals pre-defined for the workflow type.
- Signals []string `json:"signals"`
-}
-
-// ActivityInfo describes single worker activity.
-type ActivityInfo struct {
- // Name describes public activity name.
- Name string `json:"name"`
-}
-
-// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
-func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
- const op = errors.Op("fetch_worker_info")
-
- result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return nil, errors.E(op, errors.Str("unable to read worker info"))
- }
-
- if result[0].ID != 0 {
- return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
- }
-
- var info []WorkerInfo
- for i := range result[0].Payloads.Payloads {
- wi := WorkerInfo{}
- if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
- return nil, errors.E(op, err)
- }
-
- info = append(info, wi)
- }
-
- return info, nil
-}
diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go
deleted file mode 100644
index 962c527f..00000000
--- a/plugins/temporal/workflow/canceller.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package workflow
-
-import (
- "sync"
-)
-
-type cancellable func() error
-
-type canceller struct {
- ids sync.Map
-}
-
-func (c *canceller) register(id uint64, cancel cancellable) {
- c.ids.Store(id, cancel)
-}
-
-func (c *canceller) discard(id uint64) {
- c.ids.Delete(id)
-}
-
-func (c *canceller) cancel(ids ...uint64) error {
- var err error
- for _, id := range ids {
- cancel, ok := c.ids.Load(id)
- if ok == false {
- continue
- }
-
- // TODO return when minimum supported version will be go 1.15
- // go1.14 don't have LoadAndDelete method
- // It was introduced only in go1.15
- c.ids.Delete(id)
-
- err = cancel.(cancellable)()
- if err != nil {
- return err
- }
- }
-
- return nil
-}
diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go
deleted file mode 100644
index d6e846f8..00000000
--- a/plugins/temporal/workflow/canceller_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package workflow
-
-import (
- "errors"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_CancellerNoListeners(t *testing.T) {
- c := &canceller{}
-
- assert.NoError(t, c.cancel(1))
-}
-
-func Test_CancellerListenerError(t *testing.T) {
- c := &canceller{}
- c.register(1, func() error {
- return errors.New("failed")
- })
-
- assert.Error(t, c.cancel(1))
-}
-
-func Test_CancellerListenerDiscarded(t *testing.T) {
- c := &canceller{}
- c.register(1, func() error {
- return errors.New("failed")
- })
-
- c.discard(1)
- assert.NoError(t, c.cancel(1))
-}
diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go
deleted file mode 100644
index ac75cbda..00000000
--- a/plugins/temporal/workflow/id_registry.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package workflow
-
-import (
- "sync"
-
- bindings "go.temporal.io/sdk/internalbindings"
-)
-
-// used to gain access to child workflow ids after they become available via callback result.
-type idRegistry struct {
- mu sync.Mutex
- ids map[uint64]entry
- listeners map[uint64]listener
-}
-
-type listener func(w bindings.WorkflowExecution, err error)
-
-type entry struct {
- w bindings.WorkflowExecution
- err error
-}
-
-func newIDRegistry() *idRegistry {
- return &idRegistry{
- ids: map[uint64]entry{},
- listeners: map[uint64]listener{},
- }
-}
-
-func (c *idRegistry) listen(id uint64, cl listener) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- c.listeners[id] = cl
-
- if e, ok := c.ids[id]; ok {
- cl(e.w, e.err)
- }
-}
-
-func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- e := entry{w: w, err: err}
- c.ids[id] = e
-
- if l, ok := c.listeners[id]; ok {
- l(e.w, e.err)
- }
-}
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
deleted file mode 100644
index 8f4409d1..00000000
--- a/plugins/temporal/workflow/message_queue.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package workflow
-
-import (
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-type messageQueue struct {
- seqID func() uint64
- queue []rrt.Message
-}
-
-func newMessageQueue(sedID func() uint64) *messageQueue {
- return &messageQueue{
- seqID: sedID,
- queue: make([]rrt.Message, 0, 5),
- }
-}
-
-func (mq *messageQueue) flush() {
- mq.queue = mq.queue[0:0]
-}
-
-func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
- msg := rrt.Message{
- ID: mq.seqID(),
- Command: cmd,
- Payloads: payloads,
- }
-
- return msg.ID, msg
-}
-
-func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
- id, msg := mq.allocateMessage(cmd, payloads)
- mq.queue = append(mq.queue, msg)
- return id
-}
-
-func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
- mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads})
-}
-
-func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) {
- mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure})
-}
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
deleted file mode 100644
index 1fcd409f..00000000
--- a/plugins/temporal/workflow/message_queue_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package workflow
-
-import (
- "sync/atomic"
- "testing"
-
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-func Test_MessageQueueFlushError(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- mq.pushError(1, &failure.Failure{})
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
- assert.Equal(t, uint64(0), index)
-}
-
-func Test_MessageQueueFlushResponse(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- mq.pushResponse(1, &common.Payloads{})
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
- assert.Equal(t, uint64(0), index)
-}
-
-func Test_MessageQueueCommandID(t *testing.T) {
- var index uint64
- mq := newMessageQueue(func() uint64 {
- return atomic.AddUint64(&index, 1)
- })
-
- n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
- assert.Equal(t, n, index)
- assert.Len(t, mq.queue, 1)
-
- mq.flush()
- assert.Len(t, mq.queue, 0)
-}
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
deleted file mode 100644
index 572d9a3b..00000000
--- a/plugins/temporal/workflow/plugin.go
+++ /dev/null
@@ -1,203 +0,0 @@
-package workflow
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
-)
-
-const (
- // PluginName defines public service name.
- PluginName = "workflows"
-
- // RRMode sets as RR_MODE env variable to let worker know about the mode to run.
- RRMode = "temporal/workflow"
-)
-
-// Plugin manages workflows and workers.
-type Plugin struct {
- temporal client.Temporal
- events events.Handler
- server server.Server
- log logger.Logger
- mu sync.Mutex
- reset chan struct{}
- pool workflowPool
- closing int64
-}
-
-// Init workflow plugin.
-func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
- p.temporal = temporal
- p.server = server
- p.events = events.NewEventsHandler()
- p.log = log
- p.reset = make(chan struct{}, 1)
-
- return nil
-}
-
-// Serve starts workflow service.
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("workflow_plugin_serve")
- errCh := make(chan error, 1)
-
- pool, err := p.startPool()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- p.pool = pool
-
- go func() {
- for {
- select {
- case <-p.reset:
- if atomic.LoadInt64(&p.closing) == 1 {
- return
- }
-
- err := p.replacePool()
- if err == nil {
- continue
- }
-
- bkoff := backoff.NewExponentialBackOff()
- bkoff.InitialInterval = time.Second
-
- err = backoff.Retry(p.replacePool, bkoff)
- if err != nil {
- errCh <- errors.E(op, err)
- }
- }
- }
- }()
-
- return errCh
-}
-
-// Stop workflow service.
-func (p *Plugin) Stop() error {
- const op = errors.Op("workflow_plugin_stop")
- atomic.StoreInt64(&p.closing, 1)
-
- pool := p.getPool()
- if pool != nil {
- p.pool = nil
- err := pool.Destroy(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return nil
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// Workers returns list of available workflow workers.
-func (p *Plugin) Workers() []worker.BaseProcess {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.pool.Workers()
-}
-
-// WorkflowNames returns list of all available workflows.
-func (p *Plugin) WorkflowNames() []string {
- return p.pool.WorkflowNames()
-}
-
-// Reset resets underlying workflow pool with new copy.
-func (p *Plugin) Reset() error {
- p.reset <- struct{}{}
-
- return nil
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) poolListener(event interface{}) {
- if ev, ok := event.(PoolEvent); ok {
- if ev.Event == eventWorkerExit {
- if ev.Caused != nil {
- p.log.Error("Workflow pool error", "error", ev.Caused)
- }
- p.reset <- struct{}{}
- }
- }
-
- p.events.Push(event)
-}
-
-// AddListener adds event listeners to the service.
-func (p *Plugin) startPool() (workflowPool, error) {
- const op = errors.Op("workflow_plugin_start_pool")
- pool, err := newWorkflowPool(
- p.temporal.GetCodec().WithLogger(p.log),
- p.poolListener,
- p.server,
- )
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- err = pool.Start(context.Background(), p.temporal)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
-
- return pool, nil
-}
-
-func (p *Plugin) replacePool() error {
- p.mu.Lock()
- const op = errors.Op("workflow_plugin_replace_pool")
- defer p.mu.Unlock()
-
- if p.pool != nil {
- err := p.pool.Destroy(context.Background())
- p.pool = nil
- if err != nil {
- p.log.Error(
- "Unable to destroy expired workflow pool",
- "error",
- errors.E(op, err),
- )
- return errors.E(op, err)
- }
- }
-
- pool, err := p.startPool()
- if err != nil {
- p.log.Error("Replace workflow pool failed", "error", err)
- return errors.E(op, err)
- }
-
- p.pool = pool
- p.log.Debug("workflow pool successfully replaced")
-
- return nil
-}
-
-// getPool returns currently pool.
-func (p *Plugin) getPool() workflowPool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.pool
-}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
deleted file mode 100644
index 45e6885c..00000000
--- a/plugins/temporal/workflow/process.go
+++ /dev/null
@@ -1,436 +0,0 @@
-package workflow
-
-import (
- "strconv"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- commonpb "go.temporal.io/api/common/v1"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-// wraps single workflow process
-type workflowProcess struct {
- codec rrt.Codec
- pool workflowPool
- env bindings.WorkflowEnvironment
- header *commonpb.Header
- mq *messageQueue
- ids *idRegistry
- seqID uint64
- runID string
- pipeline []rrt.Message
- callbacks []func() error
- canceller *canceller
- inLoop bool
-}
-
-// Execute workflow, bootstraps process.
-func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
- wf.env = env
- wf.header = header
- wf.seqID = 0
- wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
- wf.canceller = &canceller{}
-
- // sequenceID shared for all worker workflows
- wf.mq = newMessageQueue(wf.pool.SeqID)
- wf.ids = newIDRegistry()
-
- env.RegisterCancelHandler(wf.handleCancel)
- env.RegisterSignalHandler(wf.handleSignal)
- env.RegisterQueryHandler(wf.handleQuery)
-
- var (
- lastCompletion = bindings.GetLastCompletionResult(env)
- lastCompletionOffset = 0
- )
-
- if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
- if input == nil {
- input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
- }
-
- input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
- lastCompletionOffset = len(lastCompletion.Payloads)
- }
-
- _ = wf.mq.pushCommand(
- rrt.StartWorkflow{
- Info: env.WorkflowInfo(),
- LastCompletion: lastCompletionOffset,
- },
- input,
- )
-}
-
-// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
-func (wf *workflowProcess) OnWorkflowTaskStarted() {
- wf.inLoop = true
- defer func() { wf.inLoop = false }()
-
- var err error
- for _, callback := range wf.callbacks {
- err = callback()
- if err != nil {
- panic(err)
- }
- }
- wf.callbacks = nil
-
- if err := wf.flushQueue(); err != nil {
- panic(err)
- }
-
- for len(wf.pipeline) > 0 {
- msg := wf.pipeline[0]
- wf.pipeline = wf.pipeline[1:]
-
- if msg.IsCommand() {
- err = wf.handleMessage(msg)
- }
-
- if err != nil {
- panic(err)
- }
- }
-}
-
-// StackTrace renders workflow stack trace.
-func (wf *workflowProcess) StackTrace() string {
- result, err := wf.runCommand(
- rrt.GetStackTrace{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- },
- nil,
- )
-
- if err != nil {
- return err.Error()
- }
-
- var stacktrace string
- err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
- if err != nil {
- return err.Error()
- }
-
- return stacktrace
-}
-
-// Close the workflow.
-func (wf *workflowProcess) Close() {
- // TODO: properly handle errors
- // panic(err)
-
- _ = wf.mq.pushCommand(
- rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-
- _, _ = wf.discardQueue()
-}
-
-// execution context.
-func (wf *workflowProcess) getContext() rrt.Context {
- return rrt.Context{
- TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
- TickTime: wf.env.Now().Format(time.RFC3339),
- Replay: wf.env.IsReplaying(),
- }
-}
-
-// schedule cancel command
-func (wf *workflowProcess) handleCancel() {
- _ = wf.mq.pushCommand(
- rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-}
-
-// schedule the signal processing
-func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _ = wf.mq.pushCommand(
- rrt.InvokeSignal{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- Name: name,
- },
- input,
- )
-}
-
-// Handle query in blocking mode.
-func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
- result, err := wf.runCommand(
- rrt.InvokeQuery{
- RunID: wf.runID,
- Name: queryType,
- },
- queryArgs,
- )
-
- if err != nil {
- return nil, err
- }
-
- if result.Failure != nil {
- return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
- }
-
- return result.Payloads, nil
-}
-
-// process incoming command
-func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
- const op = errors.Op("handleMessage")
- var err error
-
- var (
- id = msg.ID
- cmd = msg.Command
- payloads = msg.Payloads
- )
-
- switch cmd := cmd.(type) {
- case *rrt.ExecuteActivity:
- params := cmd.ActivityParams(wf.env, payloads)
- activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelActivity(activityID)
- return nil
- })
-
- case *rrt.ExecuteChildWorkflow:
- params := cmd.WorkflowParams(wf.env, payloads)
-
- // always use deterministic id
- if params.WorkflowID == "" {
- nextID := atomic.AddUint64(&wf.seqID, 1)
- params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
- }
-
- wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
- wf.ids.push(id, r, e)
- })
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
- return nil
- })
-
- case *rrt.GetChildWorkflowExecution:
- wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
- cl := wf.createCallback(id)
-
- // TODO rewrite
- if err != nil {
- panic(err)
- }
-
- p, err := wf.env.GetDataConverter().ToPayloads(w)
- if err != nil {
- panic(err)
- }
-
- cl(p, err)
- })
-
- case *rrt.NewTimer:
- timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
- wf.canceller.register(id, func() error {
- if timerID != nil {
- wf.env.RequestCancelTimer(*timerID)
- }
- return nil
- })
-
- case *rrt.GetVersion:
- version := wf.env.GetVersion(
- cmd.ChangeID,
- workflow.Version(cmd.MinSupported),
- workflow.Version(cmd.MaxSupported),
- )
-
- result, err := wf.env.GetDataConverter().ToPayloads(version)
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.SideEffect:
- wf.env.SideEffect(
- func() (*commonpb.Payloads, error) {
- return payloads, nil
- },
- wf.createContinuableCallback(id),
- )
-
- case *rrt.CompleteWorkflow:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- if msg.Failure == nil {
- wf.env.Complete(payloads, nil)
- } else {
- wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
- }
-
- case *rrt.ContinueAsNew:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- wf.env.Complete(nil, &workflow.ContinueAsNewError{
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- Header: wf.header,
- TaskQueueName: cmd.Options.TaskQueueName,
- WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
- WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
- WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
- })
-
- case *rrt.SignalExternalWorkflow:
- wf.env.SignalExternalWorkflow(
- cmd.Namespace,
- cmd.WorkflowID,
- cmd.RunID,
- cmd.Signal,
- payloads,
- nil,
- cmd.ChildWorkflowOnly,
- wf.createCallback(id),
- )
-
- case *rrt.CancelExternalWorkflow:
- wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
-
- case *rrt.Cancel:
- err = wf.canceller.cancel(cmd.CommandIDs...)
- if err != nil {
- return errors.E(op, err)
- }
-
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.Panic:
- panic(errors.E(cmd.Message))
-
- default:
- panic("undefined command")
- }
-
- return nil
-}
-
-func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) error {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return nil
- }
-
- // fetch original payload
- wf.mq.pushResponse(id, result)
- return nil
- }
-
- return func(result *commonpb.Payloads, err error) {
- // timer cancel callback can happen inside the loop
- if wf.inLoop {
- err := callback(result, err)
- if err != nil {
- panic(err)
- }
-
- return
- }
-
- wf.callbacks = append(wf.callbacks, func() error {
- return callback(result, err)
- })
- }
-}
-
-// callback to be called inside the queue processing, adds new messages at the end of the queue
-func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
- }
-
- return func(result *commonpb.Payloads, err error) {
- callback(result, err)
- }
-}
-
-// Exchange messages between host and worker processes and add new commands to the queue.
-func (wf *workflowProcess) flushQueue() error {
- const op = errors.Op("flush queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.pipeline = append(wf.pipeline, messages...)
-
- return nil
-}
-
-// Exchange messages between host and worker processes without adding new commands to the queue.
-func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
- const op = errors.Op("discard queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return messages, nil
-}
-
-// Run single command and return single result.
-func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("workflow_process_runcommand")
- _, msg := wf.mq.allocateMessage(cmd, payloads)
-
- result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
- if err != nil {
- return rrt.Message{}, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
- }
-
- return result[0], nil
-}
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
deleted file mode 100644
index b9ed46c8..00000000
--- a/plugins/temporal/workflow/workflow_pool.go
+++ /dev/null
@@ -1,190 +0,0 @@
-package workflow
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/temporal/client"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/worker"
- "go.temporal.io/sdk/workflow"
-)
-
-const eventWorkerExit = 8390
-
-// RR_MODE env variable key
-const RR_MODE = "RR_MODE" //nolint
-
-// RR_CODEC env variable key
-const RR_CODEC = "RR_CODEC" //nolint
-
-type workflowPool interface {
- SeqID() uint64
- Exec(p payload.Payload) (payload.Payload, error)
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.BaseProcess
- WorkflowNames() []string
-}
-
-// PoolEvent triggered on workflow pool worker events.
-type PoolEvent struct {
- Event int
- Context interface{}
- Caused error
-}
-
-// workflowPoolImpl manages workflowProcess executions between worker restarts.
-type workflowPoolImpl struct {
- codec rrt.Codec
- seqID uint64
- workflows map[string]rrt.WorkflowInfo
- tWorkers []worker.Worker
- mu sync.Mutex
- worker rrWorker.SyncWorker
- active bool
-}
-
-// newWorkflowPool creates new workflow pool.
-func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
- const op = errors.Op("new_workflow_pool")
- w, err := factory.NewWorker(
- context.Background(),
- map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()},
- listener,
- )
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- go func() {
- err := w.Wait()
- listener(PoolEvent{Event: eventWorkerExit, Caused: err})
- }()
-
- return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil
-}
-
-// Start the pool in non blocking mode.
-func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("workflow_pool_start")
- pool.mu.Lock()
- pool.active = true
- pool.mu.Unlock()
-
- err := pool.initWorkers(ctx, temporal)
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(pool.tWorkers); i++ {
- err := pool.tWorkers[i].Start()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// Active.
-func (pool *workflowPoolImpl) Active() bool {
- return pool.active
-}
-
-// Destroy stops all temporal workers and application worker.
-func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- const op = errors.Op("workflow_pool_destroy")
-
- pool.active = false
- for i := 0; i < len(pool.tWorkers); i++ {
- pool.tWorkers[i].Stop()
- }
-
- worker.PurgeStickyWorkflowCache()
-
- err := pool.worker.Stop()
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-// NewWorkflowDefinition initiates new workflow process.
-func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition {
- return &workflowProcess{
- codec: pool.codec,
- pool: pool,
- }
-}
-
-// NewWorkflowDefinition initiates new workflow process.
-func (pool *workflowPoolImpl) SeqID() uint64 {
- return atomic.AddUint64(&pool.seqID, 1)
-}
-
-// Exec set of commands in thread safe move.
-func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
- if !pool.active {
- return payload.Payload{}, nil
- }
-
- return pool.worker.Exec(p)
-}
-
-func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess {
- return []rrWorker.BaseProcess{pool.worker}
-}
-
-func (pool *workflowPoolImpl) WorkflowNames() []string {
- names := make([]string, 0, len(pool.workflows))
- for name := range pool.workflows {
- names = append(names, name)
- }
-
- return names
-}
-
-// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
-func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("workflow_pool_init_workers")
- workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
- if err != nil {
- return errors.E(op, err)
- }
-
- pool.workflows = make(map[string]rrt.WorkflowInfo)
- pool.tWorkers = make([]worker.Worker, 0)
-
- for _, info := range workerInfo {
- w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
- if err != nil {
- return errors.E(op, err, pool.Destroy(ctx))
- }
-
- pool.tWorkers = append(pool.tWorkers, w)
- for _, workflowInfo := range info.Workflows {
- w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{
- Name: workflowInfo.Name,
- DisableAlreadyRegisteredCheck: false,
- })
-
- pool.workflows[workflowInfo.Name] = workflowInfo
- }
- }
-
- return nil
-}