diff options
author | Valery Piashchynski <[email protected]> | 2021-01-27 14:34:31 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-27 14:34:31 +0300 |
commit | 6dd131497808f414ac1cb952d4b0b89b9e0689f8 (patch) | |
tree | f7af7d7d494d1f5ca272af1ad0b978fe44d685a9 /plugins | |
parent | e2266b80db47444ba5858c736833a8a81b1361ad (diff) | |
parent | 744c2b0c86b88f77e681f8660bf3a476e83711b8 (diff) |
Merge pull request #507 from spiral/refactor/temporal-plugins
refactoring(temporal): Move temporal plugin to the https://github.com/temporalio/roadrunner-temporal repository
Diffstat (limited to 'plugins')
22 files changed, 0 insertions, 2822 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go deleted file mode 100644 index d09722ce..00000000 --- a/plugins/temporal/activity/activity_pool.go +++ /dev/null @@ -1,197 +0,0 @@ -package activity - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/worker" -) - -// RR_MODE env variable -const RR_MODE = "RR_MODE" //nolint:golint,stylecheck -// RR_CODEC env variable -const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck - -// -const doNotCompleteOnReturn = "doNotCompleteOnReturn" - -type activityPool interface { - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.SyncWorker - ActivityNames() []string - GetActivityContext(taskToken []byte) (context.Context, error) -} - -type activityPoolImpl struct { - dc converter.DataConverter - codec rrt.Codec - seqID uint64 - activities []string - wp pool.Pool - tWorkers []worker.Worker - running sync.Map -} - -// newActivityPool -func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { - const op = errors.Op("new_activity_pool") - // env variables - env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} - wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) - if err != nil { - return nil, errors.E(op, err) - } - - return &activityPoolImpl{ - codec: codec, - wp: wp, - running: sync.Map{}, - }, nil -} - -// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("activity_pool_start") - pool.dc = temporal.GetDataConverter() - - err := pool.initWorkers(ctx, temporal) - if err != nil { - return errors.E(op, err) - } - - for i := 0; i < len(pool.tWorkers); i++ { - err := pool.tWorkers[i].Start() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) Destroy(ctx context.Context) error { - for i := 0; i < len(pool.tWorkers); i++ { - pool.tWorkers[i].Stop() - } - - pool.wp.Destroy(ctx) - return nil -} - -// Workers returns list of all allocated workers. -func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker { - return pool.wp.Workers() -} - -// ActivityNames returns list of all available activity names. -func (pool *activityPoolImpl) ActivityNames() []string { - return pool.activities -} - -// ActivityNames returns list of all available activity names. -func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { - const op = errors.Op("activity_pool_get_activity_context") - c, ok := pool.running.Load(string(taskToken)) - if !ok { - return nil, errors.E(op, errors.Str("heartbeat on non running activity")) - } - - return c.(context.Context), nil -} - -// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. -func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("activity_pool_create_temporal_worker") - - workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) - if err != nil { - return errors.E(op, err) - } - - pool.activities = make([]string, 0) - pool.tWorkers = make([]worker.Worker, 0) - - for i := 0; i < len(workerInfo); i++ { - w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) - if err != nil { - return errors.E(op, err, pool.Destroy(ctx)) - } - - pool.tWorkers = append(pool.tWorkers, w) - for j := 0; j < len(workerInfo[i].Activities); j++ { - w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ - Name: workerInfo[i].Activities[j].Name, - DisableAlreadyRegisteredCheck: false, - }) - - pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) - } - } - - return nil -} - -// executes activity with underlying worker. -func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { - const op = errors.Op("activity_pool_execute_activity") - - heartbeatDetails := &common.Payloads{} - if activity.HasHeartbeatDetails(ctx) { - err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails) - if err != nil { - return nil, errors.E(op, err) - } - } - - var info = activity.GetInfo(ctx) - var msg = rrt.Message{ - ID: atomic.AddUint64(&pool.seqID, 1), - Command: rrt.InvokeActivity{ - Name: info.ActivityType.Name, - Info: info, - HeartbeatDetails: len(heartbeatDetails.Payloads), - }, - Payloads: args, - } - - if len(heartbeatDetails.Payloads) != 0 { - msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) - } - - pool.running.Store(string(info.TaskToken), ctx) - defer pool.running.Delete(string(info.TaskToken)) - - result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg) - if err != nil { - return nil, errors.E(op, err) - } - - if len(result) != 1 { - return nil, errors.E(op, errors.Str("invalid activity worker response")) - } - - out := result[0] - if out.Failure != nil { - if out.Failure.Message == doNotCompleteOnReturn { - return nil, activity.ErrResultPending - } - - return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc) - } - - return out.Payloads, nil -} diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go deleted file mode 100644 index 5e562a8d..00000000 --- a/plugins/temporal/activity/plugin.go +++ /dev/null @@ -1,215 +0,0 @@ -package activity - -import ( - "context" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" -) - -const ( - // PluginName defines public service name. - PluginName = "activities" - - // RRMode sets as RR_MODE env variable to let worker know about the mode to run. - RRMode = "temporal/activity" -) - -// Plugin to manage activity execution. -type Plugin struct { - temporal client.Temporal - events events.Handler - server server.Server - log logger.Logger - mu sync.Mutex - reset chan struct{} - pool activityPool - closing int64 -} - -// Init configures activity service. -func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { - const op = errors.Op("activity_plugin_init") - if temporal.GetConfig().Activities == nil { - // no need to serve activities - return errors.E(op, errors.Disabled) - } - - p.temporal = temporal - p.server = server - p.events = events.NewEventsHandler() - p.log = log - p.reset = make(chan struct{}) - - return nil -} - -// Serve activities with underlying workers. -func (p *Plugin) Serve() chan error { - const op = errors.Op("activity_plugin_serve") - - errCh := make(chan error, 1) - pool, err := p.startPool() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - p.pool = pool - - go func() { - for { - select { - case <-p.reset: - if atomic.LoadInt64(&p.closing) == 1 { - return - } - - err := p.replacePool() - if err == nil { - continue - } - - bkoff := backoff.NewExponentialBackOff() - bkoff.InitialInterval = time.Second - - err = backoff.Retry(p.replacePool, bkoff) - if err != nil { - errCh <- errors.E(op, err) - } - } - } - }() - - return errCh -} - -// Stop stops the serving plugin. -func (p *Plugin) Stop() error { - atomic.StoreInt64(&p.closing, 1) - const op = errors.Op("activity_plugin_stop") - - pool := p.getPool() - if pool != nil { - p.pool = nil - err := pool.Destroy(context.Background()) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -// RPC returns associated rpc service. -func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, client: p.temporal.GetClient()} -} - -// Workers returns pool workers. -func (p *Plugin) Workers() []worker.SyncWorker { - return p.getPool().Workers() -} - -// ActivityNames returns list of all available activities. -func (p *Plugin) ActivityNames() []string { - return p.pool.ActivityNames() -} - -// Reset resets underlying workflow pool with new copy. -func (p *Plugin) Reset() error { - p.reset <- struct{}{} - - return nil -} - -// AddListener adds event listeners to the service. -func (p *Plugin) AddListener(listener events.Listener) { - p.events.AddListener(listener) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) poolListener(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventPoolError { - p.log.Error("Activity pool error", "error", ev.Payload.(error)) - p.reset <- struct{}{} - } - } - - p.events.Push(event) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) startPool() (activityPool, error) { - pool, err := newActivityPool( - p.temporal.GetCodec().WithLogger(p.log), - p.poolListener, - *p.temporal.GetConfig().Activities, - p.server, - ) - - if err != nil { - return nil, errors.E(errors.Op("newActivityPool"), err) - } - - err = pool.Start(context.Background(), p.temporal) - if err != nil { - return nil, errors.E(errors.Op("startActivityPool"), err) - } - - p.log.Debug("Started activity processing", "activities", pool.ActivityNames()) - - return pool, nil -} - -func (p *Plugin) replacePool() error { - pool, err := p.startPool() - if err != nil { - p.log.Error("Replace activity pool failed", "error", err) - return errors.E(errors.Op("newActivityPool"), err) - } - - p.log.Debug("Replace activity pool") - - var previous activityPool - - p.mu.Lock() - previous, p.pool = p.pool, pool - p.mu.Unlock() - - errD := previous.Destroy(context.Background()) - if errD != nil { - p.log.Error( - "Unable to destroy expired activity pool", - "error", - errors.E(errors.Op("destroyActivityPool"), errD), - ) - } - - return nil -} - -// getPool returns currently pool. -func (p *Plugin) getPool() activityPool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.pool -} diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go deleted file mode 100644 index 49efcd4f..00000000 --- a/plugins/temporal/activity/rpc.go +++ /dev/null @@ -1,66 +0,0 @@ -package activity - -import ( - v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - "go.temporal.io/sdk/client" - "google.golang.org/protobuf/proto" -) - -/* -- the method's type is exported. -- the method is exported. -- the method has two arguments, both exported (or builtin) types. -- the method's second argument is a pointer. -- the method has return type error. -*/ -type rpc struct { - srv *Plugin - client client.Client -} - -// RecordHeartbeatRequest sent by activity to record current state. -type RecordHeartbeatRequest struct { - TaskToken []byte `json:"taskToken"` - Details []byte `json:"details"` -} - -// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled. -type RecordHeartbeatResponse struct { - Canceled bool `json:"canceled"` -} - -// RecordActivityHeartbeat records heartbeat for an activity. -// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. -// details - is the progress you want to record along with heart beat for this activity. -// The errors it can return: -// - EntityNotExistsError -// - InternalServiceError -// - CanceledError -func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error { - details := &commonpb.Payloads{} - - if len(in.Details) != 0 { - if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil { - return err - } - } - - // find running activity - ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken) - if err != nil { - return err - } - - activity.RecordHeartbeat(ctx, details) - - select { - case <-ctx.Done(): - *out = RecordHeartbeatResponse{Canceled: true} - default: - *out = RecordHeartbeatResponse{Canceled: false} - } - - return nil -} diff --git a/plugins/temporal/client/doc/doc.go b/plugins/temporal/client/doc/doc.go deleted file mode 100644 index 10257070..00000000 --- a/plugins/temporal/client/doc/doc.go +++ /dev/null @@ -1 +0,0 @@ -package doc diff --git a/plugins/temporal/client/doc/temporal.drawio b/plugins/temporal/client/doc/temporal.drawio deleted file mode 100644 index f2350af8..00000000 --- a/plugins/temporal/client/doc/temporal.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2020-10-20T11:17:09.390Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="U-Sm3BmbD4KaqDkjKEOx" version="13.7.9" type="device"><diagram name="Page-1" id="6133507b-19e7-1e82-6fc7-422aa6c4b21f">7V1tc6M2EP41nsl1Jh5eDIaPsZPcdXptc83N5dpvspFtNRhRkGP7fn0lkDAgSEgCxrkoH2K0SEJon919VrwNzOl69zEC4ep37EF/YGjebmBeDgxD17UR/WGSPZdohpVKlhHyuOwguEU/oKjIpRvkwbhQkWDsExQWhXMcBHBOCjIQRXhbrLbAfvGoIVhCSXA7B74svUMeWaVSwzTtw45PEC1X4tC2xU95Bub3ywhvAn7AgWEukr909xqIzviZxivg4W1OZF4NzGmEMUm31rsp9NnsinlL213X7M0GHsGANGkwc0ezuW46c+A6GlycO3xYZC/mAnp0angRR2SFlzgA/tVBOklOF7IONVpakbVPN3W66YMZ9CfZjEyxjyO6K8ABaxYTEJELpq2S7Br5Pu8sKXN8OLQMA6/Ugkpy9WmJ19ZtWlzggFyDNfIZEL/ByAMB4GJRjc74RJ40Po8x3kRzPg9jx7V13XIXmg1dBzrnI5MjE0RLSGoq6eO0EpvFXNdcGR8hXkMS7WmFCPqAoIciAAHH8TKrx5vSSQD7XIUQo4DEuZ5vmIBWyGxyzAHHTdLR3CIuntmAbqRjEKXcyRxECdiaAU/nHuIB+Bs+CdpwqEtwJHBHiiiLYIx+gFlSgWGAzwWtbU0G1iWVAB8tAyqYU/1CisDJA4wIouZ+wXeskeclUK5B7MGKZfBUYkwCVI2dsYHAXUHn3KfxEyq4hQJgeKtzbWiNR1ZBV+cO91PPA5UEAqcIgXGpA7xYxBT4RVS8Gge2hAMFgqdAoA0dV3gagQG9FQhkdpl16x4BBO6biUIeAmsceF9XKOg1GDlGg2AkIlZ7waig8+dYuStb+XD4y89q6OJs2/D2zsjuxNKFSvaV7bvx9Zqk8FO1cxzCN2DgrnUqBi7ymyKd+2kNXCC5FQunarXaMWm95Ci0Ixi1GOZ7IHDtqV0b2q5b6YFfiQGjBAHjGH5dQsDAsH2m8LAAA/u/DVvnmKypT2P86YLu1cId/Z+4SS2VnxMcpvtGuX0MQeccEmwfR0XWJ91a8t/kyCgnAOuQoUQu/cr6WADqhIW42KTQI50VVD7KLJIkQvAVrkMcAT/XfFauTGVhWfaiKfPhguTmrG5S5OFWjWAVsSGIRTrRl17fbTdDZnU/QjL1EVX12QdaOpsn28NUNDCmLPTRgB19qDpB0R4HC7RM2qebNXW5yoNZzH6mEQQE3uHoHkZnBMT3XzaQgpudZ4SCZXpsHBKEg5hubZOKwz9TQTJWLkq7qBprcepLDrPs6xKbyFxiOncTTGst/ISvLBKW8ribm1K+gdhYtD/gtuSNCyQrW6RkhTleoznffopyRfg+W0DV+eFz3pj9XV8/Rokkx1rrQEduabGCF7eH9VtLrGes8ku35aD4EvZjmJZl2WPHAI4JbU+r8IBTH8RxANZQUm28RWsfJDOW00CiOV5JbxL9UkTMV8j3PoM93rDzo4R2fi9KkxWO0A/aLRDHyPNdwy7UuGUtuZaT0AxvhHayaA0/g5hkqPB9EMZolg04NesJJgSvBY74mTIKXRWUu0Jd7nC8UiOq/npQ2qMiKHVNXCfIwVK3q2BpOx3A0qgIzMwtLhBkV0iY8yX7UIaoTNfSWS7PqjzRJQ9VDdo4BHPqRD8ndS5HB8lffEKYaLtCBN6GIMmQthEIiw5vRclfkqJFmABSSyQndK6n2tBilJL6YIuFMVFOWCaN0SxE0PMDKNE7pCDfwvjglfOIqDX8pyHCIWHYzQAh6rWKB1Ph4dTwYBk94kG+BqPw0DMexk6PeJCvxUiq91GZuTQKwE/oPUvluaK/MhxcnusSGEwZDGaFkhPKcINjxPg4lUVp3YpVhmPrN7tZ4ikFd8EHxjX2TnteYe+M2foHZfa9wEI/YhyovWtB5S8qfykk1ZacVVenL3oL7kqCpaH4asuOqt7yTzKBkRExUog4OUQcM4WRhyvfWaA4a9sabpqUdBIEFGk9Wct3+owFI5kdKNL6DkmrbozLrLXxovuoC1wqjtK1p8pM/22w1pFaZz09RPTKWq2qW2YUa21Xw32yVoEuxVpPz/KPyVqluyUVLE4VFkddgi/jom6lU+Gif1yYR7wkK43Xlte3VJb7DrNcxy4muc1vLXONDmApUm6V0nTmqTLLfxtJ7rhqnValNO1quGnS2kVKI55IU5HofUciab2VhiKzGSz1URexyFELrl3Hosz230YscqouCitE9IuIXhdc3Sq+qthJuxruc8HVrQsCagWld8vvc8FVweJkYdHvPc+ufFFWpTPvMZ2x9JenMy28SUUGZtVTOIq8tumrMtt/G+mMrslLLwoSfUOi39uetaorhiqhaVfFfSY02YOCirqenu33mdHU3UyoYNE7LHp+ilNzJGiolOY9pjSa+9KbBfTy+35bQqZakO/86R2t7vXSJ5rU6OoWkhPERL9ZTcXb4FVW07qOe01rKt41ofjrqVj/MfOai5vb8fe7y4urL1eju+Cf6NO33+yKFylezAl6QGR/C6MH9vLKehr7FrhrKvod7AoVn8toH32tZxsE0ijxR4GBHC6yu4AK9NFswWdUAqPBCujhjbsaOrzYt9p9HN9nHNk/PAqRoqVIeHnUMl/DLisx05kvafDUZfp23mswJzgZusJN/7ipYKDHxU2D628iKN1grNzNacCmgtQeFzbNL8j8vPnKM4lJO5qveqVipeq74ibtXKh53z6iM3BULMkf1y+8/hObhzqfcWLmzMr/hYQmRWkmAjYEF3NiuEPke277b9YV1VNauhTpR1LYi0JAT/d7vpC2MixRPrRLSqLhy9KR/OePHqP8+S8g1Xx5pPPvH9Hi4Rut6RdTDp/CNa/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go deleted file mode 100644 index 047a1815..00000000 --- a/plugins/temporal/client/plugin.go +++ /dev/null @@ -1,169 +0,0 @@ -package client - -import ( - "fmt" - "os" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/worker" -) - -// PluginName defines public service name. -const PluginName = "temporal" - -// indicates that the case size was set -var stickyCacheSet = false - -// Plugin implement Temporal contract. -type Plugin struct { - workerID int32 - cfg *Config - dc converter.DataConverter - log logger.Logger - client client.Client -} - -// Temporal define common interface for RoadRunner plugins. -type Temporal interface { - GetClient() client.Client - GetDataConverter() converter.DataConverter - GetConfig() Config - GetCodec() rrt.Codec - CreateWorker(taskQueue string, options worker.Options) (worker.Worker, error) -} - -// Config of the temporal client and depended services. -type Config struct { - Address string - Namespace string - Activities *pool.Config - Codec string - DebugLevel int `mapstructure:"debug_level"` - CacheSize int `mapstructure:"cache_size"` -} - -// Init initiates temporal client plugin. -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("temporal_client_plugin_init") - p.log = log - p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - if p.cfg == nil { - return errors.E(op, errors.Disabled) - } - - return nil -} - -// GetConfig returns temporal configuration. -func (p *Plugin) GetConfig() Config { - if p.cfg != nil { - return *p.cfg - } - // empty - return Config{} -} - -// GetCodec returns communication codec. -func (p *Plugin) GetCodec() rrt.Codec { - if p.cfg.Codec == "json" { - return rrt.NewJSONCodec(rrt.DebugLevel(p.cfg.DebugLevel), p.log) - } - - // production ready protocol, no debug abilities - return rrt.NewProtoCodec() -} - -// GetDataConverter returns data active data converter. -func (p *Plugin) GetDataConverter() converter.DataConverter { - return p.dc -} - -// Serve starts temporal srv. -func (p *Plugin) Serve() chan error { - const op = errors.Op("temporal_client_plugin_serve") - errCh := make(chan error, 1) - var err error - - if stickyCacheSet == false && p.cfg.CacheSize != 0 { - worker.SetStickyWorkflowCacheSize(p.cfg.CacheSize) - stickyCacheSet = true - } - - p.client, err = client.NewClient(client.Options{ - Logger: p.log, - HostPort: p.cfg.Address, - Namespace: p.cfg.Namespace, - DataConverter: p.dc, - }) - - if err != nil { - errCh <- errors.E(op, err) - } - - p.log.Debug("connected to temporal server", "address", p.cfg.Address) - - return errCh -} - -// Stop stops temporal srv connection. -func (p *Plugin) Stop() error { - if p.client != nil { - p.client.Close() - } - - return nil -} - -// GetClient returns active srv connection. -func (p *Plugin) GetClient() client.Client { - return p.client -} - -// CreateWorker allocates new temporal worker on an active connection. -func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { - const op = errors.Op("temporal_client_plugin_create_worker") - if p.client == nil { - return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client")) - } - - if options.Identity == "" { - if tq == "" { - tq = client.DefaultNamespace - } - - // ensures unique worker IDs - options.Identity = fmt.Sprintf( - "%d@%s@%s@%v", - os.Getpid(), - getHostName(), - tq, - atomic.AddInt32(&p.workerID, 1), - ) - } - - return worker.New(p.client, tq, options), nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -func getHostName() string { - hostName, err := os.Hostname() - if err != nil { - hostName = "Unknown" - } - return hostName -} diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go deleted file mode 100644 index 406e70f4..00000000 --- a/plugins/temporal/protocol/converter.go +++ /dev/null @@ -1,76 +0,0 @@ -package protocol - -import ( - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -type ( - // DataConverter wraps Temporal data converter to enable direct access to the payloads. - DataConverter struct { - fallback converter.DataConverter - } -) - -// NewDataConverter creates new data converter. -func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { - return &DataConverter{fallback: fallback} -} - -// ToPayloads converts a list of values. -func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { - for _, v := range values { - if aggregated, ok := v.(*commonpb.Payloads); ok { - // bypassing - return aggregated, nil - } - } - - return r.fallback.ToPayloads(values...) -} - -// ToPayload converts single value to payload. -func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { - return r.fallback.ToPayload(value) -} - -// FromPayloads converts to a list of values of different types. -// Useful for deserializing arguments of function invocations. -func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { - if payloads == nil { - return nil - } - - if len(valuePtrs) == 1 { - // input proxying - if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { - *input = &commonpb.Payloads{} - (*input).Payloads = payloads.Payloads - return nil - } - } - - for i := 0; i < len(payloads.Payloads); i++ { - err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) - if err != nil { - return err - } - } - - return nil -} - -// FromPayload converts single value from payload. -func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { - return r.fallback.FromPayload(payload, valuePtr) -} - -// ToString converts payload object into human readable string. -func (r *DataConverter) ToString(input *commonpb.Payload) string { - return r.fallback.ToString(input) -} - -// ToStrings converts payloads object into human readable strings. -func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { - return r.fallback.ToStrings(input) -} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go deleted file mode 100644 index 6ce9fa0f..00000000 --- a/plugins/temporal/protocol/converter_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package protocol - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -func Test_Passthough(t *testing.T) { - codec := NewDataConverter(converter.GetDefaultDataConverter()) - - value, err := codec.ToPayloads("test") - assert.NoError(t, err) - - out := &common.Payloads{} - - assert.Len(t, out.Payloads, 0) - assert.NoError(t, codec.FromPayloads(value, &out)) - - assert.Len(t, out.Payloads, 1) -} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go deleted file mode 100644 index c554e28f..00000000 --- a/plugins/temporal/protocol/internal/protocol.pb.go +++ /dev/null @@ -1,167 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: protocol.proto - -package internal - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" - v11 "go.temporal.io/api/common/v1" - v1 "go.temporal.io/api/failure/v1" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Frame struct { - Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Frame) Reset() { *m = Frame{} } -func (m *Frame) String() string { return proto.CompactTextString(m) } -func (*Frame) ProtoMessage() {} -func (*Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{0} -} - -func (m *Frame) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Frame.Unmarshal(m, b) -} -func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Frame.Marshal(b, m, deterministic) -} -func (m *Frame) XXX_Merge(src proto.Message) { - xxx_messageInfo_Frame.Merge(m, src) -} -func (m *Frame) XXX_Size() int { - return xxx_messageInfo_Frame.Size(m) -} -func (m *Frame) XXX_DiscardUnknown() { - xxx_messageInfo_Frame.DiscardUnknown(m) -} - -var xxx_messageInfo_Frame proto.InternalMessageInfo - -func (m *Frame) GetMessages() []*Message { - if m != nil { - return m.Messages - } - return nil -} - -// Single communication message. -type Message struct { - Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - // command name (if any) - Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` - // command options in json format. - Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` - // error response. - Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` - // invocation or result payloads. - Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{1} -} - -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetId() uint64 { - if m != nil { - return m.Id - } - return 0 -} - -func (m *Message) GetCommand() string { - if m != nil { - return m.Command - } - return "" -} - -func (m *Message) GetOptions() []byte { - if m != nil { - return m.Options - } - return nil -} - -func (m *Message) GetFailure() *v1.Failure { - if m != nil { - return m.Failure - } - return nil -} - -func (m *Message) GetPayloads() *v11.Payloads { - if m != nil { - return m.Payloads - } - return nil -} - -func init() { - proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") - proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") -} - -func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } - -var fileDescriptor_2bc2336598a3f7e0 = []byte{ - // 257 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, - 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, - 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, - 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, - 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, - 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, - 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, - 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, - 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, - 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, - 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, - 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, - 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, - 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, - 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, - 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, - 0x00, -} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go deleted file mode 100644 index e7a77068..00000000 --- a/plugins/temporal/protocol/json_codec.go +++ /dev/null @@ -1,225 +0,0 @@ -package protocol - -import ( - "github.com/fatih/color" - j "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -// JSONCodec can be used for debugging and log capturing reasons. -type JSONCodec struct { - // level enables verbose logging or all incoming and outcoming messages. - level DebugLevel - - // logger renders messages when debug enabled. - logger logger.Logger -} - -// jsonFrame contains message command in binary form. -type jsonFrame struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command name. Optional. - Command string `json:"command,omitempty"` - - // Options to be unmarshalled to body (raw payload). - Options j.RawMessage `json:"options,omitempty"` - - // Failure associated with command id. - Failure []byte `json:"failure,omitempty"` - - // Payloads specific to the command or result. - Payloads []byte `json:"payloads,omitempty"` -} - -// NewJSONCodec creates new Json communication codec. -func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { - return &JSONCodec{ - level: level, - logger: logger, - } -} - -// WithLogger creates new codes instance with attached logger. -func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { - return &JSONCodec{ - level: c.level, - logger: logger, - } -} - -// GetName returns codec name. -func (c *JSONCodec) GetName() string { - return "json" -} - -// Execute exchanges commands with worker. -func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - const op = errors.Op("json_codec_execute") - if len(msg) == 0 { - return nil, nil - } - - var response = make([]jsonFrame, 0, 5) - var result = make([]Message, 0, 5) - var err error - - frames := make([]jsonFrame, 0, len(msg)) - for _, m := range msg { - frame, err := c.packFrame(m) - if err != nil { - return nil, errors.E(op, err) - } - - frames = append(frames, frame) - } - - p := payload.Payload{} - - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = json.Marshal(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - p.Body, err = json.Marshal(frames) - if err != nil { - return nil, errors.E(op, err) - } - - if c.level >= DebugNormal { - logMessage := string(p.Body) + " " + string(p.Context) - if c.level >= DebugHumanized { - logMessage = color.GreenString(logMessage) - } - - c.logger.Debug(logMessage) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(op, err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - if c.level >= DebugNormal { - logMessage := string(out.Body) - if c.level >= DebugHumanized { - logMessage = color.HiYellowString(logMessage) - } - - c.logger.Debug(logMessage, "receive", true) - } - - err = json.Unmarshal(out.Body, &response) - if err != nil { - return nil, errors.E(op, err) - } - - for _, f := range response { - msg, err := c.parseFrame(f) - if err != nil { - return nil, errors.E(op, err) - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { - var ( - err error - frame jsonFrame - ) - - frame.ID = msg.ID - - if msg.Payloads != nil { - frame.Payloads, err = msg.Payloads.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Failure != nil { - frame.Failure, err = msg.Failure.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Command == nil { - return frame, nil - } - - frame.Command, err = commandName(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - frame.Options, err = json.Marshal(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - return frame, nil -} - -func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { - var ( - err error - msg Message - ) - - msg.ID = frame.ID - - if frame.Payloads != nil { - msg.Payloads = &common.Payloads{} - - err = msg.Payloads.Unmarshal(frame.Payloads) - if err != nil { - return Message{}, err - } - } - - if frame.Failure != nil { - msg.Failure = &failure.Failure{} - - err = msg.Failure.Unmarshal(frame.Failure) - if err != nil { - return Message{}, err - } - } - - if frame.Command != "" { - cmd, err := initCommand(frame.Command) - if err != nil { - return Message{}, err - } - - err = json.Unmarshal(frame.Options, &cmd) - if err != nil { - return Message{}, err - } - - msg.Command = cmd - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go deleted file mode 100644 index d5e0f49d..00000000 --- a/plugins/temporal/protocol/message.go +++ /dev/null @@ -1,334 +0,0 @@ -package protocol - -import ( - "time" - - "github.com/spiral/errors" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/workflow" -) - -const ( - getWorkerInfoCommand = "GetWorkerInfo" - - invokeActivityCommand = "InvokeActivity" - startWorkflowCommand = "StartWorkflow" - invokeSignalCommand = "InvokeSignal" - invokeQueryCommand = "InvokeQuery" - destroyWorkflowCommand = "DestroyWorkflow" - cancelWorkflowCommand = "CancelWorkflow" - getStackTraceCommand = "StackTrace" - - executeActivityCommand = "ExecuteActivity" - executeChildWorkflowCommand = "ExecuteChildWorkflow" - getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" - - newTimerCommand = "NewTimer" - sideEffectCommand = "SideEffect" - getVersionCommand = "GetVersion" - completeWorkflowCommand = "CompleteWorkflow" - continueAsNewCommand = "ContinueAsNew" - - signalExternalWorkflowCommand = "SignalExternalWorkflow" - cancelExternalWorkflowCommand = "CancelExternalWorkflow" - - cancelCommand = "Cancel" - panicCommand = "Panic" -) - -// GetWorkerInfo reads worker information. -type GetWorkerInfo struct{} - -// InvokeActivity invokes activity. -type InvokeActivity struct { - // Name defines activity name. - Name string `json:"name"` - - // Info contains execution context. - Info activity.Info `json:"info"` - - // HeartbeatDetails indicates that the payload also contains last heartbeat details. - HeartbeatDetails int `json:"heartbeatDetails,omitempty"` -} - -// StartWorkflow sends worker command to start workflow. -type StartWorkflow struct { - // Info to define workflow context. - Info *workflow.Info `json:"info"` - - // LastCompletion contains offset of last completion results. - LastCompletion int `json:"lastCompletion,omitempty"` -} - -// InvokeSignal invokes signal with a set of arguments. -type InvokeSignal struct { - // RunID workflow run id. - RunID string `json:"runId"` - - // Name of the signal. - Name string `json:"name"` -} - -// InvokeQuery invokes query with a set of arguments. -type InvokeQuery struct { - // RunID workflow run id. - RunID string `json:"runId"` - // Name of the query. - Name string `json:"name"` -} - -// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). -type CancelWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// DestroyWorkflow asks worker to offload workflow from memory. -type DestroyWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// GetStackTrace asks worker to offload workflow from memory. -type GetStackTrace struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// ExecuteActivity command by workflow worker. -type ExecuteActivity struct { - // Name defines activity name. - Name string `json:"name"` - // Options to run activity. - Options bindings.ExecuteActivityOptions `json:"options,omitempty"` -} - -// ExecuteChildWorkflow executes child workflow. -type ExecuteChildWorkflow struct { - // Name defines workflow name. - Name string `json:"name"` - // Options to run activity. - Options bindings.WorkflowOptions `json:"options,omitempty"` -} - -// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. -type GetChildWorkflowExecution struct { - // ID of child workflow command. - ID uint64 `json:"id"` -} - -// NewTimer starts new timer. -type NewTimer struct { - // Milliseconds defines timer duration. - Milliseconds int `json:"ms"` -} - -// SideEffect to be recorded into the history. -type SideEffect struct{} - -// GetVersion requests version marker. -type GetVersion struct { - ChangeID string `json:"changeID"` - MinSupported int `json:"minSupported"` - MaxSupported int `json:"maxSupported"` -} - -// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. -type CompleteWorkflow struct{} - -// ContinueAsNew restarts workflow with new running instance. -type ContinueAsNew struct { - // Result defines workflow execution result. - Name string `json:"name"` - - // Options for continued as new workflow. - Options struct { - TaskQueueName string - WorkflowExecutionTimeout time.Duration - WorkflowRunTimeout time.Duration - WorkflowTaskTimeout time.Duration - } `json:"options"` -} - -// SignalExternalWorkflow sends signal to external workflow. -type SignalExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` - Signal string `json:"signal"` - ChildWorkflowOnly bool `json:"childWorkflowOnly"` -} - -// CancelExternalWorkflow canceller external workflow. -type CancelExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` -} - -// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). -type Cancel struct { - // CommandIDs to be cancelled. - CommandIDs []uint64 `json:"ids"` -} - -// Panic triggers panic in workflow process. -type Panic struct { - // Message to include into the error. - Message string `json:"message"` -} - -// ActivityParams maps activity command to activity params. -func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { - params := bindings.ExecuteActivityParams{ - ExecuteActivityOptions: cmd.Options, - ActivityType: bindings.ActivityType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// WorkflowParams maps workflow command to workflow params. -func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { - params := bindings.ExecuteWorkflowParams{ - WorkflowOptions: cmd.Options, - WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// ToDuration converts timer command to time.Duration. -func (cmd NewTimer) ToDuration() time.Duration { - return time.Millisecond * time.Duration(cmd.Milliseconds) -} - -// returns command name (only for the commands sent to the worker) -func commandName(cmd interface{}) (string, error) { - const op = errors.Op("command_name") - switch cmd.(type) { - case GetWorkerInfo, *GetWorkerInfo: - return getWorkerInfoCommand, nil - case StartWorkflow, *StartWorkflow: - return startWorkflowCommand, nil - case InvokeSignal, *InvokeSignal: - return invokeSignalCommand, nil - case InvokeQuery, *InvokeQuery: - return invokeQueryCommand, nil - case DestroyWorkflow, *DestroyWorkflow: - return destroyWorkflowCommand, nil - case CancelWorkflow, *CancelWorkflow: - return cancelWorkflowCommand, nil - case GetStackTrace, *GetStackTrace: - return getStackTraceCommand, nil - case InvokeActivity, *InvokeActivity: - return invokeActivityCommand, nil - case ExecuteActivity, *ExecuteActivity: - return executeActivityCommand, nil - case ExecuteChildWorkflow, *ExecuteChildWorkflow: - return executeChildWorkflowCommand, nil - case GetChildWorkflowExecution, *GetChildWorkflowExecution: - return getChildWorkflowExecutionCommand, nil - case NewTimer, *NewTimer: - return newTimerCommand, nil - case GetVersion, *GetVersion: - return getVersionCommand, nil - case SideEffect, *SideEffect: - return sideEffectCommand, nil - case CompleteWorkflow, *CompleteWorkflow: - return completeWorkflowCommand, nil - case ContinueAsNew, *ContinueAsNew: - return continueAsNewCommand, nil - case SignalExternalWorkflow, *SignalExternalWorkflow: - return signalExternalWorkflowCommand, nil - case CancelExternalWorkflow, *CancelExternalWorkflow: - return cancelExternalWorkflowCommand, nil - case Cancel, *Cancel: - return cancelCommand, nil - case Panic, *Panic: - return panicCommand, nil - default: - return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) - } -} - -// reads command from binary payload -func initCommand(name string) (interface{}, error) { - const op = errors.Op("init_command") - switch name { - case getWorkerInfoCommand: - return &GetWorkerInfo{}, nil - - case startWorkflowCommand: - return &StartWorkflow{}, nil - - case invokeSignalCommand: - return &InvokeSignal{}, nil - - case invokeQueryCommand: - return &InvokeQuery{}, nil - - case destroyWorkflowCommand: - return &DestroyWorkflow{}, nil - - case cancelWorkflowCommand: - return &CancelWorkflow{}, nil - - case getStackTraceCommand: - return &GetStackTrace{}, nil - - case invokeActivityCommand: - return &InvokeActivity{}, nil - - case executeActivityCommand: - return &ExecuteActivity{}, nil - - case executeChildWorkflowCommand: - return &ExecuteChildWorkflow{}, nil - - case getChildWorkflowExecutionCommand: - return &GetChildWorkflowExecution{}, nil - - case newTimerCommand: - return &NewTimer{}, nil - - case getVersionCommand: - return &GetVersion{}, nil - - case sideEffectCommand: - return &SideEffect{}, nil - - case completeWorkflowCommand: - return &CompleteWorkflow{}, nil - - case continueAsNewCommand: - return &ContinueAsNew{}, nil - - case signalExternalWorkflowCommand: - return &SignalExternalWorkflow{}, nil - - case cancelExternalWorkflowCommand: - return &CancelExternalWorkflow{}, nil - - case cancelCommand: - return &Cancel{}, nil - - case panicCommand: - return &Panic{}, nil - - default: - return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) - } -} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go deleted file mode 100644 index 607fe0fe..00000000 --- a/plugins/temporal/protocol/proto_codec.go +++ /dev/null @@ -1,145 +0,0 @@ -package protocol - -import ( - v1 "github.com/golang/protobuf/proto" //nolint:staticcheck - jsoniter "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" - "google.golang.org/protobuf/proto" -) - -type ( - // ProtoCodec uses protobuf to exchange messages with underlying workers. - ProtoCodec struct { - } -) - -// NewProtoCodec creates new Proto communication codec. -func NewProtoCodec() Codec { - return &ProtoCodec{} -} - -// WithLogger creates new codes instance with attached logger. -func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { - return &ProtoCodec{} -} - -// GetName returns codec name. -func (c *ProtoCodec) GetName() string { - return "protobuf" -} - -// Execute exchanges commands with worker. -func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - if len(msg) == 0 { - return nil, nil - } - - var request = &internal.Frame{} - var response = &internal.Frame{} - var result = make([]Message, 0, 5) - var err error - - for _, m := range msg { - frame, err := c.packMessage(m) - if err != nil { - return nil, err - } - - request.Messages = append(request.Messages, frame) - } - - p := payload.Payload{} - - // context is always in json format - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = jsoniter.Marshal(ctx) - if err != nil { - return nil, errors.E(errors.Op("encodeContext"), err) - } - - p.Body, err = proto.Marshal(v1.MessageV2(request)) - if err != nil { - return nil, errors.E(errors.Op("encodePayload"), err) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(errors.Op("execute"), err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - err = proto.Unmarshal(out.Body, v1.MessageV2(response)) - if err != nil { - return nil, errors.E(errors.Op("parseResponse"), err) - } - - for _, f := range response.Messages { - msg, err := c.parseMessage(f) - if err != nil { - return nil, err - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { - var err error - - frame := &internal.Message{ - Id: msg.ID, - Payloads: msg.Payloads, - Failure: msg.Failure, - } - - if msg.Command != nil { - frame.Command, err = commandName(msg.Command) - if err != nil { - return nil, err - } - - frame.Options, err = jsoniter.Marshal(msg.Command) - if err != nil { - return nil, err - } - } - - return frame, nil -} - -func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { - const op = errors.Op("proto_codec_parse_message") - var err error - - msg := Message{ - ID: frame.Id, - Payloads: frame.Payloads, - Failure: frame.Failure, - } - - if frame.Command != "" { - msg.Command, err = initCommand(frame.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - - err = jsoniter.Unmarshal(frame.Options, &msg.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go deleted file mode 100644 index 53076fdf..00000000 --- a/plugins/temporal/protocol/protocol.go +++ /dev/null @@ -1,77 +0,0 @@ -package protocol - -import ( - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -const ( - // DebugNone disables all debug messages. - DebugNone = iota - - // DebugNormal renders all messages into console. - DebugNormal - - // DebugHumanized enables color highlights for messages. - DebugHumanized -) - -// Context provides worker information about currently. Context can be empty for server level commands. -type Context struct { - // TaskQueue associates message batch with the specific task queue in underlying worker. - TaskQueue string `json:"taskQueue,omitempty"` - - // TickTime associated current or historical time with message batch. - TickTime string `json:"tickTime,omitempty"` - - // Replay indicates that current message batch is historical. - Replay bool `json:"replay,omitempty"` -} - -// Message used to exchange the send commands and receive responses from underlying workers. -type Message struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command of the message in unmarshalled form. Pointer. - Command interface{} `json:"command,omitempty"` - - // Failure associated with command id. - Failure *failure.Failure `json:"failure,omitempty"` - - // Payloads contains message specific payloads in binary format. - Payloads *commonpb.Payloads `json:"payloads,omitempty"` -} - -// Codec manages payload encoding and decoding while communication with underlying worker. -type Codec interface { - // WithLogger creates new codes instance with attached logger. - WithLogger(logger.Logger) Codec - - // GetName returns codec name. - GetName() string - - // Execute sends message to worker and waits for the response. - Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) -} - -// Endpoint provides the ability to send and receive messages. -type Endpoint interface { - // ExecWithContext allow to set ExecTTL - Exec(p payload.Payload) (payload.Payload, error) -} - -// DebugLevel configures debug level. -type DebugLevel int - -// IsEmpty only check if task queue set. -func (ctx Context) IsEmpty() bool { - return ctx.TaskQueue == "" -} - -// IsCommand returns true if message carries request. -func (msg Message) IsCommand() bool { - return msg.Command != nil -} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go deleted file mode 100644 index 58a0ae66..00000000 --- a/plugins/temporal/protocol/worker_info.go +++ /dev/null @@ -1,72 +0,0 @@ -package protocol - -import ( - "github.com/spiral/errors" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/worker" -) - -// WorkerInfo outlines information about every available worker and it's TaskQueues. - -// WorkerInfo lists available task queues, workflows and activities. -type WorkerInfo struct { - // TaskQueue assigned to the worker. - TaskQueue string `json:"taskQueue"` - - // Options describe worker options. - Options worker.Options `json:"options,omitempty"` - - // Workflows provided by the worker. - Workflows []WorkflowInfo - - // Activities provided by the worker. - Activities []ActivityInfo -} - -// WorkflowInfo describes single worker workflow. -type WorkflowInfo struct { - // Name of the workflow. - Name string `json:"name"` - - // Queries pre-defined for the workflow type. - Queries []string `json:"queries"` - - // Signals pre-defined for the workflow type. - Signals []string `json:"signals"` -} - -// ActivityInfo describes single worker activity. -type ActivityInfo struct { - // Name describes public activity name. - Name string `json:"name"` -} - -// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). -func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { - const op = errors.Op("fetch_worker_info") - - result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) - if err != nil { - return nil, errors.E(op, err) - } - - if len(result) != 1 { - return nil, errors.E(op, errors.Str("unable to read worker info")) - } - - if result[0].ID != 0 { - return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) - } - - var info []WorkerInfo - for i := range result[0].Payloads.Payloads { - wi := WorkerInfo{} - if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { - return nil, errors.E(op, err) - } - - info = append(info, wi) - } - - return info, nil -} diff --git a/plugins/temporal/workflow/canceller.go b/plugins/temporal/workflow/canceller.go deleted file mode 100644 index 962c527f..00000000 --- a/plugins/temporal/workflow/canceller.go +++ /dev/null @@ -1,41 +0,0 @@ -package workflow - -import ( - "sync" -) - -type cancellable func() error - -type canceller struct { - ids sync.Map -} - -func (c *canceller) register(id uint64, cancel cancellable) { - c.ids.Store(id, cancel) -} - -func (c *canceller) discard(id uint64) { - c.ids.Delete(id) -} - -func (c *canceller) cancel(ids ...uint64) error { - var err error - for _, id := range ids { - cancel, ok := c.ids.Load(id) - if ok == false { - continue - } - - // TODO return when minimum supported version will be go 1.15 - // go1.14 don't have LoadAndDelete method - // It was introduced only in go1.15 - c.ids.Delete(id) - - err = cancel.(cancellable)() - if err != nil { - return err - } - } - - return nil -} diff --git a/plugins/temporal/workflow/canceller_test.go b/plugins/temporal/workflow/canceller_test.go deleted file mode 100644 index d6e846f8..00000000 --- a/plugins/temporal/workflow/canceller_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package workflow - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_CancellerNoListeners(t *testing.T) { - c := &canceller{} - - assert.NoError(t, c.cancel(1)) -} - -func Test_CancellerListenerError(t *testing.T) { - c := &canceller{} - c.register(1, func() error { - return errors.New("failed") - }) - - assert.Error(t, c.cancel(1)) -} - -func Test_CancellerListenerDiscarded(t *testing.T) { - c := &canceller{} - c.register(1, func() error { - return errors.New("failed") - }) - - c.discard(1) - assert.NoError(t, c.cancel(1)) -} diff --git a/plugins/temporal/workflow/id_registry.go b/plugins/temporal/workflow/id_registry.go deleted file mode 100644 index ac75cbda..00000000 --- a/plugins/temporal/workflow/id_registry.go +++ /dev/null @@ -1,51 +0,0 @@ -package workflow - -import ( - "sync" - - bindings "go.temporal.io/sdk/internalbindings" -) - -// used to gain access to child workflow ids after they become available via callback result. -type idRegistry struct { - mu sync.Mutex - ids map[uint64]entry - listeners map[uint64]listener -} - -type listener func(w bindings.WorkflowExecution, err error) - -type entry struct { - w bindings.WorkflowExecution - err error -} - -func newIDRegistry() *idRegistry { - return &idRegistry{ - ids: map[uint64]entry{}, - listeners: map[uint64]listener{}, - } -} - -func (c *idRegistry) listen(id uint64, cl listener) { - c.mu.Lock() - defer c.mu.Unlock() - - c.listeners[id] = cl - - if e, ok := c.ids[id]; ok { - cl(e.w, e.err) - } -} - -func (c *idRegistry) push(id uint64, w bindings.WorkflowExecution, err error) { - c.mu.Lock() - defer c.mu.Unlock() - - e := entry{w: w, err: err} - c.ids[id] = e - - if l, ok := c.listeners[id]; ok { - l(e.w, e.err) - } -} diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go deleted file mode 100644 index 8f4409d1..00000000 --- a/plugins/temporal/workflow/message_queue.go +++ /dev/null @@ -1,47 +0,0 @@ -package workflow - -import ( - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -type messageQueue struct { - seqID func() uint64 - queue []rrt.Message -} - -func newMessageQueue(sedID func() uint64) *messageQueue { - return &messageQueue{ - seqID: sedID, - queue: make([]rrt.Message, 0, 5), - } -} - -func (mq *messageQueue) flush() { - mq.queue = mq.queue[0:0] -} - -func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { - msg := rrt.Message{ - ID: mq.seqID(), - Command: cmd, - Payloads: payloads, - } - - return msg.ID, msg -} - -func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { - id, msg := mq.allocateMessage(cmd, payloads) - mq.queue = append(mq.queue, msg) - return id -} - -func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { - mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads}) -} - -func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) { - mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure}) -} diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go deleted file mode 100644 index 1fcd409f..00000000 --- a/plugins/temporal/workflow/message_queue_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package workflow - -import ( - "sync/atomic" - "testing" - - "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - "github.com/stretchr/testify/assert" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -func Test_MessageQueueFlushError(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - mq.pushError(1, &failure.Failure{}) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) - assert.Equal(t, uint64(0), index) -} - -func Test_MessageQueueFlushResponse(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - mq.pushResponse(1, &common.Payloads{}) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) - assert.Equal(t, uint64(0), index) -} - -func Test_MessageQueueCommandID(t *testing.T) { - var index uint64 - mq := newMessageQueue(func() uint64 { - return atomic.AddUint64(&index, 1) - }) - - n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) - assert.Equal(t, n, index) - assert.Len(t, mq.queue, 1) - - mq.flush() - assert.Len(t, mq.queue, 0) -} diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go deleted file mode 100644 index 572d9a3b..00000000 --- a/plugins/temporal/workflow/plugin.go +++ /dev/null @@ -1,203 +0,0 @@ -package workflow - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" -) - -const ( - // PluginName defines public service name. - PluginName = "workflows" - - // RRMode sets as RR_MODE env variable to let worker know about the mode to run. - RRMode = "temporal/workflow" -) - -// Plugin manages workflows and workers. -type Plugin struct { - temporal client.Temporal - events events.Handler - server server.Server - log logger.Logger - mu sync.Mutex - reset chan struct{} - pool workflowPool - closing int64 -} - -// Init workflow plugin. -func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { - p.temporal = temporal - p.server = server - p.events = events.NewEventsHandler() - p.log = log - p.reset = make(chan struct{}, 1) - - return nil -} - -// Serve starts workflow service. -func (p *Plugin) Serve() chan error { - const op = errors.Op("workflow_plugin_serve") - errCh := make(chan error, 1) - - pool, err := p.startPool() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - p.pool = pool - - go func() { - for { - select { - case <-p.reset: - if atomic.LoadInt64(&p.closing) == 1 { - return - } - - err := p.replacePool() - if err == nil { - continue - } - - bkoff := backoff.NewExponentialBackOff() - bkoff.InitialInterval = time.Second - - err = backoff.Retry(p.replacePool, bkoff) - if err != nil { - errCh <- errors.E(op, err) - } - } - } - }() - - return errCh -} - -// Stop workflow service. -func (p *Plugin) Stop() error { - const op = errors.Op("workflow_plugin_stop") - atomic.StoreInt64(&p.closing, 1) - - pool := p.getPool() - if pool != nil { - p.pool = nil - err := pool.Destroy(context.Background()) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return nil -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -// Workers returns list of available workflow workers. -func (p *Plugin) Workers() []worker.BaseProcess { - p.mu.Lock() - defer p.mu.Unlock() - return p.pool.Workers() -} - -// WorkflowNames returns list of all available workflows. -func (p *Plugin) WorkflowNames() []string { - return p.pool.WorkflowNames() -} - -// Reset resets underlying workflow pool with new copy. -func (p *Plugin) Reset() error { - p.reset <- struct{}{} - - return nil -} - -// AddListener adds event listeners to the service. -func (p *Plugin) poolListener(event interface{}) { - if ev, ok := event.(PoolEvent); ok { - if ev.Event == eventWorkerExit { - if ev.Caused != nil { - p.log.Error("Workflow pool error", "error", ev.Caused) - } - p.reset <- struct{}{} - } - } - - p.events.Push(event) -} - -// AddListener adds event listeners to the service. -func (p *Plugin) startPool() (workflowPool, error) { - const op = errors.Op("workflow_plugin_start_pool") - pool, err := newWorkflowPool( - p.temporal.GetCodec().WithLogger(p.log), - p.poolListener, - p.server, - ) - if err != nil { - return nil, errors.E(op, err) - } - - err = pool.Start(context.Background(), p.temporal) - if err != nil { - return nil, errors.E(op, err) - } - - p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) - - return pool, nil -} - -func (p *Plugin) replacePool() error { - p.mu.Lock() - const op = errors.Op("workflow_plugin_replace_pool") - defer p.mu.Unlock() - - if p.pool != nil { - err := p.pool.Destroy(context.Background()) - p.pool = nil - if err != nil { - p.log.Error( - "Unable to destroy expired workflow pool", - "error", - errors.E(op, err), - ) - return errors.E(op, err) - } - } - - pool, err := p.startPool() - if err != nil { - p.log.Error("Replace workflow pool failed", "error", err) - return errors.E(op, err) - } - - p.pool = pool - p.log.Debug("workflow pool successfully replaced") - - return nil -} - -// getPool returns currently pool. -func (p *Plugin) getPool() workflowPool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.pool -} diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go deleted file mode 100644 index 45e6885c..00000000 --- a/plugins/temporal/workflow/process.go +++ /dev/null @@ -1,436 +0,0 @@ -package workflow - -import ( - "strconv" - "sync/atomic" - "time" - - "github.com/spiral/errors" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - commonpb "go.temporal.io/api/common/v1" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/workflow" -) - -// wraps single workflow process -type workflowProcess struct { - codec rrt.Codec - pool workflowPool - env bindings.WorkflowEnvironment - header *commonpb.Header - mq *messageQueue - ids *idRegistry - seqID uint64 - runID string - pipeline []rrt.Message - callbacks []func() error - canceller *canceller - inLoop bool -} - -// Execute workflow, bootstraps process. -func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { - wf.env = env - wf.header = header - wf.seqID = 0 - wf.runID = env.WorkflowInfo().WorkflowExecution.RunID - wf.canceller = &canceller{} - - // sequenceID shared for all worker workflows - wf.mq = newMessageQueue(wf.pool.SeqID) - wf.ids = newIDRegistry() - - env.RegisterCancelHandler(wf.handleCancel) - env.RegisterSignalHandler(wf.handleSignal) - env.RegisterQueryHandler(wf.handleQuery) - - var ( - lastCompletion = bindings.GetLastCompletionResult(env) - lastCompletionOffset = 0 - ) - - if lastCompletion != nil && len(lastCompletion.Payloads) != 0 { - if input == nil { - input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}} - } - - input.Payloads = append(input.Payloads, lastCompletion.Payloads...) - lastCompletionOffset = len(lastCompletion.Payloads) - } - - _ = wf.mq.pushCommand( - rrt.StartWorkflow{ - Info: env.WorkflowInfo(), - LastCompletion: lastCompletionOffset, - }, - input, - ) -} - -// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. -func (wf *workflowProcess) OnWorkflowTaskStarted() { - wf.inLoop = true - defer func() { wf.inLoop = false }() - - var err error - for _, callback := range wf.callbacks { - err = callback() - if err != nil { - panic(err) - } - } - wf.callbacks = nil - - if err := wf.flushQueue(); err != nil { - panic(err) - } - - for len(wf.pipeline) > 0 { - msg := wf.pipeline[0] - wf.pipeline = wf.pipeline[1:] - - if msg.IsCommand() { - err = wf.handleMessage(msg) - } - - if err != nil { - panic(err) - } - } -} - -// StackTrace renders workflow stack trace. -func (wf *workflowProcess) StackTrace() string { - result, err := wf.runCommand( - rrt.GetStackTrace{ - RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, - }, - nil, - ) - - if err != nil { - return err.Error() - } - - var stacktrace string - err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace) - if err != nil { - return err.Error() - } - - return stacktrace -} - -// Close the workflow. -func (wf *workflowProcess) Close() { - // TODO: properly handle errors - // panic(err) - - _ = wf.mq.pushCommand( - rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, - nil, - ) - - _, _ = wf.discardQueue() -} - -// execution context. -func (wf *workflowProcess) getContext() rrt.Context { - return rrt.Context{ - TaskQueue: wf.env.WorkflowInfo().TaskQueueName, - TickTime: wf.env.Now().Format(time.RFC3339), - Replay: wf.env.IsReplaying(), - } -} - -// schedule cancel command -func (wf *workflowProcess) handleCancel() { - _ = wf.mq.pushCommand( - rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, - nil, - ) -} - -// schedule the signal processing -func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { - _ = wf.mq.pushCommand( - rrt.InvokeSignal{ - RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, - Name: name, - }, - input, - ) -} - -// Handle query in blocking mode. -func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { - result, err := wf.runCommand( - rrt.InvokeQuery{ - RunID: wf.runID, - Name: queryType, - }, - queryArgs, - ) - - if err != nil { - return nil, err - } - - if result.Failure != nil { - return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter()) - } - - return result.Payloads, nil -} - -// process incoming command -func (wf *workflowProcess) handleMessage(msg rrt.Message) error { - const op = errors.Op("handleMessage") - var err error - - var ( - id = msg.ID - cmd = msg.Command - payloads = msg.Payloads - ) - - switch cmd := cmd.(type) { - case *rrt.ExecuteActivity: - params := cmd.ActivityParams(wf.env, payloads) - activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) - - wf.canceller.register(id, func() error { - wf.env.RequestCancelActivity(activityID) - return nil - }) - - case *rrt.ExecuteChildWorkflow: - params := cmd.WorkflowParams(wf.env, payloads) - - // always use deterministic id - if params.WorkflowID == "" { - nextID := atomic.AddUint64(&wf.seqID, 1) - params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) - } - - wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { - wf.ids.push(id, r, e) - }) - - wf.canceller.register(id, func() error { - wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) - return nil - }) - - case *rrt.GetChildWorkflowExecution: - wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) { - cl := wf.createCallback(id) - - // TODO rewrite - if err != nil { - panic(err) - } - - p, err := wf.env.GetDataConverter().ToPayloads(w) - if err != nil { - panic(err) - } - - cl(p, err) - }) - - case *rrt.NewTimer: - timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id)) - wf.canceller.register(id, func() error { - if timerID != nil { - wf.env.RequestCancelTimer(*timerID) - } - return nil - }) - - case *rrt.GetVersion: - version := wf.env.GetVersion( - cmd.ChangeID, - workflow.Version(cmd.MinSupported), - workflow.Version(cmd.MaxSupported), - ) - - result, err := wf.env.GetDataConverter().ToPayloads(version) - if err != nil { - return errors.E(op, err) - } - - wf.mq.pushResponse(id, result) - err = wf.flushQueue() - if err != nil { - panic(err) - } - - case *rrt.SideEffect: - wf.env.SideEffect( - func() (*commonpb.Payloads, error) { - return payloads, nil - }, - wf.createContinuableCallback(id), - ) - - case *rrt.CompleteWorkflow: - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - if msg.Failure == nil { - wf.env.Complete(payloads, nil) - } else { - wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) - } - - case *rrt.ContinueAsNew: - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - wf.env.Complete(nil, &workflow.ContinueAsNewError{ - WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, - Input: payloads, - Header: wf.header, - TaskQueueName: cmd.Options.TaskQueueName, - WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout, - WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout, - WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout, - }) - - case *rrt.SignalExternalWorkflow: - wf.env.SignalExternalWorkflow( - cmd.Namespace, - cmd.WorkflowID, - cmd.RunID, - cmd.Signal, - payloads, - nil, - cmd.ChildWorkflowOnly, - wf.createCallback(id), - ) - - case *rrt.CancelExternalWorkflow: - wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id)) - - case *rrt.Cancel: - err = wf.canceller.cancel(cmd.CommandIDs...) - if err != nil { - return errors.E(op, err) - } - - result, _ := wf.env.GetDataConverter().ToPayloads("completed") - wf.mq.pushResponse(id, result) - - err = wf.flushQueue() - if err != nil { - panic(err) - } - - case *rrt.Panic: - panic(errors.E(cmd.Message)) - - default: - panic("undefined command") - } - - return nil -} - -func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler { - callback := func(result *commonpb.Payloads, err error) error { - wf.canceller.discard(id) - - if err != nil { - wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) - return nil - } - - // fetch original payload - wf.mq.pushResponse(id, result) - return nil - } - - return func(result *commonpb.Payloads, err error) { - // timer cancel callback can happen inside the loop - if wf.inLoop { - err := callback(result, err) - if err != nil { - panic(err) - } - - return - } - - wf.callbacks = append(wf.callbacks, func() error { - return callback(result, err) - }) - } -} - -// callback to be called inside the queue processing, adds new messages at the end of the queue -func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler { - callback := func(result *commonpb.Payloads, err error) { - wf.canceller.discard(id) - - if err != nil { - wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) - return - } - - wf.mq.pushResponse(id, result) - err = wf.flushQueue() - if err != nil { - panic(err) - } - } - - return func(result *commonpb.Payloads, err error) { - callback(result, err) - } -} - -// Exchange messages between host and worker processes and add new commands to the queue. -func (wf *workflowProcess) flushQueue() error { - const op = errors.Op("flush queue") - messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) - wf.mq.flush() - - if err != nil { - return errors.E(op, err) - } - - wf.pipeline = append(wf.pipeline, messages...) - - return nil -} - -// Exchange messages between host and worker processes without adding new commands to the queue. -func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { - const op = errors.Op("discard queue") - messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) - wf.mq.flush() - - if err != nil { - return nil, errors.E(op, err) - } - - return messages, nil -} - -// Run single command and return single result. -func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { - const op = errors.Op("workflow_process_runcommand") - _, msg := wf.mq.allocateMessage(cmd, payloads) - - result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) - if err != nil { - return rrt.Message{}, errors.E(op, err) - } - - if len(result) != 1 { - return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) - } - - return result[0], nil -} diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go deleted file mode 100644 index b9ed46c8..00000000 --- a/plugins/temporal/workflow/workflow_pool.go +++ /dev/null @@ -1,190 +0,0 @@ -package workflow - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - rrWorker "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/temporal/client" - rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/worker" - "go.temporal.io/sdk/workflow" -) - -const eventWorkerExit = 8390 - -// RR_MODE env variable key -const RR_MODE = "RR_MODE" //nolint - -// RR_CODEC env variable key -const RR_CODEC = "RR_CODEC" //nolint - -type workflowPool interface { - SeqID() uint64 - Exec(p payload.Payload) (payload.Payload, error) - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.BaseProcess - WorkflowNames() []string -} - -// PoolEvent triggered on workflow pool worker events. -type PoolEvent struct { - Event int - Context interface{} - Caused error -} - -// workflowPoolImpl manages workflowProcess executions between worker restarts. -type workflowPoolImpl struct { - codec rrt.Codec - seqID uint64 - workflows map[string]rrt.WorkflowInfo - tWorkers []worker.Worker - mu sync.Mutex - worker rrWorker.SyncWorker - active bool -} - -// newWorkflowPool creates new workflow pool. -func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { - const op = errors.Op("new_workflow_pool") - w, err := factory.NewWorker( - context.Background(), - map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}, - listener, - ) - if err != nil { - return nil, errors.E(op, err) - } - - go func() { - err := w.Wait() - listener(PoolEvent{Event: eventWorkerExit, Caused: err}) - }() - - return &workflowPoolImpl{codec: codec, worker: rrWorker.From(w)}, nil -} - -// Start the pool in non blocking mode. -func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("workflow_pool_start") - pool.mu.Lock() - pool.active = true - pool.mu.Unlock() - - err := pool.initWorkers(ctx, temporal) - if err != nil { - return errors.E(op, err) - } - - for i := 0; i < len(pool.tWorkers); i++ { - err := pool.tWorkers[i].Start() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// Active. -func (pool *workflowPoolImpl) Active() bool { - return pool.active -} - -// Destroy stops all temporal workers and application worker. -func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { - pool.mu.Lock() - defer pool.mu.Unlock() - const op = errors.Op("workflow_pool_destroy") - - pool.active = false - for i := 0; i < len(pool.tWorkers); i++ { - pool.tWorkers[i].Stop() - } - - worker.PurgeStickyWorkflowCache() - - err := pool.worker.Stop() - if err != nil { - return errors.E(op, err) - } - - return nil -} - -// NewWorkflowDefinition initiates new workflow process. -func (pool *workflowPoolImpl) NewWorkflowDefinition() bindings.WorkflowDefinition { - return &workflowProcess{ - codec: pool.codec, - pool: pool, - } -} - -// NewWorkflowDefinition initiates new workflow process. -func (pool *workflowPoolImpl) SeqID() uint64 { - return atomic.AddUint64(&pool.seqID, 1) -} - -// Exec set of commands in thread safe move. -func (pool *workflowPoolImpl) Exec(p payload.Payload) (payload.Payload, error) { - pool.mu.Lock() - defer pool.mu.Unlock() - - if !pool.active { - return payload.Payload{}, nil - } - - return pool.worker.Exec(p) -} - -func (pool *workflowPoolImpl) Workers() []rrWorker.BaseProcess { - return []rrWorker.BaseProcess{pool.worker} -} - -func (pool *workflowPoolImpl) WorkflowNames() []string { - names := make([]string, 0, len(pool.workflows)) - for name := range pool.workflows { - names = append(names, name) - } - - return names -} - -// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. -func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("workflow_pool_init_workers") - workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) - if err != nil { - return errors.E(op, err) - } - - pool.workflows = make(map[string]rrt.WorkflowInfo) - pool.tWorkers = make([]worker.Worker, 0) - - for _, info := range workerInfo { - w, err := temporal.CreateWorker(info.TaskQueue, info.Options) - if err != nil { - return errors.E(op, err, pool.Destroy(ctx)) - } - - pool.tWorkers = append(pool.tWorkers, w) - for _, workflowInfo := range info.Workflows { - w.RegisterWorkflowWithOptions(pool, workflow.RegisterOptions{ - Name: workflowInfo.Name, - DisableAlreadyRegisteredCheck: false, - }) - - pool.workflows[workflowInfo.Name] = workflowInfo - } - } - - return nil -} |