summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpkg/pool/static_pool.go4
-rw-r--r--plugins/jobs/plugin.go9
-rw-r--r--plugins/resetter/plugin.go2
-rw-r--r--plugins/sqs/item.go3
-rw-r--r--plugins/status/plugin.go4
5 files changed, 14 insertions, 8 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 720ca9da..7e190846 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -329,7 +329,9 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
return nil, errors.E(op, err)
}
- err = sw.Stop()
+ // destroy the worker
+ sw.State().Set(worker.StateDestroyed)
+ err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
return nil, errors.E(op, err)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 83b302ee..3f3fa196 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -342,10 +342,6 @@ func (p *Plugin) Stop() error {
cancel()
}
- p.Lock()
- p.workersPool.Destroy(context.Background())
- p.Unlock()
-
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
// The main target is to stop the drivers
@@ -358,6 +354,11 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
+
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
return nil
}
diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go
index b2fe59af..191185ae 100644
--- a/plugins/resetter/plugin.go
+++ b/plugins/resetter/plugin.go
@@ -21,7 +21,7 @@ func (p *Plugin) Reset(name string) error {
const op = errors.Op("resetter_plugin_reset_by_name")
svc, ok := p.registry[name]
if !ok {
- return errors.E(op, errors.Errorf("no such service: %s", name))
+ return errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Reset()
diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go
index 4e33e99e..969d8b5b 100644
--- a/plugins/sqs/item.go
+++ b/plugins/sqs/item.go
@@ -22,6 +22,7 @@ const (
)
var itemAttributes = []string{
+ job.RRID,
job.RRJob,
job.RRDelay,
job.RRPriority,
@@ -184,6 +185,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
QueueUrl: queue,
DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRID: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Ident)},
job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
@@ -228,6 +230,7 @@ func (c *consumer) unpack(msg *types.Message) (*Item, error) {
item := &Item{
Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Ident: *msg.MessageAttributes[job.RRID].StringValue,
Payload: *msg.Body,
Headers: h,
Options: &Options{
diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go
index 82a0fa6c..b76ad0a3 100644
--- a/plugins/status/plugin.go
+++ b/plugins/status/plugin.go
@@ -85,7 +85,7 @@ func (c *Plugin) status(name string) (Status, error) {
const op = errors.Op("checker_plugin_status")
svc, ok := c.statusRegistry[name]
if !ok {
- return Status{}, errors.E(op, errors.Errorf("no such service: %s", name))
+ return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Status(), nil
@@ -96,7 +96,7 @@ func (c *Plugin) ready(name string) (Status, error) {
const op = errors.Op("checker_plugin_ready")
svc, ok := c.readyRegistry[name]
if !ok {
- return Status{}, errors.E(op, errors.Errorf("no such service: %s", name))
+ return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name))
}
return svc.Ready(), nil