summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
commit4638bdca80f75bc120b330022086d31c8b41be5b (patch)
tree2362cdb39dc2e793f5bec7fd9b8d2363f516c1d4
parent7756eb25453c8006fbd75aa5c97159e96331b840 (diff)
Code cleanup
-rw-r--r--plugins/temporal/activity/activity_pool.go110
-rw-r--r--plugins/temporal/activity/plugin.go21
-rw-r--r--plugins/temporal/client/plugin.go34
-rw-r--r--plugins/temporal/protocol/json_codec.go77
-rw-r--r--plugins/temporal/protocol/message.go9
-rw-r--r--plugins/temporal/protocol/proto_codec.go5
-rw-r--r--plugins/temporal/protocol/worker_info.go2
-rw-r--r--plugins/temporal/workflow/message_queue.go20
-rw-r--r--plugins/temporal/workflow/message_queue_test.go4
-rw-r--r--plugins/temporal/workflow/plugin.go37
-rw-r--r--plugins/temporal/workflow/process.go33
-rw-r--r--plugins/temporal/workflow/workflow_pool.go61
-rw-r--r--tests/plugins/temporal/server_test.go7
13 files changed, 201 insertions, 219 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go
index 0aa2a62f..d09722ce 100644
--- a/plugins/temporal/activity/activity_pool.go
+++ b/plugins/temporal/activity/activity_pool.go
@@ -19,42 +19,40 @@ import (
"go.temporal.io/sdk/worker"
)
-type (
- activityPool interface {
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.SyncWorker
- ActivityNames() []string
- GetActivityContext(taskToken []byte) (context.Context, error)
- }
+// RR_MODE env variable
+const RR_MODE = "RR_MODE" //nolint:golint,stylecheck
+// RR_CODEC env variable
+const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck
+
+//
+const doNotCompleteOnReturn = "doNotCompleteOnReturn"
+
+type activityPool interface {
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.SyncWorker
+ ActivityNames() []string
+ GetActivityContext(taskToken []byte) (context.Context, error)
+}
- activityPoolImpl struct {
- dc converter.DataConverter
- codec rrt.Codec
- seqID uint64
- activities []string
- wp pool.Pool
- tWorkers []worker.Worker
- running sync.Map
- }
-)
+type activityPoolImpl struct {
+ dc converter.DataConverter
+ codec rrt.Codec
+ seqID uint64
+ activities []string
+ wp pool.Pool
+ tWorkers []worker.Worker
+ running sync.Map
+}
// newActivityPool
-func newActivityPool(
- codec rrt.Codec,
- listener events.Listener,
- poolConfig pool.Config,
- server server.Server,
-) (activityPool, error) {
- wp, err := server.NewWorkerPool(
- context.Background(),
- poolConfig,
- map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
- listener,
- )
-
+func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) {
+ const op = errors.Op("new_activity_pool")
+ // env variables
+ env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}
+ wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
return &activityPoolImpl{
@@ -66,17 +64,18 @@ func newActivityPool(
// initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("activity_pool_start")
pool.dc = temporal.GetDataConverter()
err := pool.initWorkers(ctx, temporal)
if err != nil {
- return err
+ return errors.E(op, err)
}
for i := 0; i < len(pool.tWorkers); i++ {
err := pool.tWorkers[i].Start()
if err != nil {
- return err
+ return errors.E(op, err)
}
}
@@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string {
// ActivityNames returns list of all available activity names.
func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) {
+ const op = errors.Op("activity_pool_get_activity_context")
c, ok := pool.running.Load(string(taskToken))
if !ok {
- return nil, errors.E("heartbeat on non running activity")
+ return nil, errors.E(op, errors.Str("heartbeat on non running activity"))
}
return c.(context.Context), nil
@@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
- const op = errors.Op("createTemporalWorker")
+ const op = errors.Op("activity_pool_create_temporal_worker")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter())
if err != nil {
@@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
pool.activities = make([]string, 0)
pool.tWorkers = make([]worker.Worker, 0)
- for _, info := range workerInfo {
- w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
+ for i := 0; i < len(workerInfo); i++ {
+ w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options)
if err != nil {
return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)
- for _, activityInfo := range info.Activities {
+ for j := 0; j < len(workerInfo[i].Activities); j++ {
w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{
- Name: activityInfo.Name,
+ Name: workerInfo[i].Activities[j].Name,
DisableAlreadyRegisteredCheck: false,
})
- pool.activities = append(pool.activities, activityInfo.Name)
+ pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name)
}
}
@@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T
// executes activity with underlying worker.
func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) {
- const op = errors.Op("executeActivity")
+ const op = errors.Op("activity_pool_execute_activity")
heartbeatDetails := &common.Payloads{}
if activity.HasHeartbeatDetails(ctx) {
@@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
}
- var (
- info = activity.GetInfo(ctx)
- msg = rrt.Message{
- ID: atomic.AddUint64(&pool.seqID, 1),
- Command: rrt.InvokeActivity{
- Name: info.ActivityType.Name,
- Info: info,
- HeartbeatDetails: len(heartbeatDetails.Payloads),
- },
- Payloads: args,
- }
- )
+ var info = activity.GetInfo(ctx)
+ var msg = rrt.Message{
+ ID: atomic.AddUint64(&pool.seqID, 1),
+ Command: rrt.InvokeActivity{
+ Name: info.ActivityType.Name,
+ Info: info,
+ HeartbeatDetails: len(heartbeatDetails.Payloads),
+ },
+ Payloads: args,
+ }
if len(heartbeatDetails.Payloads) != 0 {
msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...)
@@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.
}
if len(result) != 1 {
- return nil, errors.E(op, "invalid activity worker response")
+ return nil, errors.E(op, errors.Str("invalid activity worker response"))
}
out := result[0]
if out.Failure != nil {
- if out.Failure.Message == "doNotCompleteOnReturn" {
+ if out.Failure.Message == doNotCompleteOnReturn {
return nil, activity.ErrResultPending
}
diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go
index 02d66297..5e562a8d 100644
--- a/plugins/temporal/activity/plugin.go
+++ b/plugins/temporal/activity/plugin.go
@@ -39,9 +39,10 @@ type Plugin struct {
// Init configures activity service.
func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error {
+ const op = errors.Op("activity_plugin_init")
if temporal.GetConfig().Activities == nil {
// no need to serve activities
- return errors.E(errors.Disabled)
+ return errors.E(op, errors.Disabled)
}
p.temporal = temporal
@@ -55,11 +56,12 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger
// Serve activities with underlying workers.
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+ const op = errors.Op("activity_plugin_serve")
+ errCh := make(chan error, 1)
pool, err := p.startPool()
if err != nil {
- errCh <- errors.E("startPool", err)
+ errCh <- errors.E(op, err)
return errCh
}
@@ -83,7 +85,7 @@ func (p *Plugin) Serve() chan error {
err = backoff.Retry(p.replacePool, bkoff)
if err != nil {
- errCh <- errors.E("deadPool", err)
+ errCh <- errors.E(op, err)
}
}
}
@@ -95,11 +97,16 @@ func (p *Plugin) Serve() chan error {
// Stop stops the serving plugin.
func (p *Plugin) Stop() error {
atomic.StoreInt64(&p.closing, 1)
+ const op = errors.Op("activity_plugin_stop")
pool := p.getPool()
if pool != nil {
p.pool = nil
- return pool.Destroy(context.Background())
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
return nil
@@ -112,9 +119,7 @@ func (p *Plugin) Name() string {
// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
- client, _ := p.temporal.GetClient()
-
- return &rpc{srv: p, client: client}
+ return &rpc{srv: p, client: p.temporal.GetClient()}
}
// Workers returns pool workers.
diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go
index 37c8d7be..047a1815 100644
--- a/plugins/temporal/client/plugin.go
+++ b/plugins/temporal/client/plugin.go
@@ -24,7 +24,7 @@ var stickyCacheSet = false
// Plugin implement Temporal contract.
type Plugin struct {
workerID int32
- cfg Config
+ cfg *Config
dc converter.DataConverter
log logger.Logger
client client.Client
@@ -32,7 +32,7 @@ type Plugin struct {
// Temporal define common interface for RoadRunner plugins.
type Temporal interface {
- GetClient() (client.Client, error)
+ GetClient() client.Client
GetDataConverter() converter.DataConverter
GetConfig() Config
GetCodec() rrt.Codec
@@ -51,15 +51,27 @@ type Config struct {
// Init initiates temporal client plugin.
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("temporal_client_plugin_init")
p.log = log
p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter())
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ if p.cfg == nil {
+ return errors.E(op, errors.Disabled)
+ }
- return cfg.UnmarshalKey(PluginName, &p.cfg)
+ return nil
}
// GetConfig returns temporal configuration.
func (p *Plugin) GetConfig() Config {
- return p.cfg
+ if p.cfg != nil {
+ return *p.cfg
+ }
+ // empty
+ return Config{}
}
// GetCodec returns communication codec.
@@ -79,6 +91,7 @@ func (p *Plugin) GetDataConverter() converter.DataConverter {
// Serve starts temporal srv.
func (p *Plugin) Serve() chan error {
+ const op = errors.Op("temporal_client_plugin_serve")
errCh := make(chan error, 1)
var err error
@@ -94,12 +107,12 @@ func (p *Plugin) Serve() chan error {
DataConverter: p.dc,
})
- p.log.Debug("Connected to temporal server", "Plugin", p.cfg.Address)
-
if err != nil {
- errCh <- errors.E(errors.Op("p connect"), err)
+ errCh <- errors.E(op, err)
}
+ p.log.Debug("connected to temporal server", "address", p.cfg.Address)
+
return errCh
}
@@ -113,14 +126,15 @@ func (p *Plugin) Stop() error {
}
// GetClient returns active srv connection.
-func (p *Plugin) GetClient() (client.Client, error) {
- return p.client, nil
+func (p *Plugin) GetClient() client.Client {
+ return p.client
}
// CreateWorker allocates new temporal worker on an active connection.
func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) {
+ const op = errors.Op("temporal_client_plugin_create_worker")
if p.client == nil {
- return nil, errors.E("unable to create worker, invalid temporal p")
+ return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client"))
}
if options.Identity == "" {
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
index dae3a7d0..e7a77068 100644
--- a/plugins/temporal/protocol/json_codec.go
+++ b/plugins/temporal/protocol/json_codec.go
@@ -2,7 +2,7 @@ package protocol
import (
"github.com/fatih/color"
- jsoniter "github.com/json-iterator/go"
+ j "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -10,34 +10,34 @@ import (
"go.temporal.io/api/failure/v1"
)
-type (
- // JSONCodec can be used for debugging and log capturing reasons.
- JSONCodec struct {
- // level enables verbose logging or all incoming and outcoming messages.
- level DebugLevel
+var json = j.ConfigCompatibleWithStandardLibrary
- // logger renders messages when debug enabled.
- logger logger.Logger
- }
+// JSONCodec can be used for debugging and log capturing reasons.
+type JSONCodec struct {
+ // level enables verbose logging or all incoming and outcoming messages.
+ level DebugLevel
- // jsonFrame contains message command in binary form.
- jsonFrame struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+}
- // Command name. Optional.
- Command string `json:"command,omitempty"`
+// jsonFrame contains message command in binary form.
+type jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
- // Options to be unmarshalled to body (raw payload).
- Options jsoniter.RawMessage `json:"options,omitempty"`
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
- // Failure associated with command id.
- Failure []byte `json:"failure,omitempty"`
+ // Options to be unmarshalled to body (raw payload).
+ Options j.RawMessage `json:"options,omitempty"`
- // Payloads specific to the command or result.
- Payloads []byte `json:"payloads,omitempty"`
- }
-)
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+}
// NewJSONCodec creates new Json communication codec.
func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
@@ -62,21 +62,20 @@ func (c *JSONCodec) GetName() string {
// Execute exchanges commands with worker.
func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ const op = errors.Op("json_codec_execute")
if len(msg) == 0 {
return nil, nil
}
- var (
- response = make([]jsonFrame, 0, 5)
- result = make([]Message, 0, 5)
- err error
- )
+ var response = make([]jsonFrame, 0, 5)
+ var result = make([]Message, 0, 5)
+ var err error
frames := make([]jsonFrame, 0, len(msg))
for _, m := range msg {
frame, err := c.packFrame(m)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
frames = append(frames, frame)
@@ -88,14 +87,14 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
p.Context = []byte("null")
}
- p.Context, err = jsoniter.Marshal(ctx)
+ p.Context, err = json.Marshal(ctx)
if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
+ return nil, errors.E(op, err)
}
- p.Body, err = jsoniter.Marshal(frames)
+ p.Body, err = json.Marshal(frames)
if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
+ return nil, errors.E(op, err)
}
if c.level >= DebugNormal {
@@ -109,7 +108,7 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
out, err := e.Exec(p)
if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
+ return nil, errors.E(op, err)
}
if len(out.Body) == 0 {
@@ -126,15 +125,15 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
c.logger.Debug(logMessage, "receive", true)
}
- err = jsoniter.Unmarshal(out.Body, &response)
+ err = json.Unmarshal(out.Body, &response)
if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
+ return nil, errors.E(op, err)
}
for _, f := range response {
msg, err := c.parseFrame(f)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
result = append(result, msg)
@@ -174,7 +173,7 @@ func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
return jsonFrame{}, err
}
- frame.Options, err = jsoniter.Marshal(msg.Command)
+ frame.Options, err = json.Marshal(msg.Command)
if err != nil {
return jsonFrame{}, err
}
@@ -214,7 +213,7 @@ func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
return Message{}, err
}
- err = jsoniter.Unmarshal(frame.Options, &cmd)
+ err = json.Unmarshal(frame.Options, &cmd)
if err != nil {
return Message{}, err
}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
index 4568fd1d..d5e0f49d 100644
--- a/plugins/temporal/protocol/message.go
+++ b/plugins/temporal/protocol/message.go
@@ -39,8 +39,7 @@ const (
)
// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct {
-}
+type GetWorkerInfo struct{}
// InvokeActivity invokes activity.
type InvokeActivity struct {
@@ -218,6 +217,7 @@ func (cmd NewTimer) ToDuration() time.Duration {
// returns command name (only for the commands sent to the worker)
func commandName(cmd interface{}) (string, error) {
+ const op = errors.Op("command_name")
switch cmd.(type) {
case GetWorkerInfo, *GetWorkerInfo:
return getWorkerInfoCommand, nil
@@ -260,12 +260,13 @@ func commandName(cmd interface{}) (string, error) {
case Panic, *Panic:
return panicCommand, nil
default:
- return "", errors.E(errors.Op("commandName"), "undefined command type", cmd)
+ return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
}
}
// reads command from binary payload
func initCommand(name string) (interface{}, error) {
+ const op = errors.Op("init_command")
switch name {
case getWorkerInfoCommand:
return &GetWorkerInfo{}, nil
@@ -328,6 +329,6 @@ func initCommand(name string) (interface{}, error) {
return &Panic{}, nil
default:
- return nil, errors.E(errors.Op("initCommand"), "undefined command type", name)
+ return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
}
}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
index b41f02b6..607fe0fe 100644
--- a/plugins/temporal/protocol/proto_codec.go
+++ b/plugins/temporal/protocol/proto_codec.go
@@ -120,6 +120,7 @@ func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
}
func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
+ const op = errors.Op("proto_codec_parse_message")
var err error
msg := Message{
@@ -131,12 +132,12 @@ func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
if frame.Command != "" {
msg.Command, err = initCommand(frame.Command)
if err != nil {
- return Message{}, err
+ return Message{}, errors.E(op, err)
}
err = jsoniter.Unmarshal(frame.Options, &msg.Command)
if err != nil {
- return Message{}, err
+ return Message{}, errors.E(op, err)
}
}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
index 6dfcd81f..58a0ae66 100644
--- a/plugins/temporal/protocol/worker_info.go
+++ b/plugins/temporal/protocol/worker_info.go
@@ -47,7 +47,7 @@ func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerI
result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
if len(result) != 1 {
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
index 50949897..8f4409d1 100644
--- a/plugins/temporal/workflow/message_queue.go
+++ b/plugins/temporal/workflow/message_queue.go
@@ -22,28 +22,20 @@ func (mq *messageQueue) flush() {
mq.queue = mq.queue[0:0]
}
-func (mq *messageQueue) allocateMessage(
- cmd interface{},
- payloads *common.Payloads,
-) (id uint64, msg rrt.Message, err error) {
- msg = rrt.Message{
+func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
+ msg := rrt.Message{
ID: mq.seqID(),
Command: cmd,
Payloads: payloads,
}
- return msg.ID, msg, nil
+ return msg.ID, msg
}
-func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) (id uint64, err error) {
- id, msg, err := mq.allocateMessage(cmd, payloads)
- if err != nil {
- return 0, err
- }
-
+func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
+ id, msg := mq.allocateMessage(cmd, payloads)
mq.queue = append(mq.queue, msg)
-
- return id, nil
+ return id
}
func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
index 61f5123f..1fcd409f 100644
--- a/plugins/temporal/workflow/message_queue_test.go
+++ b/plugins/temporal/workflow/message_queue_test.go
@@ -44,10 +44,8 @@ func Test_MessageQueueCommandID(t *testing.T) {
return atomic.AddUint64(&index, 1)
})
- n, err := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
+ n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
assert.Equal(t, n, index)
-
- assert.NoError(t, err)
assert.Len(t, mq.queue, 1)
mq.flush()
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
index ee2d722c..3a397364 100644
--- a/plugins/temporal/workflow/plugin.go
+++ b/plugins/temporal/workflow/plugin.go
@@ -41,18 +41,19 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger
p.server = server
p.events = events.NewEventsHandler()
p.log = log
- p.reset = make(chan struct{})
+ p.reset = make(chan struct{}, 1)
return nil
}
// Serve starts workflow service.
func (p *Plugin) Serve() chan error {
+ const op = errors.Op("workflow_plugin_serve")
errCh := make(chan error, 1)
pool, err := p.startPool()
if err != nil {
- errCh <- errors.E("startPool", err)
+ errCh <- errors.E(op, err)
return errCh
}
@@ -76,7 +77,7 @@ func (p *Plugin) Serve() chan error {
err = backoff.Retry(p.replacePool, bkoff)
if err != nil {
- errCh <- errors.E("deadPool", err)
+ errCh <- errors.E(op, err)
}
}
}
@@ -87,12 +88,17 @@ func (p *Plugin) Serve() chan error {
// Stop workflow service.
func (p *Plugin) Stop() error {
+ const op = errors.Op("workflow_plugin_stop")
atomic.StoreInt64(&p.closing, 1)
pool := p.getPool()
if pool != nil {
p.pool = nil
- return pool.Destroy(context.Background())
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
return nil
@@ -121,11 +127,6 @@ func (p *Plugin) Reset() error {
}
// AddListener adds event listeners to the service.
-func (p *Plugin) AddListener(listener events.Listener) {
- p.events.AddListener(listener)
-}
-
-// AddListener adds event listeners to the service.
func (p *Plugin) poolListener(event interface{}) {
if ev, ok := event.(PoolEvent); ok {
if ev.Event == eventWorkerExit {
@@ -141,18 +142,19 @@ func (p *Plugin) poolListener(event interface{}) {
// AddListener adds event listeners to the service.
func (p *Plugin) startPool() (workflowPool, error) {
+ const op = errors.Op("workflow_plugin_start_pool")
pool, err := newWorkflowPool(
p.temporal.GetCodec().WithLogger(p.log),
p.poolListener,
p.server,
)
if err != nil {
- return nil, errors.E(errors.Op("initWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
err = pool.Start(context.Background(), p.temporal)
if err != nil {
- return nil, errors.E(errors.Op("startWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
@@ -162,29 +164,30 @@ func (p *Plugin) startPool() (workflowPool, error) {
func (p *Plugin) replacePool() error {
p.mu.Lock()
+ const op = errors.Op("workflow_plugin_replace_pool")
defer p.mu.Unlock()
if p.pool != nil {
- errD := p.pool.Destroy(context.Background())
+ err := p.pool.Destroy(context.Background())
p.pool = nil
- if errD != nil {
+ if err != nil {
p.log.Error(
"Unable to destroy expired workflow pool",
"error",
- errors.E(errors.Op("destroyWorkflowPool"), errD),
+ errors.E(op, err),
)
+ return errors.E(op, err)
}
}
pool, err := p.startPool()
if err != nil {
p.log.Error("Replace workflow pool failed", "error", err)
- return errors.E(errors.Op("newWorkflowPool"), err)
+ return errors.E(op, err)
}
- p.log.Debug("Replace workflow pool")
-
p.pool = pool
+ p.log.Debug("workflow pool successfully replaced")
return nil
}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
index ec5a14eb..45e6885c 100644
--- a/plugins/temporal/workflow/process.go
+++ b/plugins/temporal/workflow/process.go
@@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com
lastCompletionOffset = len(lastCompletion.Payloads)
}
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
@@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() {
// TODO: properly handle errors
// panic(err)
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
- if err != nil {
- panic(err)
- }
-
_, _ = wf.discardQueue()
}
@@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context {
// schedule cancel command
func (wf *workflowProcess) handleCancel() {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
-
- if err != nil {
- panic(err)
- }
}
// schedule the signal processing
func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.InvokeSignal{
RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
Name: name,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// Handle query in blocking mode.
@@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
// Run single command and return single result.
func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("run command")
- _, msg, err := wf.mq.allocateMessage(cmd, payloads)
- if err != nil {
- return rrt.Message{}, err
- }
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
if err != nil {
@@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa
}
if len(result) != 1 {
- return rrt.Message{}, errors.E("unexpected worker response")
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
}
return result[0], nil
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
index 2022b624..1a78f377 100644
--- a/plugins/temporal/workflow/workflow_pool.go
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -19,41 +19,35 @@ import (
const eventWorkerExit = 8390
-type (
- workflowPool interface {
- SeqID() uint64
- Exec(p payload.Payload) (payload.Payload, error)
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.BaseProcess
- WorkflowNames() []string
- }
+type workflowPool interface {
+ SeqID() uint64
+ Exec(p payload.Payload) (payload.Payload, error)
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.BaseProcess
+ WorkflowNames() []string
+}
- // PoolEvent triggered on workflow pool worker events.
- PoolEvent struct {
- Event int
- Context interface{}
- Caused error
- }
+// PoolEvent triggered on workflow pool worker events.
+type PoolEvent struct {
+ Event int
+ Context interface{}
+ Caused error
+}
- // workflowPoolImpl manages workflowProcess executions between worker restarts.
- workflowPoolImpl struct {
- codec rrt.Codec
- seqID uint64
- workflows map[string]rrt.WorkflowInfo
- tWorkers []worker.Worker
- mu sync.Mutex
- worker rrWorker.SyncWorker
- active bool
- }
-)
+// workflowPoolImpl manages workflowProcess executions between worker restarts.
+type workflowPoolImpl struct {
+ codec rrt.Codec
+ seqID uint64
+ workflows map[string]rrt.WorkflowInfo
+ tWorkers []worker.Worker
+ mu sync.Mutex
+ worker rrWorker.SyncWorker
+ active bool
+}
// newWorkflowPool creates new workflow pool.
-func newWorkflowPool(
- codec rrt.Codec,
- listener events.Listener,
- factory server.Server,
-) (workflowPool, error) {
+func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
w, err := factory.NewWorker(
context.Background(),
map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
@@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string {
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_init_workers")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
if err != nil {
- return err
+ return errors.E(op, err)
}
pool.workflows = make(map[string]rrt.WorkflowInfo)
@@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T
for _, info := range workerInfo {
w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
if err != nil {
- return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx))
+ return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)
diff --git a/tests/plugins/temporal/server_test.go b/tests/plugins/temporal/server_test.go
index 03e4ed7f..c8d815c3 100644
--- a/tests/plugins/temporal/server_test.go
+++ b/tests/plugins/temporal/server_test.go
@@ -98,12 +98,7 @@ func NewTestServer(opt ...ConfigOption) *TestServer {
}
func (s *TestServer) Client() temporalClient.Client {
- c, err := s.temporal.GetClient()
- if err != nil {
- panic(err)
- }
-
- return c
+ return s.temporal.GetClient()
}
func (s *TestServer) MustClose() {