summaryrefslogtreecommitdiff
path: root/plugins/temporal/activity
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/activity')
-rw-r--r--plugins/temporal/activity/activity_pool.go110
-rw-r--r--plugins/temporal/activity/plugin.go21
2 files changed, 67 insertions, 64 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
index 0aa2a62f..d09722ce 100644
--- a/plugins/temporal/activity/activity_pool.go
+++ b/plugins/temporal/activity/activity_pool.go
@@ -19,42 +19,40 @@ import (
"go.temporal.io/sdk/worker"
)
-type (
- activityPool interface {
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.SyncWorker
- ActivityNames() []string
- GetActivityContext(taskToken []byte) (context.Context, error)
- }
+// RR_MODE env variable
+const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
+// RR_CODEC env variable
+const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
+
+//
+const doNotCompleteOnReturn = "doNotCompleteOnReturn"
+
+type activityPool interface {
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.SyncWorker
+ ActivityNames() []string
+ GetActivityContext(taskToken []byte) (context.Context, error)
+}
- activityPoolImpl struct {
- dc converter.DataConverter
- codec rrt.Codec
- seqID uint64
- activities []string
- wp pool.Pool
- tWorkers []worker.Worker
- running sync.Map
- }
-)
+type activityPoolImpl struct {
+ dc converter.DataConverter
+ codec rrt.Codec
+ seqID uint64
+ activities []string
+ wp pool.Pool
+ tWorkers []worker.Worker
+ running sync.Map
+}
// newActivityPool
-func newActivityPool(
- codec rrt.Codec,
- listener events.Listener,
- poolConfig pool.Config,
- server server.Server,
-) (activityPool, error) {
- wp, err := server.NewWorkerPool(
- context.Background(),
- poolConfig,
- map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
- listener,
- )
-
+func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
+ const op = errors.Op("new_activity_pool")
+ // env variables
+ env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
+ wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
return &activityPoolImpl{
@@ -66,17 +64,18 @@ func newActivityPool(
// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_start")
pool.dc = temporal.GetDataConverter()
err := pool.initWorkers(ctx, temporal)
if err != nil {
- return err
+ return errors.E(op, err)
}
for i := 0; i < len(pool.tWorkers); i++ {
err := pool.tWorkers[i].Start()
if err != nil {
- return err
+ return errors.E(op, err)
}
}
@@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string {
// ActivityNames returns list of all available activity names.
func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
+ const op = errors.Op("activity_pool_get_activity_context")
c, ok := pool.running.Load(string(taskToken))
if !ok {
- return nil, errors.E("heartbeat on non running activity")
+ return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
}
return c.(context.Context), nil
@@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("createTemporalWorker")
+ const op = errors.Op("activity_pool_create_temporal_worker")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
if err != nil {
@@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
pool.activities = make([]string, 0)
pool.tWorkers = make([]worker.Worker, 0)
- for _, info := range workerInfo {
- w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
+ for i := 0; i < len(workerInfo); i++ {
+ w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
if err != nil {
return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)
- for _, activityInfo := range info.Activities {
+ for j := 0; j < len(workerInfo[i].Activities); j++ {
w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
- Name: activityInfo.Name,
+ Name: workerInfo[i].Activities[j].Name,
DisableAlreadyRegisteredCheck: false,
})
- pool.activities = append(pool.activities, activityInfo.Name)
+ pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
}
}
@@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
// executes activity with underlying worker.
func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
- const op = errors.Op("executeActivity")
+ const op = errors.Op("activity_pool_execute_activity")
heartbeatDetails := &common.Payloads{}
if activity.HasHeartbeatDetails(ctx) {
@@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
}
- var (
- info = activity.GetInfo(ctx)
- msg = rrt.Message{
- ID: atomic.AddUint64(&pool.seqID, 1),
- Command: rrt.InvokeActivity{
- Name: info.ActivityType.Name,
- Info: info,
- HeartbeatDetails: len(heartbeatDetails.Payloads),
- },
- Payloads: args,
- }
- )
+ var info = activity.GetInfo(ctx)
+ var msg = rrt.Message{
+ ID: atomic.AddUint64(&pool.seqID, 1),
+ Command: rrt.InvokeActivity{
+ Name: info.ActivityType.Name,
+ Info: info,
+ HeartbeatDetails: len(heartbeatDetails.Payloads),
+ },
+ Payloads: args,
+ }
if len(heartbeatDetails.Payloads) != 0 {
msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
@@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
if len(result) != 1 {
- return nil, errors.E(op, "invalid activity worker response")
+ return nil, errors.E(op, errors.Str("invalid activity worker response"))
}
out := result[0]
if out.Failure != nil {
- if out.Failure.Message == "doNotCompleteOnReturn" {
+ if out.Failure.Message == doNotCompleteOnReturn {
return nil, activity.ErrResultPending
}
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
index 02d66297..5e562a8d 100644
--- a/plugins/temporal/activity/plugin.go
+++ b/plugins/temporal/activity/plugin.go
@@ -39,9 +39,10 @@ type Plugin struct {
// Init configures activity service.
func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ const op = errors.Op("activity_plugin_init")
if temporal.GetConfig().Activities == nil {
// no need to serve activities
- return errors.E(errors.Disabled)
+ return errors.E(op, errors.Disabled)
}
p.temporal = temporal
@@ -55,11 +56,12 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger
// Serve activities with underlying workers.
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+ const op = errors.Op("activity_plugin_serve")
+ errCh := make(chan error, 1)
pool, err := p.startPool()
if err != nil {
- errCh <- errors.E("startPool", err)
+ errCh <- errors.E(op, err)
return errCh
}
@@ -83,7 +85,7 @@ func (p *Plugin) Serve() chan error {
err = backoff.Retry(p.replacePool, bkoff)
if err != nil {
- errCh <- errors.E("deadPool", err)
+ errCh <- errors.E(op, err)
}
}
}
@@ -95,11 +97,16 @@ func (p *Plugin) Serve() chan error {
// Stop stops the serving plugin.
func (p *Plugin) Stop() error {
atomic.StoreInt64(&p.closing, 1)
+ const op = errors.Op("activity_plugin_stop")
pool := p.getPool()
if pool != nil {
p.pool = nil
- return pool.Destroy(context.Background())
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
return nil
@@ -112,9 +119,7 @@ func (p *Plugin) Name() string {
// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
- client, _ := p.temporal.GetClient()
-
- return &rpc{srv: p, client: client}
+ return &rpc{srv: p, client: p.temporal.GetClient()}
}
// Workers returns pool workers.