summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 10:27:13 +0300
committerValery Piashchynski <[email protected]>2021-01-26 10:27:13 +0300
commit800042d81fc9224ab05f01d1506df0d4b7ac4daa (patch)
tree93eecce841db96e21c64f67986b73362aad1ee33 /plugins
parent4638bdca80f75bc120b330022086d31c8b41be5b (diff)
Attempt to fix data race in the Test_WorkerError_DisasterRecovery_Heavy
test
Diffstat (limited to 'plugins')
-rw-r--r--plugins/temporal/workflow/plugin.go2
-rw-r--r--plugins/temporal/workflow/workflow_pool.go23
2 files changed, 18 insertions, 7 deletions
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
index 3a397364..572d9a3b 100644
--- a/plugins/temporal/workflow/plugin.go
+++ b/plugins/temporal/workflow/plugin.go
@@ -111,6 +111,8 @@ func (p *Plugin) Name() string {
// Workers returns list of available workflow workers.
func (p *Plugin) Workers() []worker.BaseProcess {
+ p.mu.Lock()
+ defer p.mu.Unlock()
return p.pool.Workers()
}
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
index 1a78f377..b9ed46c8 100644
--- a/plugins/temporal/workflow/workflow_pool.go
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -19,6 +19,12 @@ import (
const eventWorkerExit = 8390
+// RR_MODE env variable key
+const RR_MODE = "RR_MODE" //nolint
+
+// RR_CODEC env variable key
+const RR_CODEC = "RR_CODEC" //nolint
+
type workflowPool interface {
SeqID() uint64
Exec(p payload.Payload) (payload.Payload, error)
@@ -48,14 +54,14 @@ type workflowPoolImpl struct {
// newWorkflowPool creates new workflow pool.
func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
+ const op = errors.Op("new_workflow_pool")
w, err := factory.NewWorker(
context.Background(),
- map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
+ map[string]string{RR_MODE: RRMode, RR_CODEC: codec.GetName()},
listener,
)
-
if err != nil {
- return nil, errors.E(errors.Op("newWorker"), err)
+ return nil, errors.E(op, err)
}
go func() {
@@ -68,19 +74,20 @@ func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.S
// Start the pool in non blocking mode.
func (pool *workflowPoolImpl) Start(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_start")
pool.mu.Lock()
pool.active = true
pool.mu.Unlock()
err := pool.initWorkers(ctx, temporal)
if err != nil {
- return errors.E(errors.Op("initWorkers"), err)
+ return errors.E(op, err)
}
for i := 0; i < len(pool.tWorkers); i++ {
err := pool.tWorkers[i].Start()
if err != nil {
- return errors.E(errors.Op("startTemporalWorker"), err)
+ return errors.E(op, err)
}
}
@@ -96,6 +103,7 @@ func (pool *workflowPoolImpl) Active() bool {
func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
pool.mu.Lock()
defer pool.mu.Unlock()
+ const op = errors.Op("workflow_pool_destroy")
pool.active = false
for i := 0; i < len(pool.tWorkers); i++ {
@@ -104,8 +112,9 @@ func (pool *workflowPoolImpl) Destroy(ctx context.Context) error {
worker.PurgeStickyWorkflowCache()
- if err := pool.worker.Stop(); err != nil {
- return errors.E(errors.Op("stopWorkflowWorker"), err)
+ err := pool.worker.Stop()
+ if err != nil {
+ return errors.E(op, err)
}
return nil