diff options
Diffstat (limited to 'plugins/temporal/activity')
-rw-r--r-- | plugins/temporal/activity/activity_pool.go | 110 | ||||
-rw-r--r-- | plugins/temporal/activity/plugin.go | 21 |
2 files changed, 67 insertions, 64 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go index 0aa2a62f..d09722ce 100644 --- a/plugins/temporal/activity/activity_pool.go +++ b/plugins/temporal/activity/activity_pool.go @@ -19,42 +19,40 @@ import ( "go.temporal.io/sdk/worker" ) -type ( - activityPool interface { - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.SyncWorker - ActivityNames() []string - GetActivityContext(taskToken []byte) (context.Context, error) - } +// RR_MODE env variable +const RR_MODE = "RR_MODE" //nolint:golint,stylecheck +// RR_CODEC env variable +const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck + +// +const doNotCompleteOnReturn = "doNotCompleteOnReturn" + +type activityPool interface { + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.SyncWorker + ActivityNames() []string + GetActivityContext(taskToken []byte) (context.Context, error) +} - activityPoolImpl struct { - dc converter.DataConverter - codec rrt.Codec - seqID uint64 - activities []string - wp pool.Pool - tWorkers []worker.Worker - running sync.Map - } -) +type activityPoolImpl struct { + dc converter.DataConverter + codec rrt.Codec + seqID uint64 + activities []string + wp pool.Pool + tWorkers []worker.Worker + running sync.Map +} // newActivityPool -func newActivityPool( - codec rrt.Codec, - listener events.Listener, - poolConfig pool.Config, - server server.Server, -) (activityPool, error) { - wp, err := server.NewWorkerPool( - context.Background(), - poolConfig, - map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, - listener, - ) - +func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { + const op = errors.Op("new_activity_pool") + // env variables + env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} + wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) if err != nil { - return nil, err + return nil, errors.E(op, err) } return &activityPoolImpl{ @@ -66,17 +64,18 @@ func newActivityPool( // initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("activity_pool_start") pool.dc = temporal.GetDataConverter() err := pool.initWorkers(ctx, temporal) if err != nil { - return err + return errors.E(op, err) } for i := 0; i < len(pool.tWorkers); i++ { err := pool.tWorkers[i].Start() if err != nil { - return err + return errors.E(op, err) } } @@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string { // ActivityNames returns list of all available activity names. func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { + const op = errors.Op("activity_pool_get_activity_context") c, ok := pool.running.Load(string(taskToken)) if !ok { - return nil, errors.E("heartbeat on non running activity") + return nil, errors.E(op, errors.Str("heartbeat on non running activity")) } return c.(context.Context), nil @@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("createTemporalWorker") + const op = errors.Op("activity_pool_create_temporal_worker") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) if err != nil { @@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T pool.activities = make([]string, 0) pool.tWorkers = make([]worker.Worker, 0) - for _, info := range workerInfo { - w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + for i := 0; i < len(workerInfo); i++ { + w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) if err != nil { return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) - for _, activityInfo := range info.Activities { + for j := 0; j < len(workerInfo[i].Activities); j++ { w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ - Name: activityInfo.Name, + Name: workerInfo[i].Activities[j].Name, DisableAlreadyRegisteredCheck: false, }) - pool.activities = append(pool.activities, activityInfo.Name) + pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) } } @@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T // executes activity with underlying worker. func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { - const op = errors.Op("executeActivity") + const op = errors.Op("activity_pool_execute_activity") heartbeatDetails := &common.Payloads{} if activity.HasHeartbeatDetails(ctx) { @@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } } - var ( - info = activity.GetInfo(ctx) - msg = rrt.Message{ - ID: atomic.AddUint64(&pool.seqID, 1), - Command: rrt.InvokeActivity{ - Name: info.ActivityType.Name, - Info: info, - HeartbeatDetails: len(heartbeatDetails.Payloads), - }, - Payloads: args, - } - ) + var info = activity.GetInfo(ctx) + var msg = rrt.Message{ + ID: atomic.AddUint64(&pool.seqID, 1), + Command: rrt.InvokeActivity{ + Name: info.ActivityType.Name, + Info: info, + HeartbeatDetails: len(heartbeatDetails.Payloads), + }, + Payloads: args, + } if len(heartbeatDetails.Payloads) != 0 { msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) @@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } if len(result) != 1 { - return nil, errors.E(op, "invalid activity worker response") + return nil, errors.E(op, errors.Str("invalid activity worker response")) } out := result[0] if out.Failure != nil { - if out.Failure.Message == "doNotCompleteOnReturn" { + if out.Failure.Message == doNotCompleteOnReturn { return nil, activity.ErrResultPending } diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go index 02d66297..5e562a8d 100644 --- a/plugins/temporal/activity/plugin.go +++ b/plugins/temporal/activity/plugin.go @@ -39,9 +39,10 @@ type Plugin struct { // Init configures activity service. func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + const op = errors.Op("activity_plugin_init") if temporal.GetConfig().Activities == nil { // no need to serve activities - return errors.E(errors.Disabled) + return errors.E(op, errors.Disabled) } p.temporal = temporal @@ -55,11 +56,12 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger // Serve activities with underlying workers. func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) + const op = errors.Op("activity_plugin_serve") + errCh := make(chan error, 1) pool, err := p.startPool() if err != nil { - errCh <- errors.E("startPool", err) + errCh <- errors.E(op, err) return errCh } @@ -83,7 +85,7 @@ func (p *Plugin) Serve() chan error { err = backoff.Retry(p.replacePool, bkoff) if err != nil { - errCh <- errors.E("deadPool", err) + errCh <- errors.E(op, err) } } } @@ -95,11 +97,16 @@ func (p *Plugin) Serve() chan error { // Stop stops the serving plugin. func (p *Plugin) Stop() error { atomic.StoreInt64(&p.closing, 1) + const op = errors.Op("activity_plugin_stop") pool := p.getPool() if pool != nil { p.pool = nil - return pool.Destroy(context.Background()) + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil } return nil @@ -112,9 +119,7 @@ func (p *Plugin) Name() string { // RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { - client, _ := p.temporal.GetClient() - - return &rpc{srv: p, client: client} + return &rpc{srv: p, client: p.temporal.GetClient()} } // Workers returns pool workers. |