diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/plugin.go | 4 | ||||
-rw-r--r-- | plugins/informer/interface.go | 2 | ||||
-rw-r--r-- | plugins/informer/plugin.go | 2 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 24 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 99 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 4 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 4 |
12 files changed, 79 insertions, 72 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index fb174792..2ee83384 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -285,13 +285,13 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []process.State { +func (p *Plugin) Workers() []*process.State { p.RLock() defer p.RUnlock() workers := p.workers() - ps := make([]process.State, 0, len(workers)) + ps := make([]*process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) if err != nil { diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index 316c7bc1..d91ddf9d 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -11,7 +11,7 @@ Because Availabler implementation should present in every plugin // Informer used to get workers from particular plugin or set of plugins type Informer interface { - Workers() []process.State + Workers() []*process.State } // Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index f8725ed7..c613af58 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -19,7 +19,7 @@ func (p *Plugin) Init() error { } // Workers provides BaseProcess slice with workers for the requested plugin -func (p *Plugin) Workers(name string) []process.State { +func (p *Plugin) Workers(name string) []*process.State { svc, ok := p.withWorkers[name] if !ok { return nil diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index f096a0af..02254865 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -11,7 +11,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { // Workers is list of workers. - Workers []process.State `json:"workers"` + Workers []*process.State `json:"workers"` } // List all resettable services. diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 4a85ed01..2d0d591c 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, err = jb.initRabbitMQ() if err != nil { - return nil, err + return nil, errors.E(op, err) } jb.publishChan, err = jb.conn.Channel() if err != nil { - panic(err) + return nil, errors.E(op, err) } // run redialer for the connection @@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // lock needed here to protect redial concurrent operation // we may be in the redial state here - j.RLock() - defer j.RUnlock() + j.Lock() + defer j.Unlock() // convert msg := FromJob(job) @@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, @@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return nil } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline - err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index e5e580e0..06e2bf56 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -12,13 +12,13 @@ import ( ) const ( - rrID string = "rr-id" - rrJob string = "rr-job" - // rrAttempt string = "rr-attempt" - // rrMaxAttempts string = "rr-max_attempts" - rrTimeout string = "rr-timeout" - rrDelay string = "rr-delay" - rrRetryDelay string = "rr-retry_delay" + rrID string = "rr_id" + rrJob string = "rr_job" + rrHeaders string = "rr_headers" + rrPipeline string = "rr_pipeline" + rrTimeout string = "rr_timeout" + rrDelay string = "rr_delay" + rrRetryDelay string = "rr_retry_delay" ) func FromDelivery(d amqp.Delivery) (*Item, error) { @@ -44,12 +44,12 @@ func FromJob(job *structs.Job) *Item { Ident: job.Ident, Payload: job.Payload, Options: &Options{ - Priority: job.Options.Priority, + Priority: uint32(job.Options.Priority), Pipeline: job.Options.Pipeline, - Delay: int64(job.Options.Delay), - Attempts: int64(job.Options.Attempts), - RetryDelay: int64(job.Options.RetryDelay), - Timeout: int64(job.Options.Timeout), + Delay: int32(job.Options.Delay), + Attempts: int32(job.Options.Attempts), + RetryDelay: int32(job.Options.RetryDelay), + Timeout: int32(job.Options.Timeout), }, } } @@ -84,27 +84,27 @@ type Item struct { type Options struct { // Priority is job priority, default - 10 // pointer to distinguish 0 as a priority and nil as priority not set - Priority uint64 `json:"priority"` + Priority uint32 `json:"priority"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` + Delay int32 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts int64 `json:"maxAttempts,omitempty"` + Attempts int32 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int64 `json:"retryDelay,omitempty"` + RetryDelay int32 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int64 `json:"timeout,omitempty"` + Timeout int32 `json:"timeout,omitempty"` } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int64) bool { +func (o *Options) CanRetry(attempt int32) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } @@ -133,7 +133,7 @@ func (j *Item) ID() string { } func (j *Item) Priority() uint64 { - return j.Options.Priority + return uint64(j.Options.Priority) } // Body packs job payload into binary payload. @@ -142,21 +142,9 @@ func (j *Item) Body() []byte { } // Context packs job context (job, id) into binary payload. +// Not used in the amqp, amqp.Table used instead func (j *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Timeout int64 `json:"timeout"` - Pipeline string `json:"pipeline"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, - ) - if err != nil { - return nil, err - } - - return ctx, nil + return nil, nil } func (j *Item) Ack() error { @@ -168,16 +156,20 @@ func (j *Item) Nack() error { } // pack job metadata into headers -func pack(id string, j *Item) amqp.Table { +func pack(id string, j *Item) (amqp.Table, error) { + headers, err := json.Marshal(j.Headers) + if err != nil { + return nil, err + } return amqp.Table{ - rrID: id, - rrJob: j.Job, - // rrAttempt: attempt, - // rrMaxAttempts: j.Options.Attempts, + rrID: id, + rrJob: j.Job, + rrPipeline: j.Options.Pipeline, + rrHeaders: headers, rrTimeout: j.Options.Timeout, rrDelay: j.Options.Delay, rrRetryDelay: j.Options.RetryDelay, - } + }, nil } // unpack restores jobs.Options @@ -188,30 +180,33 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { return "", nil, fmt.Errorf("missing header `%s`", rrID) } - // if _, ok := d.Headers[rrAttempt].(uint64); !ok { - // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) - // } - if _, ok := d.Headers[rrJob].(string); !ok { return "", nil, fmt.Errorf("missing header `%s`", rrJob) } j.Job = d.Headers[rrJob].(string) - // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { - // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) - // } + if _, ok := d.Headers[rrPipeline].(string); ok { + j.Options.Pipeline = d.Headers[rrPipeline].(string) + } + + if h, ok := d.Headers[rrHeaders].([]byte); ok { + err := json.Unmarshal(h, &j.Headers) + if err != nil { + return "", nil, err + } + } - if _, ok := d.Headers[rrTimeout].(int64); ok { - j.Options.Timeout = d.Headers[rrTimeout].(int64) + if _, ok := d.Headers[rrTimeout].(int32); ok { + j.Options.Timeout = d.Headers[rrTimeout].(int32) } - if _, ok := d.Headers[rrDelay].(int64); ok { - j.Options.Delay = d.Headers[rrDelay].(int64) + if _, ok := d.Headers[rrDelay].(int32); ok { + j.Options.Delay = d.Headers[rrDelay].(int32) } - if _, ok := d.Headers[rrRetryDelay].(int64); ok { - j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64) + if _, ok := d.Headers[rrRetryDelay].(int32); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32) } return d.Headers[rrID].(string), j, nil diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 74f9a174..377d8648 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -2,7 +2,7 @@ package amqp import ( "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -26,6 +26,8 @@ func (p *Plugin) Name() string { return pluginName } +func (p *Plugin) Available() {} + func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { return NewAMQPConsumer(configKey, p.log, p.cfg, pq) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index f21fe8de..b5e73a9d 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -59,7 +59,7 @@ func (j *JobsConsumer) initRabbitMQ() error { return errors.E(op, err) } - return nil + return channel.Close() } func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 09e78249..8f6f4b5f 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -5,7 +5,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 60c6e245..9910d857 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,7 +2,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -26,6 +26,8 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Available() {} + func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(configKey, p.log, p.cfg, q) } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 2eb35f14..b7e41710 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -14,7 +14,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 5925a588..4420916c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -240,13 +240,13 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { } // Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []process.State { +func (p *Plugin) Workers() []*process.State { p.RLock() defer p.RUnlock() workers := p.workers() - ps := make([]process.State, 0, len(workers)) + ps := make([]*process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) if err != nil { |