diff options
author | Valery Piashchynski <[email protected]> | 2021-01-25 22:47:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-25 22:47:02 +0300 |
commit | 43071e43a0743ff8c7913bba7819952962124355 (patch) | |
tree | e3b61113d3c0d28f972c71592af8b2f708994167 /plugins/temporal | |
parent | 5fd1168c687040ca7d72f4727ee1aec753d3f258 (diff) |
Initial commit of the Temporal plugins set
Diffstat (limited to 'plugins/temporal')
22 files changed, 2821 insertions, 0 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go new file mode 100644 index 00000000..0aa2a62f --- /dev/null +++ b/plugins/temporal/activity/activity_pool.go @@ -0,0 +1,199 @@ +package activity + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/worker" +) + +type ( + activityPool interface { + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.SyncWorker + ActivityNames() []string + GetActivityContext(taskToken []byte) (context.Context, error) + } + + activityPoolImpl struct { + dc converter.DataConverter + codec rrt.Codec + seqID uint64 + activities []string + wp pool.Pool + tWorkers []worker.Worker + running sync.Map + } +) + +// newActivityPool +func newActivityPool( + codec rrt.Codec, + listener events.Listener, + poolConfig pool.Config, + server server.Server, +) (activityPool, error) { + wp, err := server.NewWorkerPool( + context.Background(), + poolConfig, + map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, + listener, + ) + + if err != nil { + return nil, err + } + + return &activityPoolImpl{ + codec: codec, + wp: wp, + running: sync.Map{}, + }, nil +} + +// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + pool.dc = temporal.GetDataConverter() + + err := pool.initWorkers(ctx, temporal) + if err != nil { + return err + } + + for i := 0; i < len(pool.tWorkers); i++ { + err := pool.tWorkers[i].Start() + if err != nil { + return err + } + } + + return nil +} + +// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) Destroy(ctx context.Context) error { + for i := 0; i < len(pool.tWorkers); i++ { + pool.tWorkers[i].Stop() + } + + pool.wp.Destroy(ctx) + return nil +} + +// Workers returns list of all allocated workers. +func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker { + return pool.wp.Workers() +} + +// ActivityNames returns list of all available activity names. +func (pool *activityPoolImpl) ActivityNames() []string { + return pool.activities +} + +// ActivityNames returns list of all available activity names. +func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { + c, ok := pool.running.Load(string(taskToken)) + if !ok { + return nil, errors.E("heartbeat on non running activity") + } + + return c.(context.Context), nil +} + +// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. +func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("createTemporalWorker") + + workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) + if err != nil { + return errors.E(op, err) + } + + pool.activities = make([]string, 0) + pool.tWorkers = make([]worker.Worker, 0) + + for _, info := range workerInfo { + w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + if err != nil { + return errors.E(op, err, pool.Destroy(ctx)) + } + + pool.tWorkers = append(pool.tWorkers, w) + for _, activityInfo := range info.Activities { + w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ + Name: activityInfo.Name, + DisableAlreadyRegisteredCheck: false, + }) + + pool.activities = append(pool.activities, activityInfo.Name) + } + } + + return nil +} + +// executes activity with underlying worker. +func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { + const op = errors.Op("executeActivity") + + heartbeatDetails := &common.Payloads{} + if activity.HasHeartbeatDetails(ctx) { + err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails) + if err != nil { + return nil, errors.E(op, err) + } + } + + var ( + info = activity.GetInfo(ctx) + msg = rrt.Message{ + ID: atomic.AddUint64(&pool.seqID, 1), + Command: rrt.InvokeActivity{ + Name: info.ActivityType.Name, + Info: info, + HeartbeatDetails: len(heartbeatDetails.Payloads), + }, + Payloads: args, + } + ) + + if len(heartbeatDetails.Payloads) != 0 { + msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) + } + + pool.running.Store(string(info.TaskToken), ctx) + defer pool.running.Delete(string(info.TaskToken)) + + result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg) + if err != nil { + return nil, errors.E(op, err) + } + + if len(result) != 1 { + return nil, errors.E(op, "invalid activity worker response") + } + + out := result[0] + if out.Failure != nil { + if out.Failure.Message == "doNotCompleteOnReturn" { + return nil, activity.ErrResultPending + } + + return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc) + } + + return out.Payloads, nil +} diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go new file mode 100644 index 00000000..02d66297 --- /dev/null +++ b/plugins/temporal/activity/plugin.go @@ -0,0 +1,210 @@ +package activity + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" +) + +const ( + // PluginName defines public service name. + PluginName = "activities" + + // RRMode sets as RR_MODE env variable to let worker know about the mode to run. + RRMode = "temporal/activity" +) + +// Plugin to manage activity execution. +type Plugin struct { + temporal client.Temporal + events events.Handler + server server.Server + log logger.Logger + mu sync.Mutex + reset chan struct{} + pool activityPool + closing int64 +} + +// Init configures activity service. +func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + if temporal.GetConfig().Activities == nil { + // no need to serve activities + return errors.E(errors.Disabled) + } + + p.temporal = temporal + p.server = server + p.events = events.NewEventsHandler() + p.log = log + p.reset = make(chan struct{}) + + return nil +} + +// Serve activities with underlying workers. +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + + pool, err := p.startPool() + if err != nil { + errCh <- errors.E("startPool", err) + return errCh + } + + p.pool = pool + + go func() { + for { + select { + case <-p.reset: + if atomic.LoadInt64(&p.closing) == 1 { + return + } + + err := p.replacePool() + if err == nil { + continue + } + + bkoff := backoff.NewExponentialBackOff() + bkoff.InitialInterval = time.Second + + err = backoff.Retry(p.replacePool, bkoff) + if err != nil { + errCh <- errors.E("deadPool", err) + } + } + } + }() + + return errCh +} + +// Stop stops the serving plugin. +func (p *Plugin) Stop() error { + atomic.StoreInt64(&p.closing, 1) + + pool := p.getPool() + if pool != nil { + p.pool = nil + return pool.Destroy(context.Background()) + } + + return nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// RPC returns associated rpc service. +func (p *Plugin) RPC() interface{} { + client, _ := p.temporal.GetClient() + + return &rpc{srv: p, client: client} +} + +// Workers returns pool workers. +func (p *Plugin) Workers() []worker.SyncWorker { + return p.getPool().Workers() +} + +// ActivityNames returns list of all available activities. +func (p *Plugin) ActivityNames() []string { + return p.pool.ActivityNames() +} + +// Reset resets underlying workflow pool with new copy. +func (p *Plugin) Reset() error { + p.reset <- struct{}{} + + return nil +} + +// AddListener adds event listeners to the service. +func (p *Plugin) AddListener(listener events.Listener) { + p.events.AddListener(listener) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) poolListener(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventPoolError { + p.log.Error("Activity pool error", "error", ev.Payload.(error)) + p.reset <- struct{}{} + } + } + + p.events.Push(event) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) startPool() (activityPool, error) { + pool, err := newActivityPool( + p.temporal.GetCodec().WithLogger(p.log), + p.poolListener, + *p.temporal.GetConfig().Activities, + p.server, + ) + + if err != nil { + return nil, errors.E(errors.Op("newActivityPool"), err) + } + + err = pool.Start(context.Background(), p.temporal) + if err != nil { + return nil, errors.E(errors.Op("startActivityPool"), err) + } + + p.log.Debug("Started activity processing", "activities", pool.ActivityNames()) + + return pool, nil +} + +func (p *Plugin) replacePool() error { + pool, err := p.startPool() + if err != nil { + p.log.Error("Replace activity pool failed", "error", err) + return errors.E(errors.Op("newActivityPool"), err) + } + + p.log.Debug("Replace activity pool") + + var previous activityPool + + p.mu.Lock() + previous, p.pool = p.pool, pool + p.mu.Unlock() + + errD := previous.Destroy(context.Background()) + if errD != nil { + p.log.Error( + "Unable to destroy expired activity pool", + "error", + errors.E(errors.Op("destroyActivityPool"), errD), + ) + } + + return nil +} + +// getPool returns currently pool. +func (p *Plugin) getPool() activityPool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.pool +} diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go new file mode 100644 index 00000000..49efcd4f --- /dev/null +++ b/plugins/temporal/activity/rpc.go @@ -0,0 +1,66 @@ +package activity + +import ( + v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "google.golang.org/protobuf/proto" +) + +/* +- the method's type is exported. +- the method is exported. +- the method has two arguments, both exported (or builtin) types. +- the method's second argument is a pointer. +- the method has return type error. +*/ +type rpc struct { + srv *Plugin + client client.Client +} + +// RecordHeartbeatRequest sent by activity to record current state. +type RecordHeartbeatRequest struct { + TaskToken []byte `json:"taskToken"` + Details []byte `json:"details"` +} + +// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled. +type RecordHeartbeatResponse struct { + Canceled bool `json:"canceled"` +} + +// RecordActivityHeartbeat records heartbeat for an activity. +// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. +// details - is the progress you want to record along with heart beat for this activity. +// The errors it can return: +// - EntityNotExistsError +// - InternalServiceError +// - CanceledError +func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error { + details := &commonpb.Payloads{} + + if len(in.Details) != 0 { + if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil { + return err + } + } + + // find running activity + ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken) + if err != nil { + return err + } + + activity.RecordHeartbeat(ctx, details) + + select { + case <-ctx.Done(): + *out = RecordHeartbeatResponse{Canceled: true} + default: + *out = RecordHeartbeatResponse{Canceled: false} + } + + return nil +} diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go new file mode 100644 index 00000000..10257070 --- /dev/null +++ b/plugins/temporal/client/doc/doc.go @@ -0,0 +1 @@ +package doc diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio new file mode 100644 index 00000000..f2350af8 --- /dev/null +++ b/plugins/temporal/client/doc/temporal.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go new file mode 100644 index 00000000..37c8d7be --- /dev/null +++ b/plugins/temporal/client/plugin.go @@ -0,0 +1,155 @@ +package client + +import ( + "fmt" + "os" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// PluginName defines public service name. +const PluginName = "temporal" + +// indicates that the case size was set +var stickyCacheSet = false + +// Plugin implement Temporal contract. +type Plugin struct { + workerID int32 + cfg Config + dc converter.DataConverter + log logger.Logger + client client.Client +} + +// Temporal define common interface for RoadRunner plugins. +type Temporal interface { + GetClient() (client.Client, error) + GetDataConverter() converter.DataConverter + GetConfig() Config + GetCodec() rrt.Codec + CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error) +} + +// Config of the temporal client and depended services. +type Config struct { + Address string + Namespace string + Activities *pool.Config + Codec string + DebugLevel int `mapstructure:"debug_level"` + CacheSize int `mapstructure:"cache_size"` +} + +// Init initiates temporal client plugin. +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + p.log = log + p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) + + return cfg.UnmarshalKey(PluginName, &p.cfg) +} + +// GetConfig returns temporal configuration. +func (p *Plugin) GetConfig() Config { + return p.cfg +} + +// GetCodec returns communication codec. +func (p *Plugin) GetCodec() rrt.Codec { + if p.cfg.Codec == "json" { + return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log) + } + + // production ready protocol, no debug abilities + return rrt.NewProtoCodec() +} + +// GetDataConverter returns data active data converter. +func (p *Plugin) GetDataConverter() converter.DataConverter { + return p.dc +} + +// Serve starts temporal srv. +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + var err error + + if stickyCacheSet == false && p.cfg.CacheSize != 0 { + worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize) + stickyCacheSet = true + } + + p.client, err = client.NewClient(client.Options{ + Logger: p.log, + HostPort: p.cfg.Address, + Namespace: p.cfg.Namespace, + DataConverter: p.dc, + }) + + p.log.Debug("Connected to temporal server", "Plugin", p.cfg.Address) + + if err != nil { + errCh <- errors.E(errors.Op("p connect"), err) + } + + return errCh +} + +// Stop stops temporal srv connection. +func (p *Plugin) Stop() error { + if p.client != nil { + p.client.Close() + } + + return nil +} + +// GetClient returns active srv connection. +func (p *Plugin) GetClient() (client.Client, error) { + return p.client, nil +} + +// CreateWorker allocates new temporal worker on an active connection. +func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { + if p.client == nil { + return nil, errors.E("unable to create worker, invalid temporal p") + } + + if options.Identity == "" { + if tq == "" { + tq = client.DefaultNamespace + } + + // ensures unique worker IDs + options.Identity = fmt.Sprintf( + "%d@%s@%s@%v", + os.Getpid(), + getHostName(), + tq, + atomic.AddInt32(&p.workerID, 1), + ) + } + + return worker.New(p.client, tq, options), nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +func getHostName() string { + hostName, err := os.Hostname() + if err != nil { + hostName = "Unknown" + } + return hostName +} diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go new file mode 100644 index 00000000..406e70f4 --- /dev/null +++ b/plugins/temporal/protocol/converter.go @@ -0,0 +1,76 @@ +package protocol + +import ( + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +type ( + // DataConverter wraps Temporal data converter to enable direct access to the payloads. + DataConverter struct { + fallback converter.DataConverter + } +) + +// NewDataConverter creates new data converter. +func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { + return &DataConverter{fallback: fallback} +} + +// ToPayloads converts a list of values. +func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { + for _, v := range values { + if aggregated, ok := v.(*commonpb.Payloads); ok { + // bypassing + return aggregated, nil + } + } + + return r.fallback.ToPayloads(values...) +} + +// ToPayload converts single value to payload. +func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + return r.fallback.ToPayload(value) +} + +// FromPayloads converts to a list of values of different types. +// Useful for deserializing arguments of function invocations. +func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + if payloads == nil { + return nil + } + + if len(valuePtrs) == 1 { + // input proxying + if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { + *input = &commonpb.Payloads{} + (*input).Payloads = payloads.Payloads + return nil + } + } + + for i := 0; i < len(payloads.Payloads); i++ { + err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) + if err != nil { + return err + } + } + + return nil +} + +// FromPayload converts single value from payload. +func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { + return r.fallback.FromPayload(payload, valuePtr) +} + +// ToString converts payload object into human readable string. +func (r *DataConverter) ToString(input *commonpb.Payload) string { + return r.fallback.ToString(input) +} + +// ToStrings converts payloads object into human readable strings. +func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { + return r.fallback.ToStrings(input) +} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go new file mode 100644 index 00000000..6ce9fa0f --- /dev/null +++ b/plugins/temporal/protocol/converter_test.go @@ -0,0 +1,23 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +func Test_Passthough(t *testing.T) { + codec := NewDataConverter(converter.GetDefaultDataConverter()) + + value, err := codec.ToPayloads("test") + assert.NoError(t, err) + + out := &common.Payloads{} + + assert.Len(t, out.Payloads, 0) + assert.NoError(t, codec.FromPayloads(value, &out)) + + assert.Len(t, out.Payloads, 1) +} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go new file mode 100644 index 00000000..c554e28f --- /dev/null +++ b/plugins/temporal/protocol/internal/protocol.pb.go @@ -0,0 +1,167 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protocol.proto + +package internal + +import ( + fmt "fmt" + math "math" + + proto "github.com/golang/protobuf/proto" + v11 "go.temporal.io/api/common/v1" + v1 "go.temporal.io/api/failure/v1" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Frame struct { + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Frame) Reset() { *m = Frame{} } +func (m *Frame) String() string { return proto.CompactTextString(m) } +func (*Frame) ProtoMessage() {} +func (*Frame) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{0} +} + +func (m *Frame) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Frame.Unmarshal(m, b) +} +func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Frame.Marshal(b, m, deterministic) +} +func (m *Frame) XXX_Merge(src proto.Message) { + xxx_messageInfo_Frame.Merge(m, src) +} +func (m *Frame) XXX_Size() int { + return xxx_messageInfo_Frame.Size(m) +} +func (m *Frame) XXX_DiscardUnknown() { + xxx_messageInfo_Frame.DiscardUnknown(m) +} + +var xxx_messageInfo_Frame proto.InternalMessageInfo + +func (m *Frame) GetMessages() []*Message { + if m != nil { + return m.Messages + } + return nil +} + +// Single communication message. +type Message struct { + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + // command name (if any) + Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + // command options in json format. + Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + // error response. + Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` + // invocation or result payloads. + Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{1} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetId() uint64 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Message) GetCommand() string { + if m != nil { + return m.Command + } + return "" +} + +func (m *Message) GetOptions() []byte { + if m != nil { + return m.Options + } + return nil +} + +func (m *Message) GetFailure() *v1.Failure { + if m != nil { + return m.Failure + } + return nil +} + +func (m *Message) GetPayloads() *v11.Payloads { + if m != nil { + return m.Payloads + } + return nil +} + +func init() { + proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") + proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") +} + +func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } + +var fileDescriptor_2bc2336598a3f7e0 = []byte{ + // 257 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, + 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, + 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, + 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, + 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, + 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, + 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, + 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, + 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, + 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, + 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, + 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, + 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, + 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, + 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, + 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, + 0x00, +} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go new file mode 100644 index 00000000..dae3a7d0 --- /dev/null +++ b/plugins/temporal/protocol/json_codec.go @@ -0,0 +1,226 @@ +package protocol + +import ( + "github.com/fatih/color" + jsoniter "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +type ( + // JSONCodec can be used for debugging and log capturing reasons. + JSONCodec struct { + // level enables verbose logging or all incoming and outcoming messages. + level DebugLevel + + // logger renders messages when debug enabled. + logger logger.Logger + } + + // jsonFrame contains message command in binary form. + jsonFrame struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command name. Optional. + Command string `json:"command,omitempty"` + + // Options to be unmarshalled to body (raw payload). + Options jsoniter.RawMessage `json:"options,omitempty"` + + // Failure associated with command id. + Failure []byte `json:"failure,omitempty"` + + // Payloads specific to the command or result. + Payloads []byte `json:"payloads,omitempty"` + } +) + +// NewJSONCodec creates new Json communication codec. +func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { + return &JSONCodec{ + level: level, + logger: logger, + } +} + +// WithLogger creates new codes instance with attached logger. +func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { + return &JSONCodec{ + level: c.level, + logger: logger, + } +} + +// GetName returns codec name. +func (c *JSONCodec) GetName() string { + return "json" +} + +// Execute exchanges commands with worker. +func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + if len(msg) == 0 { + return nil, nil + } + + var ( + response = make([]jsonFrame, 0, 5) + result = make([]Message, 0, 5) + err error + ) + + frames := make([]jsonFrame, 0, len(msg)) + for _, m := range msg { + frame, err := c.packFrame(m) + if err != nil { + return nil, err + } + + frames = append(frames, frame) + } + + p := payload.Payload{} + + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = jsoniter.Marshal(ctx) + if err != nil { + return nil, errors.E(errors.Op("encodeContext"), err) + } + + p.Body, err = jsoniter.Marshal(frames) + if err != nil { + return nil, errors.E(errors.Op("encodePayload"), err) + } + + if c.level >= DebugNormal { + logMessage := string(p.Body) + " " + string(p.Context) + if c.level >= DebugHumanized { + logMessage = color.GreenString(logMessage) + } + + c.logger.Debug(logMessage) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(errors.Op("execute"), err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + if c.level >= DebugNormal { + logMessage := string(out.Body) + if c.level >= DebugHumanized { + logMessage = color.HiYellowString(logMessage) + } + + c.logger.Debug(logMessage, "receive", true) + } + + err = jsoniter.Unmarshal(out.Body, &response) + if err != nil { + return nil, errors.E(errors.Op("parseResponse"), err) + } + + for _, f := range response { + msg, err := c.parseFrame(f) + if err != nil { + return nil, err + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { + var ( + err error + frame jsonFrame + ) + + frame.ID = msg.ID + + if msg.Payloads != nil { + frame.Payloads, err = msg.Payloads.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Failure != nil { + frame.Failure, err = msg.Failure.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Command == nil { + return frame, nil + } + + frame.Command, err = commandName(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + frame.Options, err = jsoniter.Marshal(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + return frame, nil +} + +func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { + var ( + err error + msg Message + ) + + msg.ID = frame.ID + + if frame.Payloads != nil { + msg.Payloads = &common.Payloads{} + + err = msg.Payloads.Unmarshal(frame.Payloads) + if err != nil { + return Message{}, err + } + } + + if frame.Failure != nil { + msg.Failure = &failure.Failure{} + + err = msg.Failure.Unmarshal(frame.Failure) + if err != nil { + return Message{}, err + } + } + + if frame.Command != "" { + cmd, err := initCommand(frame.Command) + if err != nil { + return Message{}, err + } + + err = jsoniter.Unmarshal(frame.Options, &cmd) + if err != nil { + return Message{}, err + } + + msg.Command = cmd + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go new file mode 100644 index 00000000..4568fd1d --- /dev/null +++ b/plugins/temporal/protocol/message.go @@ -0,0 +1,333 @@ +package protocol + +import ( + "time" + + "github.com/spiral/errors" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +const ( + getWorkerInfoCommand = "GetWorkerInfo" + + invokeActivityCommand = "InvokeActivity" + startWorkflowCommand = "StartWorkflow" + invokeSignalCommand = "InvokeSignal" + invokeQueryCommand = "InvokeQuery" + destroyWorkflowCommand = "DestroyWorkflow" + cancelWorkflowCommand = "CancelWorkflow" + getStackTraceCommand = "StackTrace" + + executeActivityCommand = "ExecuteActivity" + executeChildWorkflowCommand = "ExecuteChildWorkflow" + getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" + + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + continueAsNewCommand = "ContinueAsNew" + + signalExternalWorkflowCommand = "SignalExternalWorkflow" + cancelExternalWorkflowCommand = "CancelExternalWorkflow" + + cancelCommand = "Cancel" + panicCommand = "Panic" +) + +// GetWorkerInfo reads worker information. +type GetWorkerInfo struct { +} + +// InvokeActivity invokes activity. +type InvokeActivity struct { + // Name defines activity name. + Name string `json:"name"` + + // Info contains execution context. + Info activity.Info `json:"info"` + + // HeartbeatDetails indicates that the payload also contains last heartbeat details. + HeartbeatDetails int `json:"heartbeatDetails,omitempty"` +} + +// StartWorkflow sends worker command to start workflow. +type StartWorkflow struct { + // Info to define workflow context. + Info *workflow.Info `json:"info"` + + // LastCompletion contains offset of last completion results. + LastCompletion int `json:"lastCompletion,omitempty"` +} + +// InvokeSignal invokes signal with a set of arguments. +type InvokeSignal struct { + // RunID workflow run id. + RunID string `json:"runId"` + + // Name of the signal. + Name string `json:"name"` +} + +// InvokeQuery invokes query with a set of arguments. +type InvokeQuery struct { + // RunID workflow run id. + RunID string `json:"runId"` + // Name of the query. + Name string `json:"name"` +} + +// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). +type CancelWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// DestroyWorkflow asks worker to offload workflow from memory. +type DestroyWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// GetStackTrace asks worker to offload workflow from memory. +type GetStackTrace struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// ExecuteActivity command by workflow worker. +type ExecuteActivity struct { + // Name defines activity name. + Name string `json:"name"` + // Options to run activity. + Options bindings.ExecuteActivityOptions `json:"options,omitempty"` +} + +// ExecuteChildWorkflow executes child workflow. +type ExecuteChildWorkflow struct { + // Name defines workflow name. + Name string `json:"name"` + // Options to run activity. + Options bindings.WorkflowOptions `json:"options,omitempty"` +} + +// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. +type GetChildWorkflowExecution struct { + // ID of child workflow command. + ID uint64 `json:"id"` +} + +// NewTimer starts new timer. +type NewTimer struct { + // Milliseconds defines timer duration. + Milliseconds int `json:"ms"` +} + +// SideEffect to be recorded into the history. +type SideEffect struct{} + +// GetVersion requests version marker. +type GetVersion struct { + ChangeID string `json:"changeID"` + MinSupported int `json:"minSupported"` + MaxSupported int `json:"maxSupported"` +} + +// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. +type CompleteWorkflow struct{} + +// ContinueAsNew restarts workflow with new running instance. +type ContinueAsNew struct { + // Result defines workflow execution result. + Name string `json:"name"` + + // Options for continued as new workflow. + Options struct { + TaskQueueName string + WorkflowExecutionTimeout time.Duration + WorkflowRunTimeout time.Duration + WorkflowTaskTimeout time.Duration + } `json:"options"` +} + +// SignalExternalWorkflow sends signal to external workflow. +type SignalExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` + Signal string `json:"signal"` + ChildWorkflowOnly bool `json:"childWorkflowOnly"` +} + +// CancelExternalWorkflow canceller external workflow. +type CancelExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` +} + +// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). +type Cancel struct { + // CommandIDs to be cancelled. + CommandIDs []uint64 `json:"ids"` +} + +// Panic triggers panic in workflow process. +type Panic struct { + // Message to include into the error. + Message string `json:"message"` +} + +// ActivityParams maps activity command to activity params. +func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { + params := bindings.ExecuteActivityParams{ + ExecuteActivityOptions: cmd.Options, + ActivityType: bindings.ActivityType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// WorkflowParams maps workflow command to workflow params. +func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { + params := bindings.ExecuteWorkflowParams{ + WorkflowOptions: cmd.Options, + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// ToDuration converts timer command to time.Duration. +func (cmd NewTimer) ToDuration() time.Duration { + return time.Millisecond * time.Duration(cmd.Milliseconds) +} + +// returns command name (only for the commands sent to the worker) +func commandName(cmd interface{}) (string, error) { + switch cmd.(type) { + case GetWorkerInfo, *GetWorkerInfo: + return getWorkerInfoCommand, nil + case StartWorkflow, *StartWorkflow: + return startWorkflowCommand, nil + case InvokeSignal, *InvokeSignal: + return invokeSignalCommand, nil + case InvokeQuery, *InvokeQuery: + return invokeQueryCommand, nil + case DestroyWorkflow, *DestroyWorkflow: + return destroyWorkflowCommand, nil + case CancelWorkflow, *CancelWorkflow: + return cancelWorkflowCommand, nil + case GetStackTrace, *GetStackTrace: + return getStackTraceCommand, nil + case InvokeActivity, *InvokeActivity: + return invokeActivityCommand, nil + case ExecuteActivity, *ExecuteActivity: + return executeActivityCommand, nil + case ExecuteChildWorkflow, *ExecuteChildWorkflow: + return executeChildWorkflowCommand, nil + case GetChildWorkflowExecution, *GetChildWorkflowExecution: + return getChildWorkflowExecutionCommand, nil + case NewTimer, *NewTimer: + return newTimerCommand, nil + case GetVersion, *GetVersion: + return getVersionCommand, nil + case SideEffect, *SideEffect: + return sideEffectCommand, nil + case CompleteWorkflow, *CompleteWorkflow: + return completeWorkflowCommand, nil + case ContinueAsNew, *ContinueAsNew: + return continueAsNewCommand, nil + case SignalExternalWorkflow, *SignalExternalWorkflow: + return signalExternalWorkflowCommand, nil + case CancelExternalWorkflow, *CancelExternalWorkflow: + return cancelExternalWorkflowCommand, nil + case Cancel, *Cancel: + return cancelCommand, nil + case Panic, *Panic: + return panicCommand, nil + default: + return "", errors.E(errors.Op("commandName"), "undefined command type", cmd) + } +} + +// reads command from binary payload +func initCommand(name string) (interface{}, error) { + switch name { + case getWorkerInfoCommand: + return &GetWorkerInfo{}, nil + + case startWorkflowCommand: + return &StartWorkflow{}, nil + + case invokeSignalCommand: + return &InvokeSignal{}, nil + + case invokeQueryCommand: + return &InvokeQuery{}, nil + + case destroyWorkflowCommand: + return &DestroyWorkflow{}, nil + + case cancelWorkflowCommand: + return &CancelWorkflow{}, nil + + case getStackTraceCommand: + return &GetStackTrace{}, nil + + case invokeActivityCommand: + return &InvokeActivity{}, nil + + case executeActivityCommand: + return &ExecuteActivity{}, nil + + case executeChildWorkflowCommand: + return &ExecuteChildWorkflow{}, nil + + case getChildWorkflowExecutionCommand: + return &GetChildWorkflowExecution{}, nil + + case newTimerCommand: + return &NewTimer{}, nil + + case getVersionCommand: + return &GetVersion{}, nil + + case sideEffectCommand: + return &SideEffect{}, nil + + case completeWorkflowCommand: + return &CompleteWorkflow{}, nil + + case continueAsNewCommand: + return &ContinueAsNew{}, nil + + case signalExternalWorkflowCommand: + return &SignalExternalWorkflow{}, nil + + case cancelExternalWorkflowCommand: + return &CancelExternalWorkflow{}, nil + + case cancelCommand: + return &Cancel{}, nil + + case panicCommand: + return &Panic{}, nil + + default: + return nil, errors.E(errors.Op("initCommand"), "undefined command type", name) + } +} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go new file mode 100644 index 00000000..b41f02b6 --- /dev/null +++ b/plugins/temporal/protocol/proto_codec.go @@ -0,0 +1,144 @@ +package protocol + +import ( + v1 "github.com/golang/protobuf/proto" //nolint:staticcheck + jsoniter "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" + "google.golang.org/protobuf/proto" +) + +type ( + // ProtoCodec uses protobuf to exchange messages with underlying workers. + ProtoCodec struct { + } +) + +// NewProtoCodec creates new Proto communication codec. +func NewProtoCodec() Codec { + return &ProtoCodec{} +} + +// WithLogger creates new codes instance with attached logger. +func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { + return &ProtoCodec{} +} + +// GetName returns codec name. +func (c *ProtoCodec) GetName() string { + return "protobuf" +} + +// Execute exchanges commands with worker. +func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + if len(msg) == 0 { + return nil, nil + } + + var request = &internal.Frame{} + var response = &internal.Frame{} + var result = make([]Message, 0, 5) + var err error + + for _, m := range msg { + frame, err := c.packMessage(m) + if err != nil { + return nil, err + } + + request.Messages = append(request.Messages, frame) + } + + p := payload.Payload{} + + // context is always in json format + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = jsoniter.Marshal(ctx) + if err != nil { + return nil, errors.E(errors.Op("encodeContext"), err) + } + + p.Body, err = proto.Marshal(v1.MessageV2(request)) + if err != nil { + return nil, errors.E(errors.Op("encodePayload"), err) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(errors.Op("execute"), err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + err = proto.Unmarshal(out.Body, v1.MessageV2(response)) + if err != nil { + return nil, errors.E(errors.Op("parseResponse"), err) + } + + for _, f := range response.Messages { + msg, err := c.parseMessage(f) + if err != nil { + return nil, err + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { + var err error + + frame := &internal.Message{ + Id: msg.ID, + Payloads: msg.Payloads, + Failure: msg.Failure, + } + + if msg.Command != nil { + frame.Command, err = commandName(msg.Command) + if err != nil { + return nil, err + } + + frame.Options, err = jsoniter.Marshal(msg.Command) + if err != nil { + return nil, err + } + } + + return frame, nil +} + +func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { + var err error + + msg := Message{ + ID: frame.Id, + Payloads: frame.Payloads, + Failure: frame.Failure, + } + + if frame.Command != "" { + msg.Command, err = initCommand(frame.Command) + if err != nil { + return Message{}, err + } + + err = jsoniter.Unmarshal(frame.Options, &msg.Command) + if err != nil { + return Message{}, err + } + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go new file mode 100644 index 00000000..53076fdf --- /dev/null +++ b/plugins/temporal/protocol/protocol.go @@ -0,0 +1,77 @@ +package protocol + +import ( + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +const ( + // DebugNone disables all debug messages. + DebugNone = iota + + // DebugNormal renders all messages into console. + DebugNormal + + // DebugHumanized enables color highlights for messages. + DebugHumanized +) + +// Context provides worker information about currently. Context can be empty for server level commands. +type Context struct { + // TaskQueue associates message batch with the specific task queue in underlying worker. + TaskQueue string `json:"taskQueue,omitempty"` + + // TickTime associated current or historical time with message batch. + TickTime string `json:"tickTime,omitempty"` + + // Replay indicates that current message batch is historical. + Replay bool `json:"replay,omitempty"` +} + +// Message used to exchange the send commands and receive responses from underlying workers. +type Message struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command of the message in unmarshalled form. Pointer. + Command interface{} `json:"command,omitempty"` + + // Failure associated with command id. + Failure *failure.Failure `json:"failure,omitempty"` + + // Payloads contains message specific payloads in binary format. + Payloads *commonpb.Payloads `json:"payloads,omitempty"` +} + +// Codec manages payload encoding and decoding while communication with underlying worker. +type Codec interface { + // WithLogger creates new codes instance with attached logger. + WithLogger(logger.Logger) Codec + + // GetName returns codec name. + GetName() string + + // Execute sends message to worker and waits for the response. + Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) +} + +// Endpoint provides the ability to send and receive messages. +type Endpoint interface { + // ExecWithContext allow to set ExecTTL + Exec(p payload.Payload) (payload.Payload, error) +} + +// DebugLevel configures debug level. +type DebugLevel int + +// IsEmpty only check if task queue set. +func (ctx Context) IsEmpty() bool { + return ctx.TaskQueue == "" +} + +// IsCommand returns true if message carries request. +func (msg Message) IsCommand() bool { + return msg.Command != nil +} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go new file mode 100644 index 00000000..6dfcd81f --- /dev/null +++ b/plugins/temporal/protocol/worker_info.go @@ -0,0 +1,72 @@ +package protocol + +import ( + "github.com/spiral/errors" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// WorkerInfo outlines information about every available worker and it's TaskQueues. + +// WorkerInfo lists available task queues, workflows and activities. +type WorkerInfo struct { + // TaskQueue assigned to the worker. + TaskQueue string `json:"taskQueue"` + + // Options describe worker options. + Options worker.Options `json:"options,omitempty"` + + // Workflows provided by the worker. + Workflows []WorkflowInfo + + // Activities provided by the worker. + Activities []ActivityInfo +} + +// WorkflowInfo describes single worker workflow. +type WorkflowInfo struct { + // Name of the workflow. + Name string `json:"name"` + + // Queries pre-defined for the workflow type. + Queries []string `json:"queries"` + + // Signals pre-defined for the workflow type. + Signals []string `json:"signals"` +} + +// ActivityInfo describes single worker activity. +type ActivityInfo struct { + // Name describes public activity name. + Name string `json:"name"` +} + +// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). +func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { + const op = errors.Op("fetch_worker_info") + + result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) + if err != nil { + return nil, err + } + + if len(result) != 1 { + return nil, errors.E(op, errors.Str("unable to read worker info")) + } + + if result[0].ID != 0 { + return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) + } + + var info []WorkerInfo + for i := range result[0].Payloads.Payloads { + wi := WorkerInfo{} + if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { + return nil, errors.E(op, err) + } + + info = append(info, wi) + } + + return info, nil +} diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go new file mode 100644 index 00000000..c38f447f --- /dev/null +++ b/plugins/temporal/workflow/canceller.go @@ -0,0 +1,38 @@ +package workflow + +import ( + "sync" +) + +type ( + cancellable func() error + + canceller struct { + ids sync.Map + } +) + +func (c *canceller) register(id uint64, cancel cancellable) { + c.ids.Store(id, cancel) +} + +func (c *canceller) discard(id uint64) { + c.ids.Delete(id) +} + +func (c *canceller) cancel(ids ...uint64) error { + var err error + for _, id := range ids { + cancel, ok := c.ids.LoadAndDelete(id) + if ok == false { + continue + } + + err = cancel.(cancellable)() + if err != nil { + return err + } + } + + return nil +} diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go new file mode 100644 index 00000000..d6e846f8 --- /dev/null +++ b/plugins/temporal/workflow/canceller_test.go @@ -0,0 +1,33 @@ +package workflow + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_CancellerNoListeners(t *testing.T) { + c := &canceller{} + + assert.NoError(t, c.cancel(1)) +} + +func Test_CancellerListenerError(t *testing.T) { + c := &canceller{} + c.register(1, func() error { + return errors.New("failed") + }) + + assert.Error(t, c.cancel(1)) +} + +func Test_CancellerListenerDiscarded(t *testing.T) { + c := &canceller{} + c.register(1, func() error { + return errors.New("failed") + }) + + c.discard(1) + assert.NoError(t, c.cancel(1)) +} diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go new file mode 100644 index 00000000..ac75cbda --- /dev/null +++ b/plugins/temporal/workflow/id_registry.go @@ -0,0 +1,51 @@ +package workflow + +import ( + "sync" + + bindings "go.temporal.io/sdk/internalbindings" +) + +// used to gain access to child workflow ids after they become available via callback result. +type idRegistry struct { + mu sync.Mutex + ids map[uint64]entry + listeners map[uint64]listener +} + +type listener func(w bindings.WorkflowExecution, err error) + +type entry struct { + w bindings.WorkflowExecution + err error +} + +func newIDRegistry() *idRegistry { + return &idRegistry{ + ids: map[uint64]entry{}, + listeners: map[uint64]listener{}, + } +} + +func (c *idRegistry) listen(id uint64, cl listener) { + c.mu.Lock() + defer c.mu.Unlock() + + c.listeners[id] = cl + + if e, ok := c.ids[id]; ok { + cl(e.w, e.err) + } +} + +func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + e := entry{w: w, err: err} + c.ids[id] = e + + if l, ok := c.listeners[id]; ok { + l(e.w, e.err) + } +} diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go new file mode 100644 index 00000000..50949897 --- /dev/null +++ b/plugins/temporal/workflow/message_queue.go @@ -0,0 +1,55 @@ +package workflow + +import ( + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +type messageQueue struct { + seqID func() uint64 + queue []rrt.Message +} + +func newMessageQueue(sedID func() uint64) *messageQueue { + return &messageQueue{ + seqID: sedID, + queue: make([]rrt.Message, 0, 5), + } +} + +func (mq *messageQueue) flush() { + mq.queue = mq.queue[0:0] +} + +func (mq *messageQueue) allocateMessage( + cmd interface{}, + payloads *common.Payloads, +) (id uint64, msg rrt.Message, err error) { + msg = rrt.Message{ + ID: mq.seqID(), + Command: cmd, + Payloads: payloads, + } + + return msg.ID, msg, nil +} + +func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) (id uint64, err error) { + id, msg, err := mq.allocateMessage(cmd, payloads) + if err != nil { + return 0, err + } + + mq.queue = append(mq.queue, msg) + + return id, nil +} + +func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads}) +} + +func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure}) +} diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go new file mode 100644 index 00000000..61f5123f --- /dev/null +++ b/plugins/temporal/workflow/message_queue_test.go @@ -0,0 +1,55 @@ +package workflow + +import ( + "sync/atomic" + "testing" + + "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "github.com/stretchr/testify/assert" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +func Test_MessageQueueFlushError(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + mq.pushError(1, &failure.Failure{}) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) + assert.Equal(t, uint64(0), index) +} + +func Test_MessageQueueFlushResponse(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + mq.pushResponse(1, &common.Payloads{}) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) + assert.Equal(t, uint64(0), index) +} + +func Test_MessageQueueCommandID(t *testing.T) { + var index uint64 + mq := newMessageQueue(func() uint64 { + return atomic.AddUint64(&index, 1) + }) + + n, err := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) + assert.Equal(t, n, index) + + assert.NoError(t, err) + assert.Len(t, mq.queue, 1) + + mq.flush() + assert.Len(t, mq.queue, 0) +} diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go new file mode 100644 index 00000000..ee2d722c --- /dev/null +++ b/plugins/temporal/workflow/plugin.go @@ -0,0 +1,198 @@ +package workflow + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" +) + +const ( + // PluginName defines public service name. + PluginName = "workflows" + + // RRMode sets as RR_MODE env variable to let worker know about the mode to run. + RRMode = "temporal/workflow" +) + +// Plugin manages workflows and workers. +type Plugin struct { + temporal client.Temporal + events events.Handler + server server.Server + log logger.Logger + mu sync.Mutex + reset chan struct{} + pool workflowPool + closing int64 +} + +// Init workflow plugin. +func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + p.temporal = temporal + p.server = server + p.events = events.NewEventsHandler() + p.log = log + p.reset = make(chan struct{}) + + return nil +} + +// Serve starts workflow service. +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + + pool, err := p.startPool() + if err != nil { + errCh <- errors.E("startPool", err) + return errCh + } + + p.pool = pool + + go func() { + for { + select { + case <-p.reset: + if atomic.LoadInt64(&p.closing) == 1 { + return + } + + err := p.replacePool() + if err == nil { + continue + } + + bkoff := backoff.NewExponentialBackOff() + bkoff.InitialInterval = time.Second + + err = backoff.Retry(p.replacePool, bkoff) + if err != nil { + errCh <- errors.E("deadPool", err) + } + } + } + }() + + return errCh +} + +// Stop workflow service. +func (p *Plugin) Stop() error { + atomic.StoreInt64(&p.closing, 1) + + pool := p.getPool() + if pool != nil { + p.pool = nil + return pool.Destroy(context.Background()) + } + + return nil +} + +// Name of the service. +func (p *Plugin) Name() string { + return PluginName +} + +// Workers returns list of available workflow workers. +func (p *Plugin) Workers() []worker.BaseProcess { + return p.pool.Workers() +} + +// WorkflowNames returns list of all available workflows. +func (p *Plugin) WorkflowNames() []string { + return p.pool.WorkflowNames() +} + +// Reset resets underlying workflow pool with new copy. +func (p *Plugin) Reset() error { + p.reset <- struct{}{} + + return nil +} + +// AddListener adds event listeners to the service. +func (p *Plugin) AddListener(listener events.Listener) { + p.events.AddListener(listener) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) poolListener(event interface{}) { + if ev, ok := event.(PoolEvent); ok { + if ev.Event == eventWorkerExit { + if ev.Caused != nil { + p.log.Error("Workflow pool error", "error", ev.Caused) + } + p.reset <- struct{}{} + } + } + + p.events.Push(event) +} + +// AddListener adds event listeners to the service. +func (p *Plugin) startPool() (workflowPool, error) { + pool, err := newWorkflowPool( + p.temporal.GetCodec().WithLogger(p.log), + p.poolListener, + p.server, + ) + if err != nil { + return nil, errors.E(errors.Op("initWorkflowPool"), err) + } + + err = pool.Start(context.Background(), p.temporal) + if err != nil { + return nil, errors.E(errors.Op("startWorkflowPool"), err) + } + + p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) + + return pool, nil +} + +func (p *Plugin) replacePool() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.pool != nil { + errD := p.pool.Destroy(context.Background()) + p.pool = nil + if errD != nil { + p.log.Error( + "Unable to destroy expired workflow pool", + "error", + errors.E(errors.Op("destroyWorkflowPool"), errD), + ) + } + } + + pool, err := p.startPool() + if err != nil { + p.log.Error("Replace workflow pool failed", "error", err) + return errors.E(errors.Op("newWorkflowPool"), err) + } + + p.log.Debug("Replace workflow pool") + + p.pool = pool + + return nil +} + +// getPool returns currently pool. +func (p *Plugin) getPool() workflowPool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.pool +} diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go new file mode 100644 index 00000000..ec5a14eb --- /dev/null +++ b/plugins/temporal/workflow/process.go @@ -0,0 +1,455 @@ +package workflow + +import ( + "strconv" + "sync/atomic" + "time" + + "github.com/spiral/errors" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + commonpb "go.temporal.io/api/common/v1" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +// wraps single workflow process +type workflowProcess struct { + codec rrt.Codec + pool workflowPool + env bindings.WorkflowEnvironment + header *commonpb.Header + mq *messageQueue + ids *idRegistry + seqID uint64 + runID string + pipeline []rrt.Message + callbacks []func() error + canceller *canceller + inLoop bool +} + +// Execute workflow, bootstraps process. +func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { + wf.env = env + wf.header = header + wf.seqID = 0 + wf.runID = env.WorkflowInfo().WorkflowExecution.RunID + wf.canceller = &canceller{} + + // sequenceID shared for all worker workflows + wf.mq = newMessageQueue(wf.pool.SeqID) + wf.ids = newIDRegistry() + + env.RegisterCancelHandler(wf.handleCancel) + env.RegisterSignalHandler(wf.handleSignal) + env.RegisterQueryHandler(wf.handleQuery) + + var ( + lastCompletion = bindings.GetLastCompletionResult(env) + lastCompletionOffset = 0 + ) + + if lastCompletion != nil && len(lastCompletion.Payloads) != 0 { + if input == nil { + input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}} + } + + input.Payloads = append(input.Payloads, lastCompletion.Payloads...) + lastCompletionOffset = len(lastCompletion.Payloads) + } + + _, err := wf.mq.pushCommand( + rrt.StartWorkflow{ + Info: env.WorkflowInfo(), + LastCompletion: lastCompletionOffset, + }, + input, + ) + + if err != nil { + panic(err) + } +} + +// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. +func (wf *workflowProcess) OnWorkflowTaskStarted() { + wf.inLoop = true + defer func() { wf.inLoop = false }() + + var err error + for _, callback := range wf.callbacks { + err = callback() + if err != nil { + panic(err) + } + } + wf.callbacks = nil + + if err := wf.flushQueue(); err != nil { + panic(err) + } + + for len(wf.pipeline) > 0 { + msg := wf.pipeline[0] + wf.pipeline = wf.pipeline[1:] + + if msg.IsCommand() { + err = wf.handleMessage(msg) + } + + if err != nil { + panic(err) + } + } +} + +// StackTrace renders workflow stack trace. +func (wf *workflowProcess) StackTrace() string { + result, err := wf.runCommand( + rrt.GetStackTrace{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + }, + nil, + ) + + if err != nil { + return err.Error() + } + + var stacktrace string + err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace) + if err != nil { + return err.Error() + } + + return stacktrace +} + +// Close the workflow. +func (wf *workflowProcess) Close() { + // TODO: properly handle errors + // panic(err) + + _, err := wf.mq.pushCommand( + rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) + + if err != nil { + panic(err) + } + + _, _ = wf.discardQueue() +} + +// execution context. +func (wf *workflowProcess) getContext() rrt.Context { + return rrt.Context{ + TaskQueue: wf.env.WorkflowInfo().TaskQueueName, + TickTime: wf.env.Now().Format(time.RFC3339), + Replay: wf.env.IsReplaying(), + } +} + +// schedule cancel command +func (wf *workflowProcess) handleCancel() { + _, err := wf.mq.pushCommand( + rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) + + if err != nil { + panic(err) + } +} + +// schedule the signal processing +func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { + _, err := wf.mq.pushCommand( + rrt.InvokeSignal{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + Name: name, + }, + input, + ) + + if err != nil { + panic(err) + } +} + +// Handle query in blocking mode. +func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { + result, err := wf.runCommand( + rrt.InvokeQuery{ + RunID: wf.runID, + Name: queryType, + }, + queryArgs, + ) + + if err != nil { + return nil, err + } + + if result.Failure != nil { + return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter()) + } + + return result.Payloads, nil +} + +// process incoming command +func (wf *workflowProcess) handleMessage(msg rrt.Message) error { + const op = errors.Op("handleMessage") + var err error + + var ( + id = msg.ID + cmd = msg.Command + payloads = msg.Payloads + ) + + switch cmd := cmd.(type) { + case *rrt.ExecuteActivity: + params := cmd.ActivityParams(wf.env, payloads) + activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelActivity(activityID) + return nil + }) + + case *rrt.ExecuteChildWorkflow: + params := cmd.WorkflowParams(wf.env, payloads) + + // always use deterministic id + if params.WorkflowID == "" { + nextID := atomic.AddUint64(&wf.seqID, 1) + params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) + } + + wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { + wf.ids.push(id, r, e) + }) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) + return nil + }) + + case *rrt.GetChildWorkflowExecution: + wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) { + cl := wf.createCallback(id) + + // TODO rewrite + if err != nil { + panic(err) + } + + p, err := wf.env.GetDataConverter().ToPayloads(w) + if err != nil { + panic(err) + } + + cl(p, err) + }) + + case *rrt.NewTimer: + timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id)) + wf.canceller.register(id, func() error { + if timerID != nil { + wf.env.RequestCancelTimer(*timerID) + } + return nil + }) + + case *rrt.GetVersion: + version := wf.env.GetVersion( + cmd.ChangeID, + workflow.Version(cmd.MinSupported), + workflow.Version(cmd.MaxSupported), + ) + + result, err := wf.env.GetDataConverter().ToPayloads(version) + if err != nil { + return errors.E(op, err) + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.SideEffect: + wf.env.SideEffect( + func() (*commonpb.Payloads, error) { + return payloads, nil + }, + wf.createContinuableCallback(id), + ) + + case *rrt.CompleteWorkflow: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + if msg.Failure == nil { + wf.env.Complete(payloads, nil) + } else { + wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) + } + + case *rrt.ContinueAsNew: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + wf.env.Complete(nil, &workflow.ContinueAsNewError{ + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + Header: wf.header, + TaskQueueName: cmd.Options.TaskQueueName, + WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout, + WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout, + WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout, + }) + + case *rrt.SignalExternalWorkflow: + wf.env.SignalExternalWorkflow( + cmd.Namespace, + cmd.WorkflowID, + cmd.RunID, + cmd.Signal, + payloads, + nil, + cmd.ChildWorkflowOnly, + wf.createCallback(id), + ) + + case *rrt.CancelExternalWorkflow: + wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id)) + + case *rrt.Cancel: + err = wf.canceller.cancel(cmd.CommandIDs...) + if err != nil { + return errors.E(op, err) + } + + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.Panic: + panic(errors.E(cmd.Message)) + + default: + panic("undefined command") + } + + return nil +} + +func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) error { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return nil + } + + // fetch original payload + wf.mq.pushResponse(id, result) + return nil + } + + return func(result *commonpb.Payloads, err error) { + // timer cancel callback can happen inside the loop + if wf.inLoop { + err := callback(result, err) + if err != nil { + panic(err) + } + + return + } + + wf.callbacks = append(wf.callbacks, func() error { + return callback(result, err) + }) + } +} + +// callback to be called inside the queue processing, adds new messages at the end of the queue +func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + } + + return func(result *commonpb.Payloads, err error) { + callback(result, err) + } +} + +// Exchange messages between host and worker processes and add new commands to the queue. +func (wf *workflowProcess) flushQueue() error { + const op = errors.Op("flush queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return errors.E(op, err) + } + + wf.pipeline = append(wf.pipeline, messages...) + + return nil +} + +// Exchange messages between host and worker processes without adding new commands to the queue. +func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { + const op = errors.Op("discard queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return nil, errors.E(op, err) + } + + return messages, nil +} + +// Run single command and return single result. +func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { + const op = errors.Op("run command") + _, msg, err := wf.mq.allocateMessage(cmd, payloads) + if err != nil { + return rrt.Message{}, err + } + + result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) + if err != nil { + return rrt.Message{}, errors.E(op, err) + } + + if len(result) != 1 { + return rrt.Message{}, errors.E("unexpected worker response") + } + + return result[0], nil +} diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go new file mode 100644 index 00000000..2022b624 --- /dev/null +++ b/plugins/temporal/workflow/workflow_pool.go @@ -0,0 +1,186 @@ +package workflow + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/temporal/client" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +const eventWorkerExit = 8390 + +type ( + workflowPool interface { + SeqID() uint64 + Exec(p payload.Payload) (payload.Payload, error) + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.BaseProcess + WorkflowNames() []string + } + + // PoolEvent triggered on workflow pool worker events. + PoolEvent struct { + Event int + Context interface{} + Caused error + } + + // workflowPoolImpl manages workflowProcess executions between worker restarts. + workflowPoolImpl struct { + codec rrt.Codec + seqID uint64 + workflows map[string]rrt.WorkflowInfo + tWorkers []worker.Worker + mu sync.Mutex + worker rrWorker.SyncWorker + active bool + } +) + +// newWorkflowPool creates new workflow pool. +func newWorkflowPool( + codec rrt.Codec, + listener events.Listener, + factory server.Server, +) (workflowPool, error) { + w, err := factory.NewWorker( + context.Background(), + map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, + listener, + ) + + if err != nil { + return nil, errors.E(errors.Op("newWorker"), err) + } + + go func() { + err := w.Wait() + listener(PoolEvent{Event: eventWorkerExit, Caused: err}) + }() + + return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil +} + +// Start the pool in non blocking mode. +func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + pool.mu.Lock() + pool.active = true + pool.mu.Unlock() + + err := pool.initWorkers(ctx, temporal) + if err != nil { + return errors.E(errors.Op("initWorkers"), err) + } + + for i := 0; i < len(pool.tWorkers); i++ { + err := pool.tWorkers[i].Start() + if err != nil { + return errors.E(errors.Op("startTemporalWorker"), err) + } + } + + return nil +} + +// Active. +func (pool *workflowPoolImpl) Active() bool { + return pool.active +} + +// Destroy stops all temporal workers and application worker. +func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.active = false + for i := 0; i < len(pool.tWorkers); i++ { + pool.tWorkers[i].Stop() + } + + worker.PurgeStickyWorkflowCache() + + if err := pool.worker.Stop(); err != nil { + return errors.E(errors.Op("stopWorkflowWorker"), err) + } + + return nil +} + +// NewWorkflowDefinition initiates new workflow process. +func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition { + return &workflowProcess{ + codec: pool.codec, + pool: pool, + } +} + +// NewWorkflowDefinition initiates new workflow process. +func (pool *workflowPoolImpl) SeqID() uint64 { + return atomic.AddUint64(&pool.seqID, 1) +} + +// Exec set of commands in thread safe move. +func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + if !pool.active { + return payload.Payload{}, nil + } + + return pool.worker.Exec(p) +} + +func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess { + return []rrWorker.BaseProcess{pool.worker} +} + +func (pool *workflowPoolImpl) WorkflowNames() []string { + names := make([]string, 0, len(pool.workflows)) + for name := range pool.workflows { + names = append(names, name) + } + + return names +} + +// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. +func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) + if err != nil { + return err + } + + pool.workflows = make(map[string]rrt.WorkflowInfo) + pool.tWorkers = make([]worker.Worker, 0) + + for _, info := range workerInfo { + w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + if err != nil { + return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx)) + } + + pool.tWorkers = append(pool.tWorkers, w) + for _, workflowInfo := range info.Workflows { + w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{ + Name: workflowInfo.Name, + DisableAlreadyRegisteredCheck: false, + }) + + pool.workflows[workflowInfo.Name] = workflowInfo + } + } + + return nil +} |