diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 01:06:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-26 01:06:16 +0300 |
commit | 4638bdca80f75bc120b330022086d31c8b41be5b (patch) | |
tree | 2362cdb39dc2e793f5bec7fd9b8d2363f516c1d4 /plugins/temporal/activity/activity_pool.go | |
parent | 7756eb25453c8006fbd75aa5c97159e96331b840 (diff) |
Code cleanup
Diffstat (limited to 'plugins/temporal/activity/activity_pool.go')
-rw-r--r-- | plugins/temporal/activity/activity_pool.go | 110 |
1 files changed, 54 insertions, 56 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go index 0aa2a62f..d09722ce 100644 --- a/plugins/temporal/activity/activity_pool.go +++ b/plugins/temporal/activity/activity_pool.go @@ -19,42 +19,40 @@ import ( "go.temporal.io/sdk/worker" ) -type ( - activityPool interface { - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.SyncWorker - ActivityNames() []string - GetActivityContext(taskToken []byte) (context.Context, error) - } +// RR_MODE env variable +const RR_MODE = "RR_MODE" //nolint:golint,stylecheck +// RR_CODEC env variable +const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck + +// +const doNotCompleteOnReturn = "doNotCompleteOnReturn" + +type activityPool interface { + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.SyncWorker + ActivityNames() []string + GetActivityContext(taskToken []byte) (context.Context, error) +} - activityPoolImpl struct { - dc converter.DataConverter - codec rrt.Codec - seqID uint64 - activities []string - wp pool.Pool - tWorkers []worker.Worker - running sync.Map - } -) +type activityPoolImpl struct { + dc converter.DataConverter + codec rrt.Codec + seqID uint64 + activities []string + wp pool.Pool + tWorkers []worker.Worker + running sync.Map +} // newActivityPool -func newActivityPool( - codec rrt.Codec, - listener events.Listener, - poolConfig pool.Config, - server server.Server, -) (activityPool, error) { - wp, err := server.NewWorkerPool( - context.Background(), - poolConfig, - map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, - listener, - ) - +func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { + const op = errors.Op("new_activity_pool") + // env variables + env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} + wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) if err != nil { - return nil, err + return nil, errors.E(op, err) } return &activityPoolImpl{ @@ -66,17 +64,18 @@ func newActivityPool( // initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("activity_pool_start") pool.dc = temporal.GetDataConverter() err := pool.initWorkers(ctx, temporal) if err != nil { - return err + return errors.E(op, err) } for i := 0; i < len(pool.tWorkers); i++ { err := pool.tWorkers[i].Start() if err != nil { - return err + return errors.E(op, err) } } @@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string { // ActivityNames returns list of all available activity names. func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { + const op = errors.Op("activity_pool_get_activity_context") c, ok := pool.running.Load(string(taskToken)) if !ok { - return nil, errors.E("heartbeat on non running activity") + return nil, errors.E(op, errors.Str("heartbeat on non running activity")) } return c.(context.Context), nil @@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("createTemporalWorker") + const op = errors.Op("activity_pool_create_temporal_worker") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) if err != nil { @@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T pool.activities = make([]string, 0) pool.tWorkers = make([]worker.Worker, 0) - for _, info := range workerInfo { - w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + for i := 0; i < len(workerInfo); i++ { + w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) if err != nil { return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) - for _, activityInfo := range info.Activities { + for j := 0; j < len(workerInfo[i].Activities); j++ { w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ - Name: activityInfo.Name, + Name: workerInfo[i].Activities[j].Name, DisableAlreadyRegisteredCheck: false, }) - pool.activities = append(pool.activities, activityInfo.Name) + pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) } } @@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T // executes activity with underlying worker. func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { - const op = errors.Op("executeActivity") + const op = errors.Op("activity_pool_execute_activity") heartbeatDetails := &common.Payloads{} if activity.HasHeartbeatDetails(ctx) { @@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } } - var ( - info = activity.GetInfo(ctx) - msg = rrt.Message{ - ID: atomic.AddUint64(&pool.seqID, 1), - Command: rrt.InvokeActivity{ - Name: info.ActivityType.Name, - Info: info, - HeartbeatDetails: len(heartbeatDetails.Payloads), - }, - Payloads: args, - } - ) + var info = activity.GetInfo(ctx) + var msg = rrt.Message{ + ID: atomic.AddUint64(&pool.seqID, 1), + Command: rrt.InvokeActivity{ + Name: info.ActivityType.Name, + Info: info, + HeartbeatDetails: len(heartbeatDetails.Payloads), + }, + Payloads: args, + } if len(heartbeatDetails.Payloads) != 0 { msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) @@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } if len(result) != 1 { - return nil, errors.E(op, "invalid activity worker response") + return nil, errors.E(op, errors.Str("invalid activity worker response")) } out := result[0] if out.Failure != nil { - if out.Failure.Message == "doNotCompleteOnReturn" { + if out.Failure.Message == doNotCompleteOnReturn { return nil, activity.ErrResultPending } |