summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
commit4638bdca80f75bc120b330022086d31c8b41be5b (patch)
tree2362cdb39dc2e793f5bec7fd9b8d2363f516c1d4 /plugins/temporal/workflow
parent7756eb25453c8006fbd75aa5c97159e96331b840 (diff)
Code cleanup
Diffstat (limited to 'plugins/temporal/workflow')
-rw-r--r--plugins/temporal/workflow/message_queue.go20
-rw-r--r--plugins/temporal/workflow/message_queue_test.go4
-rw-r--r--plugins/temporal/workflow/plugin.go37
-rw-r--r--plugins/temporal/workflow/process.go33
-rw-r--r--plugins/temporal/workflow/workflow_pool.go61
5 files changed, 62 insertions, 93 deletions
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
index 50949897..8f4409d1 100644
--- a/plugins/temporal/workflow/message_queue.go
+++ b/plugins/temporal/workflow/message_queue.go
@@ -22,28 +22,20 @@ func (mq *messageQueue) flush() {
mq.queue = mq.queue[0:0]
}
-func (mq *messageQueue) allocateMessage(
- cmd interface{},
- payloads *common.Payloads,
-) (id uint64, msg rrt.Message, err error) {
- msg = rrt.Message{
+func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
+ msg := rrt.Message{
ID: mq.seqID(),
Command: cmd,
Payloads: payloads,
}
- return msg.ID, msg, nil
+ return msg.ID, msg
}
-func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) (id uint64, err error) {
- id, msg, err := mq.allocateMessage(cmd, payloads)
- if err != nil {
- return 0, err
- }
-
+func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
+ id, msg := mq.allocateMessage(cmd, payloads)
mq.queue = append(mq.queue, msg)
-
- return id, nil
+ return id
}
func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go
index 61f5123f..1fcd409f 100644
--- a/plugins/temporal/workflow/message_queue_test.go
+++ b/plugins/temporal/workflow/message_queue_test.go
@@ -44,10 +44,8 @@ func Test_MessageQueueCommandID(t *testing.T) {
return atomic.AddUint64(&index, 1)
})
- n, err := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
+ n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{})
assert.Equal(t, n, index)
-
- assert.NoError(t, err)
assert.Len(t, mq.queue, 1)
mq.flush()
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
index ee2d722c..3a397364 100644
--- a/plugins/temporal/workflow/plugin.go
+++ b/plugins/temporal/workflow/plugin.go
@@ -41,18 +41,19 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger
p.server = server
p.events = events.NewEventsHandler()
p.log = log
- p.reset = make(chan struct{})
+ p.reset = make(chan struct{}, 1)
return nil
}
// Serve starts workflow service.
func (p *Plugin) Serve() chan error {
+ const op = errors.Op("workflow_plugin_serve")
errCh := make(chan error, 1)
pool, err := p.startPool()
if err != nil {
- errCh <- errors.E("startPool", err)
+ errCh <- errors.E(op, err)
return errCh
}
@@ -76,7 +77,7 @@ func (p *Plugin) Serve() chan error {
err = backoff.Retry(p.replacePool, bkoff)
if err != nil {
- errCh <- errors.E("deadPool", err)
+ errCh <- errors.E(op, err)
}
}
}
@@ -87,12 +88,17 @@ func (p *Plugin) Serve() chan error {
// Stop workflow service.
func (p *Plugin) Stop() error {
+ const op = errors.Op("workflow_plugin_stop")
atomic.StoreInt64(&p.closing, 1)
pool := p.getPool()
if pool != nil {
p.pool = nil
- return pool.Destroy(context.Background())
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
return nil
@@ -121,11 +127,6 @@ func (p *Plugin) Reset() error {
}
// AddListener adds event listeners to the service.
-func (p *Plugin) AddListener(listener events.Listener) {
- p.events.AddListener(listener)
-}
-
-// AddListener adds event listeners to the service.
func (p *Plugin) poolListener(event interface{}) {
if ev, ok := event.(PoolEvent); ok {
if ev.Event == eventWorkerExit {
@@ -141,18 +142,19 @@ func (p *Plugin) poolListener(event interface{}) {
// AddListener adds event listeners to the service.
func (p *Plugin) startPool() (workflowPool, error) {
+ const op = errors.Op("workflow_plugin_start_pool")
pool, err := newWorkflowPool(
p.temporal.GetCodec().WithLogger(p.log),
p.poolListener,
p.server,
)
if err != nil {
- return nil, errors.E(errors.Op("initWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
err = pool.Start(context.Background(), p.temporal)
if err != nil {
- return nil, errors.E(errors.Op("startWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
@@ -162,29 +164,30 @@ func (p *Plugin) startPool() (workflowPool, error) {
func (p *Plugin) replacePool() error {
p.mu.Lock()
+ const op = errors.Op("workflow_plugin_replace_pool")
defer p.mu.Unlock()
if p.pool != nil {
- errD := p.pool.Destroy(context.Background())
+ err := p.pool.Destroy(context.Background())
p.pool = nil
- if errD != nil {
+ if err != nil {
p.log.Error(
"Unable to destroy expired workflow pool",
"error",
- errors.E(errors.Op("destroyWorkflowPool"), errD),
+ errors.E(op, err),
)
+ return errors.E(op, err)
}
}
pool, err := p.startPool()
if err != nil {
p.log.Error("Replace workflow pool failed", "error", err)
- return errors.E(errors.Op("newWorkflowPool"), err)
+ return errors.E(op, err)
}
- p.log.Debug("Replace workflow pool")
-
p.pool = pool
+ p.log.Debug("workflow pool successfully replaced")
return nil
}
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
index ec5a14eb..45e6885c 100644
--- a/plugins/temporal/workflow/process.go
+++ b/plugins/temporal/workflow/process.go
@@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com
lastCompletionOffset = len(lastCompletion.Payloads)
}
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
@@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() {
// TODO: properly handle errors
// panic(err)
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
- if err != nil {
- panic(err)
- }
-
_, _ = wf.discardQueue()
}
@@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context {
// schedule cancel command
func (wf *workflowProcess) handleCancel() {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
-
- if err != nil {
- panic(err)
- }
}
// schedule the signal processing
func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.InvokeSignal{
RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
Name: name,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// Handle query in blocking mode.
@@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
// Run single command and return single result.
func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("run command")
- _, msg, err := wf.mq.allocateMessage(cmd, payloads)
- if err != nil {
- return rrt.Message{}, err
- }
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
if err != nil {
@@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa
}
if len(result) != 1 {
- return rrt.Message{}, errors.E("unexpected worker response")
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
}
return result[0], nil
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
index 2022b624..1a78f377 100644
--- a/plugins/temporal/workflow/workflow_pool.go
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -19,41 +19,35 @@ import (
const eventWorkerExit = 8390
-type (
- workflowPool interface {
- SeqID() uint64
- Exec(p payload.Payload) (payload.Payload, error)
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.BaseProcess
- WorkflowNames() []string
- }
+type workflowPool interface {
+ SeqID() uint64
+ Exec(p payload.Payload) (payload.Payload, error)
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.BaseProcess
+ WorkflowNames() []string
+}
- // PoolEvent triggered on workflow pool worker events.
- PoolEvent struct {
- Event int
- Context interface{}
- Caused error
- }
+// PoolEvent triggered on workflow pool worker events.
+type PoolEvent struct {
+ Event int
+ Context interface{}
+ Caused error
+}
- // workflowPoolImpl manages workflowProcess executions between worker restarts.
- workflowPoolImpl struct {
- codec rrt.Codec
- seqID uint64
- workflows map[string]rrt.WorkflowInfo
- tWorkers []worker.Worker
- mu sync.Mutex
- worker rrWorker.SyncWorker
- active bool
- }
-)
+// workflowPoolImpl manages workflowProcess executions between worker restarts.
+type workflowPoolImpl struct {
+ codec rrt.Codec
+ seqID uint64
+ workflows map[string]rrt.WorkflowInfo
+ tWorkers []worker.Worker
+ mu sync.Mutex
+ worker rrWorker.SyncWorker
+ active bool
+}
// newWorkflowPool creates new workflow pool.
-func newWorkflowPool(
- codec rrt.Codec,
- listener events.Listener,
- factory server.Server,
-) (workflowPool, error) {
+func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
w, err := factory.NewWorker(
context.Background(),
map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
@@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string {
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_init_workers")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
if err != nil {
- return err
+ return errors.E(op, err)
}
pool.workflows = make(map[string]rrt.WorkflowInfo)
@@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T
for _, info := range workerInfo {
w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
if err != nil {
- return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx))
+ return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)