1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
package activity
import (
"context"
"sync"
"sync/atomic"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
rrWorker "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/temporal/client"
rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
"go.temporal.io/api/common/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internalbindings"
"go.temporal.io/sdk/worker"
)
// activityPool is the set of operations exposed by a pool of activity workers.
type activityPool interface {
	// Start prepares the pool using the given temporal client and starts it.
	Start(ctx context.Context, temporal client.Temporal) error
	// Destroy shuts the pool down.
	Destroy(ctx context.Context) error
	// Workers lists the workers allocated by the pool.
	Workers() []rrWorker.SyncWorker
	// ActivityNames lists the names of the activities known to the pool.
	ActivityNames() []string
	// GetActivityContext looks up the context of a running activity by its task token.
	GetActivityContext(taskToken []byte) (context.Context, error)
}

// activityPoolImpl implements activityPool on top of a worker pool and a set
// of temporal workers.
type activityPoolImpl struct {
	dc         converter.DataConverter // assigned in Start from the temporal client
	codec      rrt.Codec               // used to talk to the workers in wp
	seqID      uint64                  // message ID counter, incremented atomically
	activities []string                // names of the registered activities
	wp         pool.Pool               // underlying worker pool
	tWorkers   []worker.Worker         // temporal workers created by initWorkers
	running    sync.Map                // string(task token) -> context.Context of running activities
}
// newActivityPool allocates a worker pool through the given server and wraps
// it into an activityPool. The pool is requested with the RR_MODE and RR_CODEC
// environment values (the latter taken from the codec name) and the supplied
// listener. An allocation error is returned unchanged.
func newActivityPool(
	codec rrt.Codec,
	listener events.Listener,
	poolConfig pool.Config,
	server server.Server,
) (activityPool, error) {
	env := map[string]string{
		"RR_MODE":  RRMode,
		"RR_CODEC": codec.GetName(),
	}

	workers, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
	if err != nil {
		return nil, err
	}

	ap := &activityPoolImpl{
		codec:   codec,
		wp:      workers,
		running: sync.Map{},
	}

	return ap, nil
}
// Start remembers the data converter of the given temporal client, creates the
// temporal workers via initWorkers and then starts them in order. The first
// error is returned immediately; workers after the failing one are not started.
func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
	pool.dc = temporal.GetDataConverter()

	if err := pool.initWorkers(ctx, temporal); err != nil {
		return err
	}

	for _, tw := range pool.tWorkers {
		if err := tw.Start(); err != nil {
			return err
		}
	}

	return nil
}
// Destroy stops every temporal worker and then destroys the underlying worker
// pool. It always returns nil.
func (pool *activityPoolImpl) Destroy(ctx context.Context) error {
	for _, tw := range pool.tWorkers {
		tw.Stop()
	}

	pool.wp.Destroy(ctx)

	return nil
}
// Workers reports the workers currently held by the underlying worker pool.
func (pool *activityPoolImpl) Workers() []rrWorker.SyncWorker {
	allocated := pool.wp.Workers()

	return allocated
}
// ActivityNames reports the names of the activities registered by initWorkers.
// The pool's own slice is returned, not a copy.
func (pool *activityPoolImpl) ActivityNames() []string {
	names := pool.activities

	return names
}
// GetActivityContext returns the context recorded for the running activity
// identified by taskToken. An error is returned when no activity with that
// token is currently tracked.
func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
	key := string(taskToken)

	if stored, found := pool.running.Load(key); found {
		return stored.(context.Context), nil
	}

	return nil, errors.E("heartbeat on non running activity")
}
// initWorkers requests worker info from the underlying PHP workers and creates
// one temporal worker per returned entry, registering executeActivity under
// every activity name listed for that entry.
//
// pool.activities and pool.tWorkers are reset on every call. When a temporal
// worker cannot be created the pool is destroyed and the creation error is
// returned wrapped with the operation name.
func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
	const op = errors.Op("createTemporalWorker")

	workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
	if err != nil {
		return errors.E(op, err)
	}

	pool.activities = make([]string, 0)
	pool.tWorkers = make([]worker.Worker, 0, len(workerInfo))

	for _, info := range workerInfo {
		w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
		if err != nil {
			// Release what has been allocated so far. The cleanup result is kept
			// out of the errors.E call wrapping err so that the creation error is
			// always the one reported.
			// NOTE(review): assumes errors.E keeps a single underlying error and
			// does not accept a nil argument — confirm against spiral/errors.
			if dErr := pool.Destroy(ctx); dErr != nil {
				return errors.E(op, err.Error()+"; pool cleanup failed: "+dErr.Error())
			}

			return errors.E(op, err)
		}

		pool.tWorkers = append(pool.tWorkers, w)

		for _, activityInfo := range info.Activities {
			w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
				Name:                          activityInfo.Name,
				DisableAlreadyRegisteredCheck: false,
			})

			pool.activities = append(pool.activities, activityInfo.Name)
		}
	}

	return nil
}
// executeActivity runs a single activity invocation on the underlying worker
// pool and returns the payloads produced by the worker.
//
// Heartbeat details recorded for the activity, if any, are appended after the
// regular arguments and their count is passed in the InvokeActivity command.
// While the worker call is in flight the activity context is kept in
// pool.running under the task token, where GetActivityContext can find it.
//
// Errors: failures to read heartbeat details or to execute the command are
// wrapped with the operation name; a response that does not contain exactly
// one message is rejected; a failure whose message is "doNotCompleteOnReturn"
// is reported as activity.ErrResultPending; any other failure is converted to
// an error using the pool's data converter.
func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
	const op = errors.Op("executeActivity")

	heartbeatDetails := &common.Payloads{}
	if activity.HasHeartbeatDetails(ctx) {
		if err := activity.GetHeartbeatDetails(ctx, &heartbeatDetails); err != nil {
			return nil, errors.E(op, err)
		}
	}

	// The getter tolerates a nil receiver, so a decode that leaves the pointer
	// nil is treated as "no details" instead of panicking.
	details := heartbeatDetails.GetPayloads()

	info := activity.GetInfo(ctx)
	msg := rrt.Message{
		ID: atomic.AddUint64(&pool.seqID, 1),
		Command: rrt.InvokeActivity{
			Name:             info.ActivityType.Name,
			Info:             info,
			HeartbeatDetails: len(details),
		},
		Payloads: args,
	}

	if len(details) != 0 {
		// args may be nil; allocate a container so the details can still be
		// appended.
		if msg.Payloads == nil {
			msg.Payloads = &common.Payloads{}
		}

		msg.Payloads.Payloads = append(msg.Payloads.Payloads, details...)
	}

	// Track the context for the duration of the worker call.
	token := string(info.TaskToken)
	pool.running.Store(token, ctx)
	defer pool.running.Delete(token)

	result, err := pool.codec.Execute(pool.wp, rrt.Context{TaskQueue: info.TaskQueue}, msg)
	if err != nil {
		return nil, errors.E(op, err)
	}

	if len(result) != 1 {
		return nil, errors.E(op, "invalid activity worker response")
	}

	out := result[0]
	if out.Failure != nil {
		if out.Failure.Message == "doNotCompleteOnReturn" {
			return nil, activity.ErrResultPending
		}

		return nil, internalbindings.ConvertFailureToError(out.Failure, pool.dc)
	}

	return out.Payloads, nil
}
|