diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 14:59:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-31 14:59:28 +0300 |
commit | 31cf040029eb0b26278e4a9948cbc1aba77ed58b (patch) | |
tree | 884dd2991acf12826752632b8321410e7cc923ce /plugins | |
parent | 2f44878a7eac71d7b81e66246b46c615a95892d7 (diff) |
Naming: service -> plugin
Fix bug with survived workers in the debug mode
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/plugin.go | 9 | ||||
-rw-r--r-- | plugins/resetter/plugin.go | 2 | ||||
-rw-r--r-- | plugins/sqs/item.go | 3 | ||||
-rw-r--r-- | plugins/status/plugin.go | 4 |
4 files changed, 11 insertions, 7 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 83b302ee..3f3fa196 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -342,10 +342,6 @@ func (p *Plugin) Stop() error { cancel() } - p.Lock() - p.workersPool.Destroy(context.Background()) - p.Unlock() - // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, // but if not, this is not a problem at all. // The main target is to stop the drivers @@ -358,6 +354,11 @@ func (p *Plugin) Stop() error { // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) + + p.Lock() + p.workersPool.Destroy(context.Background()) + p.Unlock() + return nil } diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go index b2fe59af..191185ae 100644 --- a/plugins/resetter/plugin.go +++ b/plugins/resetter/plugin.go @@ -21,7 +21,7 @@ func (p *Plugin) Reset(name string) error { const op = errors.Op("resetter_plugin_reset_by_name") svc, ok := p.registry[name] if !ok { - return errors.E(op, errors.Errorf("no such service: %s", name)) + return errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Reset() diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go index 4e33e99e..969d8b5b 100644 --- a/plugins/sqs/item.go +++ b/plugins/sqs/item.go @@ -22,6 +22,7 @@ const ( ) var itemAttributes = []string{ + job.RRID, job.RRJob, job.RRDelay, job.RRPriority, @@ -184,6 +185,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { QueueUrl: queue, DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ + job.RRID: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Ident)}, job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, @@ -228,6 +230,7 @@ func (c *consumer) unpack(msg *types.Message) (*Item, error) { item := &Item{ Job: *msg.MessageAttributes[job.RRJob].StringValue, + Ident: *msg.MessageAttributes[job.RRID].StringValue, Payload: *msg.Body, Headers: h, Options: &Options{ diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go index 82a0fa6c..b76ad0a3 100644 --- a/plugins/status/plugin.go +++ b/plugins/status/plugin.go @@ -85,7 +85,7 @@ func (c *Plugin) status(name string) (Status, error) { const op = errors.Op("checker_plugin_status") svc, ok := c.statusRegistry[name] if !ok { - return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) + return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Status(), nil @@ -96,7 +96,7 @@ func (c *Plugin) ready(name string) (Status, error) { const op = errors.Op("checker_plugin_ready") svc, ok := c.readyRegistry[name] if !ok { - return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) + return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Ready(), nil |