summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linux.yml2
-rwxr-xr-xMakefile4
-rw-r--r--common/jobs/interface.go2
-rw-r--r--pkg/priority_queue/binary_heap.go (renamed from pkg/priorityqueue/binary_heap.go)0
-rw-r--r--pkg/priority_queue/binary_heap_test.go (renamed from pkg/priorityqueue/binary_heap_test.go)0
-rw-r--r--pkg/priority_queue/interface.go (renamed from pkg/priorityqueue/interface.go)0
-rw-r--r--pkg/process/state.go8
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/informer/interface.go2
-rw-r--r--plugins/informer/plugin.go2
-rw-r--r--plugins/informer/rpc.go2
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go24
-rw-r--r--plugins/jobs/brokers/amqp/item.go99
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go4
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go4
-rw-r--r--plugins/jobs/plugin.go2
-rw-r--r--plugins/websockets/plugin.go4
-rw-r--r--proto/jobs/v1beta/jobs.pb.go5
-rw-r--r--proto/kv/v1beta/kv.pb.go5
-rw-r--r--proto/websockets/v1beta/websockets.pb.go5
-rw-r--r--tests/plugins/informer/test_plugin.go4
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml7
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go2
25 files changed, 104 insertions, 91 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 1d56e8a9..bb8191c1 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -73,7 +73,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priorityqueue
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
diff --git a/Makefile b/Makefile
index f02f8e2a..40a5aad2 100755
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priorityqueue
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config
@@ -48,7 +48,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
- go test -v -race -tags=debug ./pkg/priorityqueue
+ go test -v -race -tags=debug ./pkg/priority_queue
go test -v -race -tags=debug ./plugins/jobs/structs
go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index a75ba760..9c7ffef8 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -1,7 +1,7 @@
package jobs
import (
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index e47dd2c8..e47dd2c8 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index 06d0735c..06d0735c 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
diff --git a/pkg/priorityqueue/interface.go b/pkg/priority_queue/interface.go
index 8278dc8d..8278dc8d 100644
--- a/pkg/priorityqueue/interface.go
+++ b/pkg/priority_queue/interface.go
diff --git a/pkg/process/state.go b/pkg/process/state.go
index 652ec77c..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/process/state.go
@@ -32,20 +32,20 @@ type State struct {
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (State, error) {
+func WorkerProcessState(w worker.BaseProcess) (*State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return State{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
percent, err := p.CPUPercent()
if err != nil {
- return State{}, err
+ return nil, err
}
- return State{
+ return &State{
CPUPercent: percent,
Pid: int(w.Pid()),
Status: w.State().String(),
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index fb174792..2ee83384 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -285,13 +285,13 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 316c7bc1..d91ddf9d 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -11,7 +11,7 @@ Because Availabler implementation should present in every plugin
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []process.State
+ Workers() []*process.State
}
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index f8725ed7..c613af58 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -19,7 +19,7 @@ func (p *Plugin) Init() error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) []process.State {
+func (p *Plugin) Workers(name string) []*process.State {
svc, ok := p.withWorkers[name]
if !ok {
return nil
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index f096a0af..02254865 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -11,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []process.State `json:"workers"`
+ Workers []*process.State `json:"workers"`
}
// List all resettable services.
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 4a85ed01..2d0d591c 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -8,7 +8,7 @@ import (
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
err = jb.initRabbitMQ()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
jb.publishChan, err = jb.conn.Channel()
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
// run redialer for the connection
@@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// lock needed here to protect redial concurrent operation
// we may be in the redial state here
- j.RLock()
- defer j.RUnlock()
+ j.Lock()
+ defer j.Unlock()
// convert
msg := FromJob(job)
@@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
@@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return nil
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
- err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index e5e580e0..06e2bf56 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -12,13 +12,13 @@ import (
)
const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- // rrAttempt string = "rr-attempt"
- // rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
+ rrID string = "rr_id"
+ rrJob string = "rr_job"
+ rrHeaders string = "rr_headers"
+ rrPipeline string = "rr_pipeline"
+ rrTimeout string = "rr_timeout"
+ rrDelay string = "rr_delay"
+ rrRetryDelay string = "rr_retry_delay"
)
func FromDelivery(d amqp.Delivery) (*Item, error) {
@@ -44,12 +44,12 @@ func FromJob(job *structs.Job) *Item {
Ident: job.Ident,
Payload: job.Payload,
Options: &Options{
- Priority: job.Options.Priority,
+ Priority: uint32(job.Options.Priority),
Pipeline: job.Options.Pipeline,
- Delay: int64(job.Options.Delay),
- Attempts: int64(job.Options.Attempts),
- RetryDelay: int64(job.Options.RetryDelay),
- Timeout: int64(job.Options.Timeout),
+ Delay: int32(job.Options.Delay),
+ Attempts: int32(job.Options.Attempts),
+ RetryDelay: int32(job.Options.RetryDelay),
+ Timeout: int32(job.Options.Timeout),
},
}
}
@@ -84,27 +84,27 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority uint32 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
+ Delay int32 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts int64 `json:"maxAttempts,omitempty"`
+ Attempts int32 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int64 `json:"retryDelay,omitempty"`
+ RetryDelay int32 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int64 `json:"timeout,omitempty"`
+ Timeout int32 `json:"timeout,omitempty"`
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int64) bool {
+func (o *Options) CanRetry(attempt int32) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
@@ -133,7 +133,7 @@ func (j *Item) ID() string {
}
func (j *Item) Priority() uint64 {
- return j.Options.Priority
+ return uint64(j.Options.Priority)
}
// Body packs job payload into binary payload.
@@ -142,21 +142,9 @@ func (j *Item) Body() []byte {
}
// Context packs job context (job, id) into binary payload.
+// Not used in the amqp, amqp.Table used instead
func (j *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Timeout int64 `json:"timeout"`
- Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
- )
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
+ return nil, nil
}
func (j *Item) Ack() error {
@@ -168,16 +156,20 @@ func (j *Item) Nack() error {
}
// pack job metadata into headers
-func pack(id string, j *Item) amqp.Table {
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- // rrAttempt: attempt,
- // rrMaxAttempts: j.Options.Attempts,
+ rrID: id,
+ rrJob: j.Job,
+ rrPipeline: j.Options.Pipeline,
+ rrHeaders: headers,
rrTimeout: j.Options.Timeout,
rrDelay: j.Options.Delay,
rrRetryDelay: j.Options.RetryDelay,
- }
+ }, nil
}
// unpack restores jobs.Options
@@ -188,30 +180,33 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
return "", nil, fmt.Errorf("missing header `%s`", rrID)
}
- // if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- // }
-
if _, ok := d.Headers[rrJob].(string); !ok {
return "", nil, fmt.Errorf("missing header `%s`", rrJob)
}
j.Job = d.Headers[rrJob].(string)
- // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- // }
+ if _, ok := d.Headers[rrPipeline].(string); ok {
+ j.Options.Pipeline = d.Headers[rrPipeline].(string)
+ }
+
+ if h, ok := d.Headers[rrHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &j.Headers)
+ if err != nil {
+ return "", nil, err
+ }
+ }
- if _, ok := d.Headers[rrTimeout].(int64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(int64)
+ if _, ok := d.Headers[rrTimeout].(int32); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(int32)
}
- if _, ok := d.Headers[rrDelay].(int64); ok {
- j.Options.Delay = d.Headers[rrDelay].(int64)
+ if _, ok := d.Headers[rrDelay].(int32); ok {
+ j.Options.Delay = d.Headers[rrDelay].(int32)
}
- if _, ok := d.Headers[rrRetryDelay].(int64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64)
+ if _, ok := d.Headers[rrRetryDelay].(int32); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
return d.Headers[rrID].(string), j, nil
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 74f9a174..377d8648 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -2,7 +2,7 @@ package amqp
import (
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -26,6 +26,8 @@ func (p *Plugin) Name() string {
return pluginName
}
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index f21fe8de..b5e73a9d 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -59,7 +59,7 @@ func (j *JobsConsumer) initRabbitMQ() error {
return errors.E(op, err)
}
- return nil
+ return channel.Close()
}
func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 09e78249..8f6f4b5f 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 60c6e245..9910d857 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -2,7 +2,7 @@ package ephemeral
import (
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -26,6 +26,8 @@ func (p *Plugin) Name() string {
return PluginName
}
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) {
return NewJobBroker(configKey, p.log, p.cfg, q)
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 2eb35f14..b7e41710 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -14,7 +14,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 5925a588..4420916c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -240,13 +240,13 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index 2b474de9..81d35bf8 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -7,10 +7,11 @@
package jobsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 19621735..1e38fe12 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -7,10 +7,11 @@
package kvv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index 188dcf08..b07c271e 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -7,10 +7,11 @@
package websocketsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 62816d02..095140b8 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -51,13 +51,13 @@ func (p1 *Plugin1) Name() string {
func (p1 *Plugin1) Available() {}
-func (p1 *Plugin1) Workers() []process.State {
+func (p1 *Plugin1) Workers() []*process.State {
p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
}
- ps := make([]process.State, 0, len(p.Workers()))
+ ps := make([]*process.State, 0, len(p.Workers()))
workers := p.Workers()
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 6ff2ab70..93c978c2 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -23,6 +23,9 @@ sqs:
declare:
MessageRetentionPeriod: 86400
+logs:
+ level: debug
+ mode: development
jobs:
# num logical cores by default
@@ -65,7 +68,7 @@ jobs:
test-2-amqp:
driver: amqp
priority: 2
- pipeline_size: 1000
+ pipeline_size: 100000
queue: test-2-queue
exchange: default
exchange_type: direct
@@ -85,5 +88,5 @@ jobs:
queue: default
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local", "test-local-2", "test-local-3", "test-1" ]
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index b2f05f0f..76b7b879 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -84,7 +84,7 @@ func TestJobsInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 120)
+ time.Sleep(time.Second * 1200)
stopCh <- struct{}{}