diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 10:27:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-26 10:27:13 +0300 |
commit | 800042d81fc9224ab05f01d1506df0d4b7ac4daa (patch) | |
tree | 93eecce841db96e21c64f67986b73362aad1ee33 /plugins | |
parent | 4638bdca80f75bc120b330022086d31c8b41be5b (diff) |
Attempt to fix data race in the Test_WorkerError_DisasterRecovery_Heavy
test
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/temporal/workflow/plugin.go | 2 | ||||
-rw-r--r-- | plugins/temporal/workflow/workflow_pool.go | 23 |
2 files changed, 18 insertions, 7 deletions
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go index 3a397364..572d9a3b 100644 --- a/plugins/temporal/workflow/plugin.go +++ b/plugins/temporal/workflow/plugin.go @@ -111,6 +111,8 @@ func (p *Plugin) Name() string { // Workers returns list of available workflow workers. func (p *Plugin) Workers() []worker.BaseProcess { + p.mu.Lock() + defer p.mu.Unlock() return p.pool.Workers() } diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go index 1a78f377..b9ed46c8 100644 --- a/plugins/temporal/workflow/workflow_pool.go +++ b/plugins/temporal/workflow/workflow_pool.go @@ -19,6 +19,12 @@ import ( const eventWorkerExit = 8390 +// RR_MODE env variable key +const RR_MODE = "RR_MODE" //nolint + +// RR_CODEC env variable key +const RR_CODEC = "RR_CODEC" //nolint + type workflowPool interface { SeqID() uint64 Exec(p payload.Payload) (payload.Payload, error) @@ -48,14 +54,14 @@ type workflowPoolImpl struct { // newWorkflowPool creates new workflow pool. func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { + const op = errors.Op("new_workflow_pool") w, err := factory.NewWorker( context.Background(), - map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, + map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()}, listener, ) - if err != nil { - return nil, errors.E(errors.Op("newWorker"), err) + return nil, errors.E(op, err) } go func() { @@ -68,19 +74,20 @@ func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.S // Start the pool in non blocking mode. func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_start") pool.mu.Lock() pool.active = true pool.mu.Unlock() err := pool.initWorkers(ctx, temporal) if err != nil { - return errors.E(errors.Op("initWorkers"), err) + return errors.E(op, err) } for i := 0; i < len(pool.tWorkers); i++ { err := pool.tWorkers[i].Start() if err != nil { - return errors.E(errors.Op("startTemporalWorker"), err) + return errors.E(op, err) } } @@ -96,6 +103,7 @@ func (pool *workflowPoolImpl) Active() bool { func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { pool.mu.Lock() defer pool.mu.Unlock() + const op = errors.Op("workflow_pool_destroy") pool.active = false for i := 0; i < len(pool.tWorkers); i++ { @@ -104,8 +112,9 @@ func (pool *workflowPoolImpl) Destroy(ctx context.Context) error { worker.PurgeStickyWorkflowCache() - if err := pool.worker.Stop(); err != nil { - return errors.E(errors.Op("stopWorkflowWorker"), err) + err := pool.worker.Stop() + if err != nil { + return errors.E(op, err) } return nil |