summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 10:11:22 +0300
committerValery Piashchynski <[email protected]>2021-07-11 10:11:22 +0300
commit589f759cc2411319adbca2ece0dbe212407d1eba (patch)
tree2fbea4a6033d65813c41fd80f6339a524b46c9b2 /plugins
parentcb2665d93ad7abe1ab30508ff0e2bd4d0bc379ea (diff)
Update informer interface to return slice of pointers (do not over-copy
the Stat structure). Make amqp Push concurrent safe. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/informer/interface.go2
-rw-r--r--plugins/informer/plugin.go2
-rw-r--r--plugins/informer/rpc.go2
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go24
-rw-r--r--plugins/jobs/brokers/amqp/item.go99
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go4
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go4
-rw-r--r--plugins/jobs/plugin.go2
-rw-r--r--plugins/websockets/plugin.go4
12 files changed, 79 insertions, 72 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index fb174792..2ee83384 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -285,13 +285,13 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 316c7bc1..d91ddf9d 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -11,7 +11,7 @@ Because Availabler implementation should present in every plugin
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []process.State
+ Workers() []*process.State
}
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index f8725ed7..c613af58 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -19,7 +19,7 @@ func (p *Plugin) Init() error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) []process.State {
+func (p *Plugin) Workers(name string) []*process.State {
svc, ok := p.withWorkers[name]
if !ok {
return nil
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index f096a0af..02254865 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -11,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []process.State `json:"workers"`
+ Workers []*process.State `json:"workers"`
}
// List all resettable services.
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 4a85ed01..2d0d591c 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -8,7 +8,7 @@ import (
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
err = jb.initRabbitMQ()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
jb.publishChan, err = jb.conn.Channel()
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
// run redialer for the connection
@@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// lock needed here to protect redial concurrent operation
// we may be in the redial state here
- j.RLock()
- defer j.RUnlock()
+ j.Lock()
+ defer j.Unlock()
// convert
msg := FromJob(job)
@@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
@@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return nil
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
- err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index e5e580e0..06e2bf56 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -12,13 +12,13 @@ import (
)
const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- // rrAttempt string = "rr-attempt"
- // rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
+ rrID string = "rr_id"
+ rrJob string = "rr_job"
+ rrHeaders string = "rr_headers"
+ rrPipeline string = "rr_pipeline"
+ rrTimeout string = "rr_timeout"
+ rrDelay string = "rr_delay"
+ rrRetryDelay string = "rr_retry_delay"
)
func FromDelivery(d amqp.Delivery) (*Item, error) {
@@ -44,12 +44,12 @@ func FromJob(job *structs.Job) *Item {
Ident: job.Ident,
Payload: job.Payload,
Options: &Options{
- Priority: job.Options.Priority,
+ Priority: uint32(job.Options.Priority),
Pipeline: job.Options.Pipeline,
- Delay: int64(job.Options.Delay),
- Attempts: int64(job.Options.Attempts),
- RetryDelay: int64(job.Options.RetryDelay),
- Timeout: int64(job.Options.Timeout),
+ Delay: int32(job.Options.Delay),
+ Attempts: int32(job.Options.Attempts),
+ RetryDelay: int32(job.Options.RetryDelay),
+ Timeout: int32(job.Options.Timeout),
},
}
}
@@ -84,27 +84,27 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority uint32 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
+ Delay int32 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts int64 `json:"maxAttempts,omitempty"`
+ Attempts int32 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int64 `json:"retryDelay,omitempty"`
+ RetryDelay int32 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int64 `json:"timeout,omitempty"`
+ Timeout int32 `json:"timeout,omitempty"`
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int64) bool {
+func (o *Options) CanRetry(attempt int32) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
@@ -133,7 +133,7 @@ func (j *Item) ID() string {
}
func (j *Item) Priority() uint64 {
- return j.Options.Priority
+ return uint64(j.Options.Priority)
}
// Body packs job payload into binary payload.
@@ -142,21 +142,9 @@ func (j *Item) Body() []byte {
}
// Context packs job context (job, id) into binary payload.
+// Not used in the amqp, amqp.Table used instead
func (j *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Timeout int64 `json:"timeout"`
- Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
- )
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
+ return nil, nil
}
func (j *Item) Ack() error {
@@ -168,16 +156,20 @@ func (j *Item) Nack() error {
}
// pack job metadata into headers
-func pack(id string, j *Item) amqp.Table {
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- // rrAttempt: attempt,
- // rrMaxAttempts: j.Options.Attempts,
+ rrID: id,
+ rrJob: j.Job,
+ rrPipeline: j.Options.Pipeline,
+ rrHeaders: headers,
rrTimeout: j.Options.Timeout,
rrDelay: j.Options.Delay,
rrRetryDelay: j.Options.RetryDelay,
- }
+ }, nil
}
// unpack restores jobs.Options
@@ -188,30 +180,33 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
return "", nil, fmt.Errorf("missing header `%s`", rrID)
}
- // if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- // }
-
if _, ok := d.Headers[rrJob].(string); !ok {
return "", nil, fmt.Errorf("missing header `%s`", rrJob)
}
j.Job = d.Headers[rrJob].(string)
- // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- // }
+ if _, ok := d.Headers[rrPipeline].(string); ok {
+ j.Options.Pipeline = d.Headers[rrPipeline].(string)
+ }
+
+ if h, ok := d.Headers[rrHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &j.Headers)
+ if err != nil {
+ return "", nil, err
+ }
+ }
- if _, ok := d.Headers[rrTimeout].(int64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(int64)
+ if _, ok := d.Headers[rrTimeout].(int32); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(int32)
}
- if _, ok := d.Headers[rrDelay].(int64); ok {
- j.Options.Delay = d.Headers[rrDelay].(int64)
+ if _, ok := d.Headers[rrDelay].(int32); ok {
+ j.Options.Delay = d.Headers[rrDelay].(int32)
}
- if _, ok := d.Headers[rrRetryDelay].(int64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64)
+ if _, ok := d.Headers[rrRetryDelay].(int32); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
return d.Headers[rrID].(string), j, nil
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 74f9a174..377d8648 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -2,7 +2,7 @@ package amqp
import (
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -26,6 +26,8 @@ func (p *Plugin) Name() string {
return pluginName
}
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index f21fe8de..b5e73a9d 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -59,7 +59,7 @@ func (j *JobsConsumer) initRabbitMQ() error {
return errors.E(op, err)
}
- return nil
+ return channel.Close()
}
func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 09e78249..8f6f4b5f 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 60c6e245..9910d857 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -2,7 +2,7 @@ package ephemeral
import (
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -26,6 +26,8 @@ func (p *Plugin) Name() string {
return PluginName
}
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) {
return NewJobBroker(configKey, p.log, p.cfg, q)
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 2eb35f14..b7e41710 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -14,7 +14,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 5925a588..4420916c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -240,13 +240,13 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {