diff options
24 files changed, 184 insertions, 147 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 85ffce88..cf6c037b 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -77,6 +77,8 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/structs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload @@ -15,9 +15,11 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server_cmd.out -covermode=atomic ./plugins/server + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server.out -covermode=atomic ./tests/plugins/server @@ -47,9 +49,11 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./pkg/priority_queue - go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./plugins/jobs/structs + go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server + go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload go test -v -race -tags=debug ./tests/plugins/server diff --git a/common/priority_queue/interface.go b/common/priority_queue/interface.go new file mode 100644 index 00000000..c1774223 --- /dev/null +++ b/common/priority_queue/interface.go @@ -0,0 +1,15 @@ +package priorityqueue + +type Queue interface { + Insert(item Item) + GetMax() Item +} + +type Item interface { + ID() string + Priority() uint64 + Ack() + Nack() + Body() []byte + Context() []byte +} diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go index 02d413aa..c7c148da 100644 --- a/pkg/priority_queue/binary_heap.go +++ b/pkg/priority_queue/binary_heap.go @@ -4,7 +4,9 @@ binary heap (min-heap) algorithm used as a core for the priority queue package priorityqueue -type BinHeap []PQItem +import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + +type BinHeap []priorityqueue.Item func NewBinHeap() *BinHeap { return &BinHeap{} @@ -53,12 +55,12 @@ func (bh *BinHeap) fixDown(curr, end int) { } } -func (bh *BinHeap) Insert(item PQItem) { +func (bh *BinHeap) Insert(item priorityqueue.Item) { *bh = append(*bh, item) bh.fixUp() } -func (bh *BinHeap) GetMax() PQItem { +func (bh *BinHeap) GetMax() priorityqueue.Item { l := len(*bh) if l == 0 { return nil diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go index afeae62c..528e8fd0 100644 --- a/pkg/priority_queue/binary_heap_test.go +++ b/pkg/priority_queue/binary_heap_test.go @@ -3,11 +3,26 @@ package priorityqueue import ( "testing" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/stretchr/testify/require" ) type Test int +func (t Test) Ack() { +} + +func (t Test) Nack() { +} + +func (t Test) Body() []byte { + return nil +} + +func (t Test) Context() []byte { + return nil +} + func (t Test) ID() string { return "" } @@ -17,7 +32,7 @@ func (t Test) Priority() uint64 { } func TestBinHeap_Init(t *testing.T) { - a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} bh := NewBinHeap() @@ -25,9 +40,9 @@ func TestBinHeap_Init(t *testing.T) { bh.Insert(a[i]) } - expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - res := make([]PQItem, 0, 12) + res := make([]priorityqueue.Item, 0, 12) for i := 0; i < 11; i++ { item := bh.GetMax() diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go deleted file mode 100644 index 3cc1d575..00000000 --- a/pkg/priority_queue/interface.go +++ /dev/null @@ -1,11 +0,0 @@ -package priorityqueue - -type Queue interface { - Insert(item PQItem) - GetMax() PQItem -} - -type PQItem interface { - ID() string - Priority() uint64 -} diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go index 1b33cb92..2ff52a79 100644 --- a/pkg/priority_queue/pq.go +++ b/pkg/priority_queue/pq.go @@ -1,6 +1,10 @@ package priorityqueue -import "sync" +import ( + "sync" + + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" +) type PQ struct { sync.RWMutex @@ -13,14 +17,14 @@ func NewPriorityQueue() *PQ { } } -func (p *PQ) Insert(item PQItem) { +func (p *PQ) GetMax() priorityqueue.Item { p.Lock() - p.bh.Insert(item) - p.Unlock() + defer p.Unlock() + return p.bh.GetMax() } -func (p *PQ) Get() PQItem { +func (p *PQ) Insert(item priorityqueue.Item) { p.Lock() - defer p.Unlock() - return p.bh.GetMax() + p.bh.Insert(item) + p.Unlock() } diff --git a/pkg/priority_queue/pq_test.go b/pkg/priority_queue/pq_test.go index cdec10f5..49afe5e3 100644 --- a/pkg/priority_queue/pq_test.go +++ b/pkg/priority_queue/pq_test.go @@ -37,7 +37,7 @@ func TestNewPriorityQueue(t *testing.T) { case <-stopCh: return default: - it := pq.Get() + it := pq.GetMax() if it == nil { continue } diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 4d357c34..3eb20c27 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -3,60 +3,68 @@ package ephemeral import ( "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { - jobs chan *entry - queues map[*pipeline.Pipeline]*queue + queues map[string]bool pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - jobs: make(chan *entry, 10), - pq: q, + queues: make(map[string]bool), + pq: q, } - go jb.serve() - return jb, nil } -func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { - id := uuid.NewString() +func (j *JobBroker) Push(job *structs.Job) (string, error) { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.queues[job.Options.Pipeline]; ok { + if !b { + return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + } + if job.Options.Priority == nil { + job.Options.Priority = intPtr(10) + } + job.Options.ID = uuid.NewString() + + j.pq.Insert(job) - j.jobs <- &entry{ - id: id, + return job.Options.ID, nil } - return id, nil + return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Stat() { panic("implement me") } -func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { +func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; !ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + if _, ok := j.queues[pipeline]; ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = newQueue() + j.queues[pipeline] = true return nil } -func (j *JobBroker) serve() { - for item := range j.jobs { - // item should satisfy - j.pq.Insert(item) +func intPtr(val uint64) *uint64 { + if val == 0 { + val = 10 } + return &val } diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go deleted file mode 100644 index 847b63ea..00000000 --- a/plugins/jobs/brokers/ephemeral/config.go +++ /dev/null @@ -1 +0,0 @@ -package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go deleted file mode 100644 index 3cedec3e..00000000 --- a/plugins/jobs/brokers/ephemeral/entry.go +++ /dev/null @@ -1,26 +0,0 @@ -package ephemeral - -type entry struct { - id string - priority uint64 -} - -func (e *entry) ID() string { - return e.id -} - -func (e *entry) Priority() uint64 { - return e.priority -} - -func (e *entry) Ask() { - // no-op -} - -func (e *entry) Nack() { - // no-op -} - -func (e *entry) Payload() []byte { - panic("implement me") -} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 84cc871b..146d1fdc 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,7 +1,7 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go deleted file mode 100644 index 1c6d865b..00000000 --- a/plugins/jobs/brokers/ephemeral/queue.go +++ /dev/null @@ -1,7 +0,0 @@ -package ephemeral - -type queue struct{} - -func newQueue() *queue { - return &queue{} -} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 87e36ecb..1cb2c2a2 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -9,19 +9,19 @@ import ( type Config struct { // Workers configures roadrunner server and worker busy. // Workers *roadrunner.ServerConfig - poolCfg *poolImpl.Config + Pool *poolImpl.Config `mapstructure:"Pool"` // Pipelines defines mapping between PHP job pipeline and associated job broker. - Pipelines map[string]*pipeline.Pipeline + Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"` // Consuming specifies names of pipelines to be consumed on service start. - Consume []string + Consume []string `mapstructure:"consume"` } func (c *Config) InitDefaults() { - if c.poolCfg == nil { - c.poolCfg = &poolImpl.Config{} + if c.Pool == nil { + c.Pool = &poolImpl.Config{} } - c.poolCfg.InitDefaults() + c.Pool.InitDefaults() } diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index a0aed50b..a2cf6ed9 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -1,17 +1,17 @@ package jobs import ( - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) // Consumer todo naming type Consumer interface { - Push(*pipeline.Pipeline, *structs.Job) (string, error) + Push(*structs.Job) (string, error) Stat() Consume(*pipeline.Pipeline) - Register(*pipeline.Pipeline) error + Register(pipe string) error } type Broker interface { @@ -20,7 +20,5 @@ type Broker interface { type Item interface { ID() string - Ask() - Nack() Payload() []byte } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index f27f6ede..987f6826 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -15,7 +15,7 @@ func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { out := make(Pipelines, 0) for name, pipe := range pipes { - if pipe.Broker() == "" { + if pipe.Driver() == "" { return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) } @@ -42,7 +42,7 @@ func (ps Pipelines) Broker(broker string) Pipelines { out := make(Pipelines, 0) for _, p := range ps { - if p.Broker() != broker { + if p.Driver() != broker { continue } @@ -98,9 +98,9 @@ func (p Pipeline) Name() string { return p.String("name", "") } -// Broker associated with the pipeline. -func (p Pipeline) Broker() string { - return p.String("broker", "") +// Driver associated with the pipeline. +func (p Pipeline) Driver() string { + return p.String("driver", "") } // Has checks if value presented in pipeline. diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go index f03dcbb8..77acf96e 100644 --- a/plugins/jobs/pipeline/pipeline_test.go +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -58,23 +58,23 @@ func TestPipeline_Has(t *testing.T) { func TestPipeline_FilterBroker(t *testing.T) { pipes := Pipelines{ - &Pipeline{"name": "first", "broker": "a"}, - &Pipeline{"name": "second", "broker": "a"}, - &Pipeline{"name": "third", "broker": "b"}, - &Pipeline{"name": "forth", "broker": "b"}, + &Pipeline{"name": "first", "driver": "a"}, + &Pipeline{"name": "second", "driver": "a"}, + &Pipeline{"name": "third", "driver": "b"}, + &Pipeline{"name": "forth", "driver": "b"}, } filtered := pipes.Names("first", "third") assert.True(t, len(filtered) == 2) - assert.Equal(t, "a", filtered[0].Broker()) - assert.Equal(t, "b", filtered[1].Broker()) + assert.Equal(t, "a", filtered[0].Driver()) + assert.Equal(t, "b", filtered[1].Driver()) filtered = pipes.Names("first", "third").Reverse() assert.True(t, len(filtered) == 2) - assert.Equal(t, "a", filtered[1].Broker()) - assert.Equal(t, "b", filtered[0].Broker()) + assert.Equal(t, "a", filtered[1].Driver()) + assert.Equal(t, "b", filtered[0].Driver()) filtered = pipes.Broker("a") assert.True(t, len(filtered) == 2) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6bf43a11..67077920 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,13 +2,14 @@ package jobs import ( "context" - "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -23,7 +24,7 @@ const ( ) type Plugin struct { - cfg *Config + cfg *Config `mapstructure:"jobs"` log logger.Logger workersPool pool.Pool @@ -41,10 +42,6 @@ type Plugin struct { pipelines pipeline.Pipelines } -func testListener(data interface{}) { - fmt.Println(data) -} - func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { @@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.events.AddListener(testListener) p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) @@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap() + p.queue = priorityqueue2.NewPriorityQueue() p.log = log return nil @@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + const op = errors.Op("jobs_plugin_serve") for name := range p.brokers { jb, err := p.brokers[name].InitJobBroker(p.queue) @@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error { p.consumers[name] = jb } + // register initial pipelines + for i := 0; i < len(p.pipelines); i++ { + pipe := p.pipelines[i] + + if jb, ok := p.consumers[pipe.Driver()]; ok { + err := jb.Register(pipe.Name()) + if err != nil { + errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) + return errCh + } + } + } + var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) if err != nil { errCh <- err return errCh } - // initialize sub-plugins - // provide a queue to them - // start consume loop - // start resp loop + // start listening + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } - /* - go func() { - for { - // get data JOB from the queue - job := p.queue.Pop() + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } - // request - _ = job - p.workersPool.Exec(nil) + _, err = p.workersPool.Exec(exec) + if err != nil { + panic(err) } - }() + } + }() - */ return errCh } @@ -141,18 +155,16 @@ func (p *Plugin) Name() string { func (p *Plugin) Push(j *structs.Job) (string, error) { pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Broker()] + broker, ok := p.consumers[pipe.Driver()] if !ok { panic("broker not found") } - id, err := broker.Push(pipe, j) + id, err := broker.Push(j) if err != nil { panic(err) } - // p.events.Push() - return id, nil } diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go index 2e394543..268444db 100644 --- a/plugins/jobs/structs/job.go +++ b/plugins/jobs/structs/job.go @@ -17,19 +17,35 @@ type Job struct { Options *Options `json:"options,omitempty"` } +func (j *Job) ID() string { + return j.Options.ID +} + +func (j *Job) Priority() uint64 { + return *j.Options.Priority +} + // Body packs job payload into binary payload. func (j *Job) Body() []byte { return utils.AsBytes(j.Payload) } // Context packs job context (job, id) into binary payload. -func (j *Job) Context(id string) []byte { +func (j *Job) Context() []byte { ctx, _ := json.Marshal( struct { ID string `json:"id"` Job string `json:"job"` - }{ID: id, Job: j.Job}, + }{ID: j.Options.ID, Job: j.Job}, ) return ctx } + +func (j *Job) Ack() { + +} + +func (j *Job) Nack() { + +} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 1507d053..029a797d 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -4,6 +4,13 @@ import "time" // Options carry information about how to handle given job. type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority *uint64 `json:"priority"` + + // ID - generated ID for the job + ID string `json:"id"` + // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go index e7240c6b..92f78081 100644 --- a/plugins/jobs/structs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -13,7 +13,7 @@ func TestJob_Body(t *testing.T) { } func TestJob_Context(t *testing.T) { - j := &Job{Job: "job"} + j := &Job{Job: "job", Options: &Options{ID: "id"}} - assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) } diff --git a/tests/plugins/http/configs/.rr-http.yaml b/tests/plugins/http/configs/.rr-http.yaml index c95bc049..b4910160 100644 --- a/tests/plugins/http/configs/.rr-http.yaml +++ b/tests/plugins/http/configs/.rr-http.yaml @@ -3,10 +3,6 @@ rpc: server: command: "php ../../http/client.php echo pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index 320f41b1..d86a8ad8 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -1,8 +1,8 @@ rpc: - listen: tcp://127.0.0.1:6001 + listen: unix:///home/valery/Downloads/rr.sock server: - command: "php ../../psr-worker-bench.php" + command: "php ../../client.php echo pipes" relay: "pipes" relay_timeout: "20s" @@ -24,7 +24,10 @@ sqs: jobs: # worker pool configuration pool: - num_workers: 4 + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s # list of broker pipelines associated with endpoints pipelines: diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index e8b4e83d..754f60bc 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -82,7 +82,7 @@ func TestJobsInit(t *testing.T) { } }() - time.Sleep(time.Second * 1) + time.Sleep(time.Second * 60) stopCh <- struct{}{} |