diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 50 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/entry.go | 26 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/queue.go | 7 | ||||
-rw-r--r-- | plugins/jobs/config.go | 12 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 8 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 10 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go | 16 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 68 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go | 20 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go | 7 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go | 4 |
13 files changed, 119 insertions, 112 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 4d357c34..3eb20c27 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -3,60 +3,68 @@ package ephemeral import ( "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { - jobs chan *entry - queues map[*pipeline.Pipeline]*queue + queues map[string]bool pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - jobs: make(chan *entry, 10), - pq: q, + queues: make(map[string]bool), + pq: q, } - go jb.serve() - return jb, nil } -func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { - id := uuid.NewString() +func (j *JobBroker) Push(job *structs.Job) (string, error) { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.queues[job.Options.Pipeline]; ok { + if !b { + return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + } + if job.Options.Priority == nil { + job.Options.Priority = intPtr(10) + } + job.Options.ID = uuid.NewString() + + j.pq.Insert(job) - j.jobs <- &entry{ - id: id, + return job.Options.ID, nil } - return id, nil + return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Stat() { panic("implement me") } -func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { +func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; !ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + if _, ok := j.queues[pipeline]; ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = newQueue() + j.queues[pipeline] = true return nil } -func (j *JobBroker) serve() { - for item := range j.jobs { - // item should satisfy - j.pq.Insert(item) +func intPtr(val uint64) *uint64 { + if val == 0 { + val = 10 } + return &val } diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go deleted file mode 100644 index 847b63ea..00000000 --- a/plugins/jobs/brokers/ephemeral/config.go +++ /dev/null @@ -1 +0,0 @@ -package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go deleted file mode 100644 index 3cedec3e..00000000 --- a/plugins/jobs/brokers/ephemeral/entry.go +++ /dev/null @@ -1,26 +0,0 @@ -package ephemeral - -type entry struct { - id string - priority uint64 -} - -func (e *entry) ID() string { - return e.id -} - -func (e *entry) Priority() uint64 { - return e.priority -} - -func (e *entry) Ask() { - // no-op -} - -func (e *entry) Nack() { - // no-op -} - -func (e *entry) Payload() []byte { - panic("implement me") -} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 84cc871b..146d1fdc 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,7 +1,7 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go deleted file mode 100644 index 1c6d865b..00000000 --- a/plugins/jobs/brokers/ephemeral/queue.go +++ /dev/null @@ -1,7 +0,0 @@ -package ephemeral - -type queue struct{} - -func newQueue() *queue { - return &queue{} -} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 87e36ecb..1cb2c2a2 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -9,19 +9,19 @@ import ( type Config struct { // Workers configures roadrunner server and worker busy. // Workers *roadrunner.ServerConfig - poolCfg *poolImpl.Config + Pool *poolImpl.Config `mapstructure:"Pool"` // Pipelines defines mapping between PHP job pipeline and associated job broker. - Pipelines map[string]*pipeline.Pipeline + Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"` // Consuming specifies names of pipelines to be consumed on service start. - Consume []string + Consume []string `mapstructure:"consume"` } func (c *Config) InitDefaults() { - if c.poolCfg == nil { - c.poolCfg = &poolImpl.Config{} + if c.Pool == nil { + c.Pool = &poolImpl.Config{} } - c.poolCfg.InitDefaults() + c.Pool.InitDefaults() } diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index a0aed50b..a2cf6ed9 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -1,17 +1,17 @@ package jobs import ( - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) // Consumer todo naming type Consumer interface { - Push(*pipeline.Pipeline, *structs.Job) (string, error) + Push(*structs.Job) (string, error) Stat() Consume(*pipeline.Pipeline) - Register(*pipeline.Pipeline) error + Register(pipe string) error } type Broker interface { @@ -20,7 +20,5 @@ type Broker interface { type Item interface { ID() string - Ask() - Nack() Payload() []byte } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index f27f6ede..987f6826 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -15,7 +15,7 @@ func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { out := make(Pipelines, 0) for name, pipe := range pipes { - if pipe.Broker() == "" { + if pipe.Driver() == "" { return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) } @@ -42,7 +42,7 @@ func (ps Pipelines) Broker(broker string) Pipelines { out := make(Pipelines, 0) for _, p := range ps { - if p.Broker() != broker { + if p.Driver() != broker { continue } @@ -98,9 +98,9 @@ func (p Pipeline) Name() string { return p.String("name", "") } -// Broker associated with the pipeline. -func (p Pipeline) Broker() string { - return p.String("broker", "") +// Driver associated with the pipeline. +func (p Pipeline) Driver() string { + return p.String("driver", "") } // Has checks if value presented in pipeline. diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go index f03dcbb8..77acf96e 100644 --- a/plugins/jobs/pipeline/pipeline_test.go +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -58,23 +58,23 @@ func TestPipeline_Has(t *testing.T) { func TestPipeline_FilterBroker(t *testing.T) { pipes := Pipelines{ - &Pipeline{"name": "first", "broker": "a"}, - &Pipeline{"name": "second", "broker": "a"}, - &Pipeline{"name": "third", "broker": "b"}, - &Pipeline{"name": "forth", "broker": "b"}, + &Pipeline{"name": "first", "driver": "a"}, + &Pipeline{"name": "second", "driver": "a"}, + &Pipeline{"name": "third", "driver": "b"}, + &Pipeline{"name": "forth", "driver": "b"}, } filtered := pipes.Names("first", "third") assert.True(t, len(filtered) == 2) - assert.Equal(t, "a", filtered[0].Broker()) - assert.Equal(t, "b", filtered[1].Broker()) + assert.Equal(t, "a", filtered[0].Driver()) + assert.Equal(t, "b", filtered[1].Driver()) filtered = pipes.Names("first", "third").Reverse() assert.True(t, len(filtered) == 2) - assert.Equal(t, "a", filtered[1].Broker()) - assert.Equal(t, "b", filtered[0].Broker()) + assert.Equal(t, "a", filtered[1].Driver()) + assert.Equal(t, "b", filtered[0].Driver()) filtered = pipes.Broker("a") assert.True(t, len(filtered) == 2) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6bf43a11..67077920 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,13 +2,14 @@ package jobs import ( "context" - "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -23,7 +24,7 @@ const ( ) type Plugin struct { - cfg *Config + cfg *Config `mapstructure:"jobs"` log logger.Logger workersPool pool.Pool @@ -41,10 +42,6 @@ type Plugin struct { pipelines pipeline.Pipelines } -func testListener(data interface{}) { - fmt.Println(data) -} - func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { @@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.events.AddListener(testListener) p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) @@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap() + p.queue = priorityqueue2.NewPriorityQueue() p.log = log return nil @@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + const op = errors.Op("jobs_plugin_serve") for name := range p.brokers { jb, err := p.brokers[name].InitJobBroker(p.queue) @@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error { p.consumers[name] = jb } + // register initial pipelines + for i := 0; i < len(p.pipelines); i++ { + pipe := p.pipelines[i] + + if jb, ok := p.consumers[pipe.Driver()]; ok { + err := jb.Register(pipe.Name()) + if err != nil { + errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) + return errCh + } + } + } + var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) if err != nil { errCh <- err return errCh } - // initialize sub-plugins - // provide a queue to them - // start consume loop - // start resp loop + // start listening + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } - /* - go func() { - for { - // get data JOB from the queue - job := p.queue.Pop() + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } - // request - _ = job - p.workersPool.Exec(nil) + _, err = p.workersPool.Exec(exec) + if err != nil { + panic(err) } - }() + } + }() - */ return errCh } @@ -141,18 +155,16 @@ func (p *Plugin) Name() string { func (p *Plugin) Push(j *structs.Job) (string, error) { pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Broker()] + broker, ok := p.consumers[pipe.Driver()] if !ok { panic("broker not found") } - id, err := broker.Push(pipe, j) + id, err := broker.Push(j) if err != nil { panic(err) } - // p.events.Push() - return id, nil } diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go index 2e394543..268444db 100644 --- a/plugins/jobs/structs/job.go +++ b/plugins/jobs/structs/job.go @@ -17,19 +17,35 @@ type Job struct { Options *Options `json:"options,omitempty"` } +func (j *Job) ID() string { + return j.Options.ID +} + +func (j *Job) Priority() uint64 { + return *j.Options.Priority +} + // Body packs job payload into binary payload. func (j *Job) Body() []byte { return utils.AsBytes(j.Payload) } // Context packs job context (job, id) into binary payload. -func (j *Job) Context(id string) []byte { +func (j *Job) Context() []byte { ctx, _ := json.Marshal( struct { ID string `json:"id"` Job string `json:"job"` - }{ID: id, Job: j.Job}, + }{ID: j.Options.ID, Job: j.Job}, ) return ctx } + +func (j *Job) Ack() { + +} + +func (j *Job) Nack() { + +} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 1507d053..029a797d 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -4,6 +4,13 @@ import "time" // Options carry information about how to handle given job. type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority *uint64 `json:"priority"` + + // ID - generated ID for the job + ID string `json:"id"` + // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go index e7240c6b..92f78081 100644 --- a/plugins/jobs/structs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -13,7 +13,7 @@ func TestJob_Body(t *testing.T) { } func TestJob_Context(t *testing.T) { - j := &Job{Job: "job"} + j := &Job{Job: "job", Options: &Options{ID: "id"}} - assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) } |