summaryrefslogtreecommitdiff
path: root/plugins/temporal/activity/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/activity/rpc.go')
-rw-r--r--plugins/temporal/activity/rpc.go66
1 files changed, 66 insertions, 0 deletions
diff --git a/plugins/temporal/activity/rpc.go b/plugins/temporal/activity/rpc.go
new file mode 100644
index 00000000..49efcd4f
--- /dev/null
+++ b/plugins/temporal/activity/rpc.go
@@ -0,0 +1,66 @@
+package activity
+
+import (
+ v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ "go.temporal.io/sdk/client"
+ "google.golang.org/protobuf/proto"
+)
+
+/*
+- the method's type is exported.
+- the method is exported.
+- the method has two arguments, both exported (or builtin) types.
+- the method's second argument is a pointer.
+- the method has return type error.
+*/
+type rpc struct {
+ srv *Plugin
+ client client.Client
+}
+
+// RecordHeartbeatRequest sent by activity to record current state.
+type RecordHeartbeatRequest struct {
+ TaskToken []byte `json:"taskToken"`
+ Details []byte `json:"details"`
+}
+
+// RecordHeartbeatResponse sent back to the worker to indicate that activity was cancelled.
+type RecordHeartbeatResponse struct {
+ Canceled bool `json:"canceled"`
+}
+
+// RecordActivityHeartbeat records heartbeat for an activity.
+// taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity.
+// details - is the progress you want to record along with heart beat for this activity.
+// The errors it can return:
+// - EntityNotExistsError
+// - InternalServiceError
+// - CanceledError
+func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHeartbeatResponse) error {
+ details := &commonpb.Payloads{}
+
+ if len(in.Details) != 0 {
+ if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
+ return err
+ }
+ }
+
+ // find running activity
+ ctx, err := r.srv.getPool().GetActivityContext(in.TaskToken)
+ if err != nil {
+ return err
+ }
+
+ activity.RecordHeartbeat(ctx, details)
+
+ select {
+ case <-ctx.Done():
+ *out = RecordHeartbeatResponse{Canceled: true}
+ default:
+ *out = RecordHeartbeatResponse{Canceled: false}
+ }
+
+ return nil
+}