diff options
25 files changed, 104 insertions, 91 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 1d56e8a9..bb8191c1 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -73,7 +73,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priorityqueue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server @@ -14,7 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priorityqueue + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config @@ -48,7 +48,7 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst - go test -v -race -tags=debug ./pkg/priorityqueue + go test -v -race -tags=debug ./pkg/priority_queue go test -v -race -tags=debug ./plugins/jobs/structs go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config diff --git a/common/jobs/interface.go b/common/jobs/interface.go index a75ba760..9c7ffef8 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -1,7 +1,7 @@ package jobs import ( - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priority_queue/binary_heap.go index e47dd2c8..e47dd2c8 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priority_queue/binary_heap.go diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go index 06d0735c..06d0735c 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priority_queue/binary_heap_test.go diff --git a/pkg/priorityqueue/interface.go b/pkg/priority_queue/interface.go index 8278dc8d..8278dc8d 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priority_queue/interface.go diff --git a/pkg/process/state.go b/pkg/process/state.go index 652ec77c..bfc3a287 100644 --- a/pkg/process/state.go +++ b/pkg/process/state.go @@ -32,20 +32,20 @@ type State struct { } // WorkerProcessState creates new worker state definition. -func WorkerProcessState(w worker.BaseProcess) (State, error) { +func WorkerProcessState(w worker.BaseProcess) (*State, error) { const op = errors.Op("worker_process_state") p, _ := process.NewProcess(int32(w.Pid())) i, err := p.MemoryInfo() if err != nil { - return State{}, errors.E(op, err) + return nil, errors.E(op, err) } percent, err := p.CPUPercent() if err != nil { - return State{}, err + return nil, err } - return State{ + return &State{ CPUPercent: percent, Pid: int(w.Pid()), Status: w.State().String(), diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index fb174792..2ee83384 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -285,13 +285,13 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []process.State { +func (p *Plugin) Workers() []*process.State { p.RLock() defer p.RUnlock() workers := p.workers() - ps := make([]process.State, 0, len(workers)) + ps := make([]*process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) if err != nil { diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index 316c7bc1..d91ddf9d 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -11,7 +11,7 @@ Because Availabler implementation should present in every plugin // Informer used to get workers from particular plugin or set of plugins type Informer interface { - Workers() []process.State + Workers() []*process.State } // Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index f8725ed7..c613af58 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -19,7 +19,7 @@ func (p *Plugin) Init() error { } // Workers provides BaseProcess slice with workers for the requested plugin -func (p *Plugin) Workers(name string) []process.State { +func (p *Plugin) Workers(name string) []*process.State { svc, ok := p.withWorkers[name] if !ok { return nil diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index f096a0af..02254865 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -11,7 +11,7 @@ type rpc struct { // WorkerList contains list of workers. type WorkerList struct { // Workers is list of workers. - Workers []process.State `json:"workers"` + Workers []*process.State `json:"workers"` } // List all resettable services. diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 4a85ed01..2d0d591c 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, err = jb.initRabbitMQ() if err != nil { - return nil, err + return nil, errors.E(op, err) } jb.publishChan, err = jb.conn.Channel() if err != nil { - panic(err) + return nil, errors.E(op, err) } // run redialer for the connection @@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // lock needed here to protect redial concurrent operation // we may be in the redial state here - j.RLock() - defer j.RUnlock() + j.Lock() + defer j.Unlock() // convert msg := FromJob(job) @@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, @@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return nil } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline - err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index e5e580e0..06e2bf56 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -12,13 +12,13 @@ import ( ) const ( - rrID string = "rr-id" - rrJob string = "rr-job" - // rrAttempt string = "rr-attempt" - // rrMaxAttempts string = "rr-max_attempts" - rrTimeout string = "rr-timeout" - rrDelay string = "rr-delay" - rrRetryDelay string = "rr-retry_delay" + rrID string = "rr_id" + rrJob string = "rr_job" + rrHeaders string = "rr_headers" + rrPipeline string = "rr_pipeline" + rrTimeout string = "rr_timeout" + rrDelay string = "rr_delay" + rrRetryDelay string = "rr_retry_delay" ) func FromDelivery(d amqp.Delivery) (*Item, error) { @@ -44,12 +44,12 @@ func FromJob(job *structs.Job) *Item { Ident: job.Ident, Payload: job.Payload, Options: &Options{ - Priority: job.Options.Priority, + Priority: uint32(job.Options.Priority), Pipeline: job.Options.Pipeline, - Delay: int64(job.Options.Delay), - Attempts: int64(job.Options.Attempts), - RetryDelay: int64(job.Options.RetryDelay), - Timeout: int64(job.Options.Timeout), + Delay: int32(job.Options.Delay), + Attempts: int32(job.Options.Attempts), + RetryDelay: int32(job.Options.RetryDelay), + Timeout: int32(job.Options.Timeout), }, } } @@ -84,27 +84,27 @@ type Item struct { type Options struct { // Priority is job priority, default - 10 // pointer to distinguish 0 as a priority and nil as priority not set - Priority uint64 `json:"priority"` + Priority uint32 `json:"priority"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` + Delay int32 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts int64 `json:"maxAttempts,omitempty"` + Attempts int32 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int64 `json:"retryDelay,omitempty"` + RetryDelay int32 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int64 `json:"timeout,omitempty"` + Timeout int32 `json:"timeout,omitempty"` } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int64) bool { +func (o *Options) CanRetry(attempt int32) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } @@ -133,7 +133,7 @@ func (j *Item) ID() string { } func (j *Item) Priority() uint64 { - return j.Options.Priority + return uint64(j.Options.Priority) } // Body packs job payload into binary payload. @@ -142,21 +142,9 @@ func (j *Item) Body() []byte { } // Context packs job context (job, id) into binary payload. +// Not used in the amqp, amqp.Table used instead func (j *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Timeout int64 `json:"timeout"` - Pipeline string `json:"pipeline"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, - ) - if err != nil { - return nil, err - } - - return ctx, nil + return nil, nil } func (j *Item) Ack() error { @@ -168,16 +156,20 @@ func (j *Item) Nack() error { } // pack job metadata into headers -func pack(id string, j *Item) amqp.Table { +func pack(id string, j *Item) (amqp.Table, error) { + headers, err := json.Marshal(j.Headers) + if err != nil { + return nil, err + } return amqp.Table{ - rrID: id, - rrJob: j.Job, - // rrAttempt: attempt, - // rrMaxAttempts: j.Options.Attempts, + rrID: id, + rrJob: j.Job, + rrPipeline: j.Options.Pipeline, + rrHeaders: headers, rrTimeout: j.Options.Timeout, rrDelay: j.Options.Delay, rrRetryDelay: j.Options.RetryDelay, - } + }, nil } // unpack restores jobs.Options @@ -188,30 +180,33 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { return "", nil, fmt.Errorf("missing header `%s`", rrID) } - // if _, ok := d.Headers[rrAttempt].(uint64); !ok { - // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) - // } - if _, ok := d.Headers[rrJob].(string); !ok { return "", nil, fmt.Errorf("missing header `%s`", rrJob) } j.Job = d.Headers[rrJob].(string) - // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { - // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) - // } + if _, ok := d.Headers[rrPipeline].(string); ok { + j.Options.Pipeline = d.Headers[rrPipeline].(string) + } + + if h, ok := d.Headers[rrHeaders].([]byte); ok { + err := json.Unmarshal(h, &j.Headers) + if err != nil { + return "", nil, err + } + } - if _, ok := d.Headers[rrTimeout].(int64); ok { - j.Options.Timeout = d.Headers[rrTimeout].(int64) + if _, ok := d.Headers[rrTimeout].(int32); ok { + j.Options.Timeout = d.Headers[rrTimeout].(int32) } - if _, ok := d.Headers[rrDelay].(int64); ok { - j.Options.Delay = d.Headers[rrDelay].(int64) + if _, ok := d.Headers[rrDelay].(int32); ok { + j.Options.Delay = d.Headers[rrDelay].(int32) } - if _, ok := d.Headers[rrRetryDelay].(int64); ok { - j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64) + if _, ok := d.Headers[rrRetryDelay].(int32); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32) } return d.Headers[rrID].(string), j, nil diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 74f9a174..377d8648 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -2,7 +2,7 @@ package amqp import ( "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -26,6 +26,8 @@ func (p *Plugin) Name() string { return pluginName } +func (p *Plugin) Available() {} + func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { return NewAMQPConsumer(configKey, p.log, p.cfg, pq) } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index f21fe8de..b5e73a9d 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -59,7 +59,7 @@ func (j *JobsConsumer) initRabbitMQ() error { return errors.E(op, err) } - return nil + return channel.Close() } func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 09e78249..8f6f4b5f 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -5,7 +5,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 60c6e245..9910d857 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,7 +2,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -26,6 +26,8 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Available() {} + func (p *Plugin) JobsConstruct(configKey string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(configKey, p.log, p.cfg, q) } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 2eb35f14..b7e41710 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -14,7 +14,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 5925a588..4420916c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -240,13 +240,13 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { } // Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []process.State { +func (p *Plugin) Workers() []*process.State { p.RLock() defer p.RUnlock() workers := p.workers() - ps := make([]process.State, 0, len(workers)) + ps := make([]*process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) if err != nil { diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index 2b474de9..81d35bf8 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -7,10 +7,11 @@ package jobsv1beta import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 19621735..1e38fe12 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -7,10 +7,11 @@ package kvv1beta import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index 188dcf08..b07c271e 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -7,10 +7,11 @@ package websocketsv1beta import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 62816d02..095140b8 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -51,13 +51,13 @@ func (p1 *Plugin1) Name() string { func (p1 *Plugin1) Available() {} -func (p1 *Plugin1) Workers() []process.State { +func (p1 *Plugin1) Workers() []*process.State { p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) } - ps := make([]process.State, 0, len(p.Workers())) + ps := make([]*process.State, 0, len(p.Workers())) workers := p.Workers() for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index 6ff2ab70..93c978c2 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -23,6 +23,9 @@ sqs: declare: MessageRetentionPeriod: 86400 +logs: + level: debug + mode: development jobs: # num logical cores by default @@ -65,7 +68,7 @@ jobs: test-2-amqp: driver: amqp priority: 2 - pipeline_size: 1000 + pipeline_size: 100000 queue: test-2-queue exchange: default exchange_type: direct @@ -85,5 +88,5 @@ jobs: queue: default # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1" ] + consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index b2f05f0f..76b7b879 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -84,7 +84,7 @@ func TestJobsInit(t *testing.T) { } }() - time.Sleep(time.Second * 120) + time.Sleep(time.Second * 1200) stopCh <- struct{}{} |