diff options
Diffstat (limited to 'plugins/temporal')
-rw-r--r-- | plugins/temporal/activity/activity_pool.go | 110 | ||||
-rw-r--r-- | plugins/temporal/activity/plugin.go | 21 | ||||
-rw-r--r-- | plugins/temporal/client/plugin.go | 34 | ||||
-rw-r--r-- | plugins/temporal/protocol/json_codec.go | 77 | ||||
-rw-r--r-- | plugins/temporal/protocol/message.go | 9 | ||||
-rw-r--r-- | plugins/temporal/protocol/proto_codec.go | 5 | ||||
-rw-r--r-- | plugins/temporal/protocol/worker_info.go | 2 | ||||
-rw-r--r-- | plugins/temporal/workflow/message_queue.go | 20 | ||||
-rw-r--r-- | plugins/temporal/workflow/message_queue_test.go | 4 | ||||
-rw-r--r-- | plugins/temporal/workflow/plugin.go | 37 | ||||
-rw-r--r-- | plugins/temporal/workflow/process.go | 33 | ||||
-rw-r--r-- | plugins/temporal/workflow/workflow_pool.go | 61 |
12 files changed, 200 insertions, 213 deletions
diff --git a/plugins/temporal/activity/activity_pool.go b/plugins/temporal/activity/activity_pool.go index 0aa2a62f..d09722ce 100644 --- a/plugins/temporal/activity/activity_pool.go +++ b/plugins/temporal/activity/activity_pool.go @@ -19,42 +19,40 @@ import ( "go.temporal.io/sdk/worker" ) -type ( - activityPool interface { - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.SyncWorker - ActivityNames() []string - GetActivityContext(taskToken []byte) (context.Context, error) - } +// RR_MODE env variable +const RR_MODE = "RR_MODE" //nolint:golint,stylecheck +// RR_CODEC env variable +const RR_CODEC = "RR_CODEC" //nolint:golint,stylecheck + +// +const doNotCompleteOnReturn = "doNotCompleteOnReturn" + +type activityPool interface { + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.SyncWorker + ActivityNames() []string + GetActivityContext(taskToken []byte) (context.Context, error) +} - activityPoolImpl struct { - dc converter.DataConverter - codec rrt.Codec - seqID uint64 - activities []string - wp pool.Pool - tWorkers []worker.Worker - running sync.Map - } -) +type activityPoolImpl struct { + dc converter.DataConverter + codec rrt.Codec + seqID uint64 + activities []string + wp pool.Pool + tWorkers []worker.Worker + running sync.Map +} // newActivityPool -func newActivityPool( - codec rrt.Codec, - listener events.Listener, - poolConfig pool.Config, - server server.Server, -) (activityPool, error) { - wp, err := server.NewWorkerPool( - context.Background(), - poolConfig, - map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, - listener, - ) - +func newActivityPool(codec rrt.Codec, listener events.Listener, poolConfig pool.Config, server server.Server) (activityPool, error) { + const op = errors.Op("new_activity_pool") + // env variables + env := map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()} + wp, err := server.NewWorkerPool(context.Background(), poolConfig, env, listener) if err != nil { - return nil, err + return nil, errors.E(op, err) } return &activityPoolImpl{ @@ -66,17 +64,18 @@ func newActivityPool( // initWorkers request workers info from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("activity_pool_start") pool.dc = temporal.GetDataConverter() err := pool.initWorkers(ctx, temporal) if err != nil { - return err + return errors.E(op, err) } for i := 0; i < len(pool.tWorkers); i++ { err := pool.tWorkers[i].Start() if err != nil { - return err + return errors.E(op, err) } } @@ -105,9 +104,10 @@ func (pool *activityPoolImpl) ActivityNames() []string { // ActivityNames returns list of all available activity names. func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Context, error) { + const op = errors.Op("activity_pool_get_activity_context") c, ok := pool.running.Load(string(taskToken)) if !ok { - return nil, errors.E("heartbeat on non running activity") + return nil, errors.E(op, errors.Str("heartbeat on non running activity")) } return c.(context.Context), nil @@ -115,7 +115,7 @@ func (pool *activityPoolImpl) GetActivityContext(taskToken []byte) (context.Cont // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { - const op = errors.Op("createTemporalWorker") + const op = errors.Op("activity_pool_create_temporal_worker") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool.wp, temporal.GetDataConverter()) if err != nil { @@ -125,20 +125,20 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T pool.activities = make([]string, 0) pool.tWorkers = make([]worker.Worker, 0) - for _, info := range workerInfo { - w, err := temporal.CreateWorker(info.TaskQueue, info.Options) + for i := 0; i < len(workerInfo); i++ { + w, err := temporal.CreateWorker(workerInfo[i].TaskQueue, workerInfo[i].Options) if err != nil { return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) - for _, activityInfo := range info.Activities { + for j := 0; j < len(workerInfo[i].Activities); j++ { w.RegisterActivityWithOptions(pool.executeActivity, activity.RegisterOptions{ - Name: activityInfo.Name, + Name: workerInfo[i].Activities[j].Name, DisableAlreadyRegisteredCheck: false, }) - pool.activities = append(pool.activities, activityInfo.Name) + pool.activities = append(pool.activities, workerInfo[i].Activities[j].Name) } } @@ -147,7 +147,7 @@ func (pool *activityPoolImpl) initWorkers(ctx context.Context, temporal client.T // executes activity with underlying worker. func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common.Payloads) (*common.Payloads, error) { - const op = errors.Op("executeActivity") + const op = errors.Op("activity_pool_execute_activity") heartbeatDetails := &common.Payloads{} if activity.HasHeartbeatDetails(ctx) { @@ -157,18 +157,16 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } } - var ( - info = activity.GetInfo(ctx) - msg = rrt.Message{ - ID: atomic.AddUint64(&pool.seqID, 1), - Command: rrt.InvokeActivity{ - Name: info.ActivityType.Name, - Info: info, - HeartbeatDetails: len(heartbeatDetails.Payloads), - }, - Payloads: args, - } - ) + var info = activity.GetInfo(ctx) + var msg = rrt.Message{ + ID: atomic.AddUint64(&pool.seqID, 1), + Command: rrt.InvokeActivity{ + Name: info.ActivityType.Name, + Info: info, + HeartbeatDetails: len(heartbeatDetails.Payloads), + }, + Payloads: args, + } if len(heartbeatDetails.Payloads) != 0 { msg.Payloads.Payloads = append(msg.Payloads.Payloads, heartbeatDetails.Payloads...) @@ -183,12 +181,12 @@ func (pool *activityPoolImpl) executeActivity(ctx context.Context, args *common. } if len(result) != 1 { - return nil, errors.E(op, "invalid activity worker response") + return nil, errors.E(op, errors.Str("invalid activity worker response")) } out := result[0] if out.Failure != nil { - if out.Failure.Message == "doNotCompleteOnReturn" { + if out.Failure.Message == doNotCompleteOnReturn { return nil, activity.ErrResultPending } diff --git a/plugins/temporal/activity/plugin.go b/plugins/temporal/activity/plugin.go index 02d66297..5e562a8d 100644 --- a/plugins/temporal/activity/plugin.go +++ b/plugins/temporal/activity/plugin.go @@ -39,9 +39,10 @@ type Plugin struct { // Init configures activity service. func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger.Logger) error { + const op = errors.Op("activity_plugin_init") if temporal.GetConfig().Activities == nil { // no need to serve activities - return errors.E(errors.Disabled) + return errors.E(op, errors.Disabled) } p.temporal = temporal @@ -55,11 +56,12 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger // Serve activities with underlying workers. func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) + const op = errors.Op("activity_plugin_serve") + errCh := make(chan error, 1) pool, err := p.startPool() if err != nil { - errCh <- errors.E("startPool", err) + errCh <- errors.E(op, err) return errCh } @@ -83,7 +85,7 @@ func (p *Plugin) Serve() chan error { err = backoff.Retry(p.replacePool, bkoff) if err != nil { - errCh <- errors.E("deadPool", err) + errCh <- errors.E(op, err) } } } @@ -95,11 +97,16 @@ func (p *Plugin) Serve() chan error { // Stop stops the serving plugin. func (p *Plugin) Stop() error { atomic.StoreInt64(&p.closing, 1) + const op = errors.Op("activity_plugin_stop") pool := p.getPool() if pool != nil { p.pool = nil - return pool.Destroy(context.Background()) + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil } return nil @@ -112,9 +119,7 @@ func (p *Plugin) Name() string { // RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { - client, _ := p.temporal.GetClient() - - return &rpc{srv: p, client: client} + return &rpc{srv: p, client: p.temporal.GetClient()} } // Workers returns pool workers. diff --git a/plugins/temporal/client/plugin.go b/plugins/temporal/client/plugin.go index 37c8d7be..047a1815 100644 --- a/plugins/temporal/client/plugin.go +++ b/plugins/temporal/client/plugin.go @@ -24,7 +24,7 @@ var stickyCacheSet = false // Plugin implement Temporal contract. type Plugin struct { workerID int32 - cfg Config + cfg *Config dc converter.DataConverter log logger.Logger client client.Client @@ -32,7 +32,7 @@ type Plugin struct { // Temporal define common interface for RoadRunner plugins. type Temporal interface { - GetClient() (client.Client, error) + GetClient() client.Client GetDataConverter() converter.DataConverter GetConfig() Config GetCodec() rrt.Codec @@ -51,15 +51,27 @@ type Config struct { // Init initiates temporal client plugin. func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("temporal_client_plugin_init") p.log = log p.dc = rrt.NewDataConverter(converter.GetDefaultDataConverter()) + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + if p.cfg == nil { + return errors.E(op, errors.Disabled) + } - return cfg.UnmarshalKey(PluginName, &p.cfg) + return nil } // GetConfig returns temporal configuration. func (p *Plugin) GetConfig() Config { - return p.cfg + if p.cfg != nil { + return *p.cfg + } + // empty + return Config{} } // GetCodec returns communication codec. @@ -79,6 +91,7 @@ func (p *Plugin) GetDataConverter() converter.DataConverter { // Serve starts temporal srv. func (p *Plugin) Serve() chan error { + const op = errors.Op("temporal_client_plugin_serve") errCh := make(chan error, 1) var err error @@ -94,12 +107,12 @@ func (p *Plugin) Serve() chan error { DataConverter: p.dc, }) - p.log.Debug("Connected to temporal server", "Plugin", p.cfg.Address) - if err != nil { - errCh <- errors.E(errors.Op("p connect"), err) + errCh <- errors.E(op, err) } + p.log.Debug("connected to temporal server", "address", p.cfg.Address) + return errCh } @@ -113,14 +126,15 @@ func (p *Plugin) Stop() error { } // GetClient returns active srv connection. -func (p *Plugin) GetClient() (client.Client, error) { - return p.client, nil +func (p *Plugin) GetClient() client.Client { + return p.client } // CreateWorker allocates new temporal worker on an active connection. func (p *Plugin) CreateWorker(tq string, options worker.Options) (worker.Worker, error) { + const op = errors.Op("temporal_client_plugin_create_worker") if p.client == nil { - return nil, errors.E("unable to create worker, invalid temporal p") + return nil, errors.E(op, errors.Str("unable to create worker, invalid temporal client")) } if options.Identity == "" { diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go index dae3a7d0..e7a77068 100644 --- a/plugins/temporal/protocol/json_codec.go +++ b/plugins/temporal/protocol/json_codec.go @@ -2,7 +2,7 @@ package protocol import ( "github.com/fatih/color" - jsoniter "github.com/json-iterator/go" + j "github.com/json-iterator/go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -10,34 +10,34 @@ import ( "go.temporal.io/api/failure/v1" ) -type ( - // JSONCodec can be used for debugging and log capturing reasons. - JSONCodec struct { - // level enables verbose logging or all incoming and outcoming messages. - level DebugLevel +var json = j.ConfigCompatibleWithStandardLibrary - // logger renders messages when debug enabled. - logger logger.Logger - } +// JSONCodec can be used for debugging and log capturing reasons. +type JSONCodec struct { + // level enables verbose logging or all incoming and outcoming messages. + level DebugLevel - // jsonFrame contains message command in binary form. - jsonFrame struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` + // logger renders messages when debug enabled. + logger logger.Logger +} - // Command name. Optional. - Command string `json:"command,omitempty"` +// jsonFrame contains message command in binary form. +type jsonFrame struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` - // Options to be unmarshalled to body (raw payload). - Options jsoniter.RawMessage `json:"options,omitempty"` + // Command name. Optional. + Command string `json:"command,omitempty"` - // Failure associated with command id. - Failure []byte `json:"failure,omitempty"` + // Options to be unmarshalled to body (raw payload). + Options j.RawMessage `json:"options,omitempty"` - // Payloads specific to the command or result. - Payloads []byte `json:"payloads,omitempty"` - } -) + // Failure associated with command id. + Failure []byte `json:"failure,omitempty"` + + // Payloads specific to the command or result. + Payloads []byte `json:"payloads,omitempty"` +} // NewJSONCodec creates new Json communication codec. func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { @@ -62,21 +62,20 @@ func (c *JSONCodec) GetName() string { // Execute exchanges commands with worker. func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + const op = errors.Op("json_codec_execute") if len(msg) == 0 { return nil, nil } - var ( - response = make([]jsonFrame, 0, 5) - result = make([]Message, 0, 5) - err error - ) + var response = make([]jsonFrame, 0, 5) + var result = make([]Message, 0, 5) + var err error frames := make([]jsonFrame, 0, len(msg)) for _, m := range msg { frame, err := c.packFrame(m) if err != nil { - return nil, err + return nil, errors.E(op, err) } frames = append(frames, frame) @@ -88,14 +87,14 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, p.Context = []byte("null") } - p.Context, err = jsoniter.Marshal(ctx) + p.Context, err = json.Marshal(ctx) if err != nil { - return nil, errors.E(errors.Op("encodeContext"), err) + return nil, errors.E(op, err) } - p.Body, err = jsoniter.Marshal(frames) + p.Body, err = json.Marshal(frames) if err != nil { - return nil, errors.E(errors.Op("encodePayload"), err) + return nil, errors.E(op, err) } if c.level >= DebugNormal { @@ -109,7 +108,7 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, out, err := e.Exec(p) if err != nil { - return nil, errors.E(errors.Op("execute"), err) + return nil, errors.E(op, err) } if len(out.Body) == 0 { @@ -126,15 +125,15 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, c.logger.Debug(logMessage, "receive", true) } - err = jsoniter.Unmarshal(out.Body, &response) + err = json.Unmarshal(out.Body, &response) if err != nil { - return nil, errors.E(errors.Op("parseResponse"), err) + return nil, errors.E(op, err) } for _, f := range response { msg, err := c.parseFrame(f) if err != nil { - return nil, err + return nil, errors.E(op, err) } result = append(result, msg) @@ -174,7 +173,7 @@ func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { return jsonFrame{}, err } - frame.Options, err = jsoniter.Marshal(msg.Command) + frame.Options, err = json.Marshal(msg.Command) if err != nil { return jsonFrame{}, err } @@ -214,7 +213,7 @@ func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { return Message{}, err } - err = jsoniter.Unmarshal(frame.Options, &cmd) + err = json.Unmarshal(frame.Options, &cmd) if err != nil { return Message{}, err } diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go index 4568fd1d..d5e0f49d 100644 --- a/plugins/temporal/protocol/message.go +++ b/plugins/temporal/protocol/message.go @@ -39,8 +39,7 @@ const ( ) // GetWorkerInfo reads worker information. -type GetWorkerInfo struct { -} +type GetWorkerInfo struct{} // InvokeActivity invokes activity. type InvokeActivity struct { @@ -218,6 +217,7 @@ func (cmd NewTimer) ToDuration() time.Duration { // returns command name (only for the commands sent to the worker) func commandName(cmd interface{}) (string, error) { + const op = errors.Op("command_name") switch cmd.(type) { case GetWorkerInfo, *GetWorkerInfo: return getWorkerInfoCommand, nil @@ -260,12 +260,13 @@ func commandName(cmd interface{}) (string, error) { case Panic, *Panic: return panicCommand, nil default: - return "", errors.E(errors.Op("commandName"), "undefined command type", cmd) + return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) } } // reads command from binary payload func initCommand(name string) (interface{}, error) { + const op = errors.Op("init_command") switch name { case getWorkerInfoCommand: return &GetWorkerInfo{}, nil @@ -328,6 +329,6 @@ func initCommand(name string) (interface{}, error) { return &Panic{}, nil default: - return nil, errors.E(errors.Op("initCommand"), "undefined command type", name) + return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) } } diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go index b41f02b6..607fe0fe 100644 --- a/plugins/temporal/protocol/proto_codec.go +++ b/plugins/temporal/protocol/proto_codec.go @@ -120,6 +120,7 @@ func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { } func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { + const op = errors.Op("proto_codec_parse_message") var err error msg := Message{ @@ -131,12 +132,12 @@ func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { if frame.Command != "" { msg.Command, err = initCommand(frame.Command) if err != nil { - return Message{}, err + return Message{}, errors.E(op, err) } err = jsoniter.Unmarshal(frame.Options, &msg.Command) if err != nil { - return Message{}, err + return Message{}, errors.E(op, err) } } diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go index 6dfcd81f..58a0ae66 100644 --- a/plugins/temporal/protocol/worker_info.go +++ b/plugins/temporal/protocol/worker_info.go @@ -47,7 +47,7 @@ func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerI result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) if err != nil { - return nil, err + return nil, errors.E(op, err) } if len(result) != 1 { diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go index 50949897..8f4409d1 100644 --- a/plugins/temporal/workflow/message_queue.go +++ b/plugins/temporal/workflow/message_queue.go @@ -22,28 +22,20 @@ func (mq *messageQueue) flush() { mq.queue = mq.queue[0:0] } -func (mq *messageQueue) allocateMessage( - cmd interface{}, - payloads *common.Payloads, -) (id uint64, msg rrt.Message, err error) { - msg = rrt.Message{ +func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { + msg := rrt.Message{ ID: mq.seqID(), Command: cmd, Payloads: payloads, } - return msg.ID, msg, nil + return msg.ID, msg } -func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) (id uint64, err error) { - id, msg, err := mq.allocateMessage(cmd, payloads) - if err != nil { - return 0, err - } - +func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { + id, msg := mq.allocateMessage(cmd, payloads) mq.queue = append(mq.queue, msg) - - return id, nil + return id } func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go index 61f5123f..1fcd409f 100644 --- a/plugins/temporal/workflow/message_queue_test.go +++ b/plugins/temporal/workflow/message_queue_test.go @@ -44,10 +44,8 @@ func Test_MessageQueueCommandID(t *testing.T) { return atomic.AddUint64(&index, 1) }) - n, err := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) + n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) assert.Equal(t, n, index) - - assert.NoError(t, err) assert.Len(t, mq.queue, 1) mq.flush() diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go index ee2d722c..3a397364 100644 --- a/plugins/temporal/workflow/plugin.go +++ b/plugins/temporal/workflow/plugin.go @@ -41,18 +41,19 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger p.server = server p.events = events.NewEventsHandler() p.log = log - p.reset = make(chan struct{}) + p.reset = make(chan struct{}, 1) return nil } // Serve starts workflow service. func (p *Plugin) Serve() chan error { + const op = errors.Op("workflow_plugin_serve") errCh := make(chan error, 1) pool, err := p.startPool() if err != nil { - errCh <- errors.E("startPool", err) + errCh <- errors.E(op, err) return errCh } @@ -76,7 +77,7 @@ func (p *Plugin) Serve() chan error { err = backoff.Retry(p.replacePool, bkoff) if err != nil { - errCh <- errors.E("deadPool", err) + errCh <- errors.E(op, err) } } } @@ -87,12 +88,17 @@ func (p *Plugin) Serve() chan error { // Stop workflow service. func (p *Plugin) Stop() error { + const op = errors.Op("workflow_plugin_stop") atomic.StoreInt64(&p.closing, 1) pool := p.getPool() if pool != nil { p.pool = nil - return pool.Destroy(context.Background()) + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil } return nil @@ -121,11 +127,6 @@ func (p *Plugin) Reset() error { } // AddListener adds event listeners to the service. -func (p *Plugin) AddListener(listener events.Listener) { - p.events.AddListener(listener) -} - -// AddListener adds event listeners to the service. func (p *Plugin) poolListener(event interface{}) { if ev, ok := event.(PoolEvent); ok { if ev.Event == eventWorkerExit { @@ -141,18 +142,19 @@ func (p *Plugin) poolListener(event interface{}) { // AddListener adds event listeners to the service. func (p *Plugin) startPool() (workflowPool, error) { + const op = errors.Op("workflow_plugin_start_pool") pool, err := newWorkflowPool( p.temporal.GetCodec().WithLogger(p.log), p.poolListener, p.server, ) if err != nil { - return nil, errors.E(errors.Op("initWorkflowPool"), err) + return nil, errors.E(op, err) } err = pool.Start(context.Background(), p.temporal) if err != nil { - return nil, errors.E(errors.Op("startWorkflowPool"), err) + return nil, errors.E(op, err) } p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) @@ -162,29 +164,30 @@ func (p *Plugin) startPool() (workflowPool, error) { func (p *Plugin) replacePool() error { p.mu.Lock() + const op = errors.Op("workflow_plugin_replace_pool") defer p.mu.Unlock() if p.pool != nil { - errD := p.pool.Destroy(context.Background()) + err := p.pool.Destroy(context.Background()) p.pool = nil - if errD != nil { + if err != nil { p.log.Error( "Unable to destroy expired workflow pool", "error", - errors.E(errors.Op("destroyWorkflowPool"), errD), + errors.E(op, err), ) + return errors.E(op, err) } } pool, err := p.startPool() if err != nil { p.log.Error("Replace workflow pool failed", "error", err) - return errors.E(errors.Op("newWorkflowPool"), err) + return errors.E(op, err) } - p.log.Debug("Replace workflow pool") - p.pool = pool + p.log.Debug("workflow pool successfully replaced") return nil } diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go index ec5a14eb..45e6885c 100644 --- a/plugins/temporal/workflow/process.go +++ b/plugins/temporal/workflow/process.go @@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com lastCompletionOffset = len(lastCompletion.Payloads) } - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.StartWorkflow{ Info: env.WorkflowInfo(), LastCompletion: lastCompletionOffset, }, input, ) - - if err != nil { - panic(err) - } } // OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. @@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() { // TODO: properly handle errors // panic(err) - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - if err != nil { - panic(err) - } - _, _ = wf.discardQueue() } @@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context { // schedule cancel command func (wf *workflowProcess) handleCancel() { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - - if err != nil { - panic(err) - } } // schedule the signal processing func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.InvokeSignal{ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, Name: name, }, input, ) - - if err != nil { - panic(err) - } } // Handle query in blocking mode. @@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { // Run single command and return single result. func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { - const op = errors.Op("run command") - _, msg, err := wf.mq.allocateMessage(cmd, payloads) - if err != nil { - return rrt.Message{}, err - } + const op = errors.Op("workflow_process_runcommand") + _, msg := wf.mq.allocateMessage(cmd, payloads) result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) if err != nil { @@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa } if len(result) != 1 { - return rrt.Message{}, errors.E("unexpected worker response") + return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) } return result[0], nil diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go index 2022b624..1a78f377 100644 --- a/plugins/temporal/workflow/workflow_pool.go +++ b/plugins/temporal/workflow/workflow_pool.go @@ -19,41 +19,35 @@ import ( const eventWorkerExit = 8390 -type ( - workflowPool interface { - SeqID() uint64 - Exec(p payload.Payload) (payload.Payload, error) - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.BaseProcess - WorkflowNames() []string - } +type workflowPool interface { + SeqID() uint64 + Exec(p payload.Payload) (payload.Payload, error) + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.BaseProcess + WorkflowNames() []string +} - // PoolEvent triggered on workflow pool worker events. - PoolEvent struct { - Event int - Context interface{} - Caused error - } +// PoolEvent triggered on workflow pool worker events. +type PoolEvent struct { + Event int + Context interface{} + Caused error +} - // workflowPoolImpl manages workflowProcess executions between worker restarts. - workflowPoolImpl struct { - codec rrt.Codec - seqID uint64 - workflows map[string]rrt.WorkflowInfo - tWorkers []worker.Worker - mu sync.Mutex - worker rrWorker.SyncWorker - active bool - } -) +// workflowPoolImpl manages workflowProcess executions between worker restarts. +type workflowPoolImpl struct { + codec rrt.Codec + seqID uint64 + workflows map[string]rrt.WorkflowInfo + tWorkers []worker.Worker + mu sync.Mutex + worker rrWorker.SyncWorker + active bool +} // newWorkflowPool creates new workflow pool. -func newWorkflowPool( - codec rrt.Codec, - listener events.Listener, - factory server.Server, -) (workflowPool, error) { +func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { w, err := factory.NewWorker( context.Background(), map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, @@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string { // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_init_workers") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) if err != nil { - return err + return errors.E(op, err) } pool.workflows = make(map[string]rrt.WorkflowInfo) @@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T for _, info := range workerInfo { w, err := temporal.CreateWorker(info.TaskQueue, info.Options) if err != nil { - return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx)) + return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) |