summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/workflow/plugin.go')
-rw-r--r--plugins/temporal/workflow/plugin.go37
1 files changed, 20 insertions, 17 deletions
diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go
index ee2d722c..3a397364 100644
--- a/plugins/temporal/workflow/plugin.go
+++ b/plugins/temporal/workflow/plugin.go
@@ -41,18 +41,19 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger
p.server = server
p.events = events.NewEventsHandler()
p.log = log
- p.reset = make(chan struct{})
+ p.reset = make(chan struct{}, 1)
return nil
}
// Serve starts workflow service.
func (p *Plugin) Serve() chan error {
+ const op = errors.Op("workflow_plugin_serve")
errCh := make(chan error, 1)
pool, err := p.startPool()
if err != nil {
- errCh <- errors.E("startPool", err)
+ errCh <- errors.E(op, err)
return errCh
}
@@ -76,7 +77,7 @@ func (p *Plugin) Serve() chan error {
err = backoff.Retry(p.replacePool, bkoff)
if err != nil {
- errCh <- errors.E("deadPool", err)
+ errCh <- errors.E(op, err)
}
}
}
@@ -87,12 +88,17 @@ func (p *Plugin) Serve() chan error {
// Stop workflow service.
func (p *Plugin) Stop() error {
+ const op = errors.Op("workflow_plugin_stop")
atomic.StoreInt64(&p.closing, 1)
pool := p.getPool()
if pool != nil {
p.pool = nil
- return pool.Destroy(context.Background())
+ err := pool.Destroy(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
}
return nil
@@ -121,11 +127,6 @@ func (p *Plugin) Reset() error {
}
// AddListener adds event listeners to the service.
-func (p *Plugin) AddListener(listener events.Listener) {
- p.events.AddListener(listener)
-}
-
-// AddListener adds event listeners to the service.
func (p *Plugin) poolListener(event interface{}) {
if ev, ok := event.(PoolEvent); ok {
if ev.Event == eventWorkerExit {
@@ -141,18 +142,19 @@ func (p *Plugin) poolListener(event interface{}) {
// AddListener adds event listeners to the service.
func (p *Plugin) startPool() (workflowPool, error) {
+ const op = errors.Op("workflow_plugin_start_pool")
pool, err := newWorkflowPool(
p.temporal.GetCodec().WithLogger(p.log),
p.poolListener,
p.server,
)
if err != nil {
- return nil, errors.E(errors.Op("initWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
err = pool.Start(context.Background(), p.temporal)
if err != nil {
- return nil, errors.E(errors.Op("startWorkflowPool"), err)
+ return nil, errors.E(op, err)
}
p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames())
@@ -162,29 +164,30 @@ func (p *Plugin) startPool() (workflowPool, error) {
func (p *Plugin) replacePool() error {
p.mu.Lock()
+ const op = errors.Op("workflow_plugin_replace_pool")
defer p.mu.Unlock()
if p.pool != nil {
- errD := p.pool.Destroy(context.Background())
+ err := p.pool.Destroy(context.Background())
p.pool = nil
- if errD != nil {
+ if err != nil {
p.log.Error(
"Unable to destroy expired workflow pool",
"error",
- errors.E(errors.Op("destroyWorkflowPool"), errD),
+ errors.E(op, err),
)
+ return errors.E(op, err)
}
}
pool, err := p.startPool()
if err != nil {
p.log.Error("Replace workflow pool failed", "error", err)
- return errors.E(errors.Op("newWorkflowPool"), err)
+ return errors.E(op, err)
}
- p.log.Debug("Replace workflow pool")
-
p.pool = pool
+ p.log.Debug("workflow pool successfully replaced")
return nil
}