summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/amqp/item.go41
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go4
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go24
-rw-r--r--plugins/jobs/drivers/sqs/item.go4
-rw-r--r--plugins/jobs/plugin.go31
-rw-r--r--plugins/jobs/protocol.go67
6 files changed, 127 insertions, 44 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b544620..a7e2c4e5 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -70,22 +70,22 @@ func (o *Options) TimeoutDuration() time.Duration {
return time.Second * time.Duration(o.Timeout)
}
-func (j *Item) ID() string {
- return j.Ident
+func (i *Item) ID() string {
+ return i.Ident
}
-func (j *Item) Priority() int64 {
- return j.Options.Priority
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
}
// Body packs job payload into binary payload.
-func (j *Item) Body() []byte {
- return utils.AsBytes(j.Payload)
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
}
// Context packs job context (job, id) into binary payload.
// Not used in the amqp, amqp.Table used instead
-func (j *Item) Context() ([]byte, error) {
+func (i *Item) Context() ([]byte, error) {
ctx, err := json.Marshal(
struct {
ID string `json:"id"`
@@ -93,7 +93,7 @@ func (j *Item) Context() ([]byte, error) {
Headers map[string][]string `json:"headers"`
Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
)
if err != nil {
@@ -103,14 +103,20 @@ func (j *Item) Context() ([]byte, error) {
return ctx, nil
}
-func (j *Item) Ack() error {
- return j.AckFunc(j.Options.multipleAsk)
+func (i *Item) Ack() error {
+ return i.AckFunc(i.Options.multipleAsk)
}
-func (j *Item) Nack() error {
- return j.NackFunc(false, j.Options.requeue)
+func (i *Item) Nack() error {
+ return i.NackFunc(false, i.Options.requeue)
}
+// Requeue with the provided delay, handled by the Nack
+func (i *Item) Requeue(_ uint32) error {
+ return nil
+}
+
+// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
item, err := j.unpack(d)
@@ -118,11 +124,12 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
return nil, errors.E(op, err)
}
return &Item{
- Job: item.Job,
- Ident: item.Ident,
- Payload: item.Payload,
- Headers: item.Headers,
- Options: item.Options,
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ // internal
AckFunc: d.Ack,
NackFunc: d.Nack,
}, nil
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index fc0ee86b..eb532e08 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -103,6 +103,10 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
+func (i *Item) Requeue(_ uint32) error {
+ return nil
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 442533c5..ebf16524 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -69,21 +69,21 @@ func (o *Options) TimeoutDuration() time.Duration {
return time.Second * time.Duration(o.Timeout)
}
-func (j *Item) ID() string {
- return j.Ident
+func (i *Item) ID() string {
+ return i.Ident
}
-func (j *Item) Priority() int64 {
- return j.Options.Priority
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
}
// Body packs job payload into binary payload.
-func (j *Item) Body() []byte {
- return utils.AsBytes(j.Payload)
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
}
// Context packs job context (job, id) into binary payload.
-func (j *Item) Context() ([]byte, error) {
+func (i *Item) Context() ([]byte, error) {
ctx, err := json.Marshal(
struct {
ID string `json:"id"`
@@ -91,7 +91,7 @@ func (j *Item) Context() ([]byte, error) {
Headers map[string][]string `json:"headers"`
Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
)
if err != nil {
@@ -101,12 +101,16 @@ func (j *Item) Context() ([]byte, error) {
return ctx, nil
}
-func (j *Item) Ack() error {
+func (i *Item) Ack() error {
// noop for the in-memory
return nil
}
-func (j *Item) Nack() error {
+func (i *Item) Nack() error {
// noop for the in-memory
return nil
}
+
+func (i *Item) Requeue(_ uint32) error {
+ return nil
+}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 325c4781..50d8ac18 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -156,6 +156,10 @@ func (i *Item) Nack() error {
return nil
}
+func (i *Item) Requeue(_ uint32) error {
+ return nil
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6c848a9d..61936db2 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -18,7 +18,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -81,6 +80,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
+
p.pldPool = sync.Pool{New: func() interface{} {
// with nil fields
return &payload.Payload{}
@@ -104,8 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload() *payload.Payload {
- return p.pldPool.Get().(*payload.Payload)
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
}
func (p *Plugin) putPayload(pld *payload.Payload) {
@@ -219,13 +222,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
// get payload from the sync.Pool
- exec := p.getPayload()
- exec.Body = jb.Body()
- exec.Context = ctx
-
- // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
- // remove in tests
- p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+ exec := p.getPayload(jb.Body(), ctx)
// protect from the pool reset
p.RLock()
@@ -243,14 +240,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
- // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
- // remove in tests
- p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
-
- errAck := jb.Ack()
- if errAck != nil {
- p.log.Error("acknowledge failed", "error", errAck)
+ // handle the response protocol
+ err = handleResponse(resp.Body, jb, p.log)
+ if err != nil {
p.putPayload(exec)
+ errNack := jb.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed, job might be lost", "error", errNack)
+ }
continue
}
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
new file mode 100644
index 00000000..adfd0784
--- /dev/null
+++ b/plugins/jobs/protocol.go
@@ -0,0 +1,67 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ pq "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Type uint32
+
+const (
+ Error Type = iota
+ NoError
+)
+
+// internal worker protocol (jobs mode)
+type protocol struct {
+ // message type, see Type
+ T Type `json:"type"`
+ // Payload
+ Data []byte `json:"data"`
+}
+
+type errorResp struct {
+ Msg string `json:"message"`
+ Requeue bool `json:"requeue"`
+ Delay uint32 `json:"delay_seconds"`
+}
+
+func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
+ const op = errors.Op("jobs_handle_response")
+ // TODO(rustatian) to sync.Pool
+ p := &protocol{}
+ err := json.Unmarshal(resp, p)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ switch p.T {
+ case Error:
+ // TODO(rustatian) to sync.Pool
+ er := &errorResp{}
+ err = json.Unmarshal(p.Data, er)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+
+ if er.Requeue {
+ err = jb.Requeue(er.Delay)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ case NoError:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}