summaryrefslogtreecommitdiff
path: root/plugins/temporal/activity/activity_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/activity/activity_pool.go')
-rw-r--r--plugins/temporal/activity/activity_pool.go110
1 files changed, 54 insertions, 56 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
index 0aa2a62f..d09722ce 100644
--- a/plugins/temporal/activity/activity_pool.go
+++ b/plugins/temporal/activity/activity_pool.go
@@ -19,42 +19,40 @@ import (
"go.temporal.io/sdk/worker"
)
-type (
- activityPool interface {
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.SyncWorker
- ActivityNames() []string
- GetActivityContext(taskToken []byte) (context.Context, error)
- }
+// RR_MODE env variable
+const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
+// RR_CODEC env variable
+const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
+
+//
+const doNotCompleteOnReturn = "doNotCompleteOnReturn"
+
+type activityPool interface {
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.SyncWorker
+ ActivityNames() []string
+ GetActivityContext(taskToken []byte) (context.Context, error)
+}
- activityPoolImpl struct {
- dc converter.DataConverter
- codec rrt.Codec
- seqID uint64
- activities []string
- wp pool.Pool
- tWorkers []worker.Worker
- running sync.Map
- }
-)
+type activityPoolImpl struct {
+ dc converter.DataConverter
+ codec rrt.Codec
+ seqID uint64
+ activities []string
+ wp pool.Pool
+ tWorkers []worker.Worker
+ running sync.Map
+}
// newActivityPool
-func newActivityPool(
- codec rrt.Codec,
- listener events.Listener,
- poolConfig pool.Config,
- server server.Server,
-) (activityPool, error) {
- wp, err := server.NewWorkerPool(
- context.Background(),
- poolConfig,
- map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
- listener,
- )
-
+func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
+ const op = errors.Op("new_activity_pool")
+ // env variables
+ env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
+ wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
return &activityPoolImpl{
@@ -66,17 +64,18 @@ func newActivityPool(
// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_start")
pool.dc = temporal.GetDataConverter()
err := pool.initWorkers(ctx, temporal)
if err != nil {
- return err
+ return errors.E(op, err)
}
for i := 0; i < len(pool.tWorkers); i++ {
err := pool.tWorkers[i].Start()
if err != nil {
- return err
+ return errors.E(op, err)
}
}
@@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string {
// ActivityNames returns list of all available activity names.
func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
+ const op = errors.Op("activity_pool_get_activity_context")
c, ok := pool.running.Load(string(taskToken))
if !ok {
- return nil, errors.E("heartbeat on non running activity")
+ return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
}
return c.(context.Context), nil
@@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("createTemporalWorker")
+ const op = errors.Op("activity_pool_create_temporal_worker")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
if err != nil {
@@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
pool.activities = make([]string, 0)
pool.tWorkers = make([]worker.Worker, 0)
- for _, info := range workerInfo {
- w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
+ for i := 0; i < len(workerInfo); i++ {
+ w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
if err != nil {
return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)
- for _, activityInfo := range info.Activities {
+ for j := 0; j < len(workerInfo[i].Activities); j++ {
w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
- Name: activityInfo.Name,
+ Name: workerInfo[i].Activities[j].Name,
DisableAlreadyRegisteredCheck: false,
})
- pool.activities = append(pool.activities, activityInfo.Name)
+ pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
}
}
@@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
// executes activity with underlying worker.
func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
- const op = errors.Op("executeActivity")
+ const op = errors.Op("activity_pool_execute_activity")
heartbeatDetails := &common.Payloads{}
if activity.HasHeartbeatDetails(ctx) {
@@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
}
- var (
- info = activity.GetInfo(ctx)
- msg = rrt.Message{
- ID: atomic.AddUint64(&pool.seqID, 1),
- Command: rrt.InvokeActivity{
- Name: info.ActivityType.Name,
- Info: info,
- HeartbeatDetails: len(heartbeatDetails.Payloads),
- },
- Payloads: args,
- }
- )
+ var info = activity.GetInfo(ctx)
+ var msg = rrt.Message{
+ ID: atomic.AddUint64(&pool.seqID, 1),
+ Command: rrt.InvokeActivity{
+ Name: info.ActivityType.Name,
+ Info: info,
+ HeartbeatDetails: len(heartbeatDetails.Payloads),
+ },
+ Payloads: args,
+ }
if len(heartbeatDetails.Payloads) != 0 {
msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
@@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
if len(result) != 1 {
- return nil, errors.E(op, "invalid activity worker response")
+ return nil, errors.E(op, errors.Str("invalid activity worker response"))
}
out := result[0]
if out.Failure != nil {
- if out.Failure.Message == "doNotCompleteOnReturn" {
+ if out.Failure.Message == doNotCompleteOnReturn {
return nil, activity.ErrResultPending
}