diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
commit | 60c229c8506df465586434309af5acd1f84e2406 (patch) | |
tree | 18fdf380b7e032415d656e84bcc3c7a057f194a8 /plugins | |
parent | 127186a72d4b8d30f6ada72ade661d8713490728 (diff) |
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines
handling mechanism.
Add delayed jobs for the ephemeral plugin.
Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with
a slice of the pipelines.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go (renamed from plugins/jobs/brokers/ephemeral/broker.go) | 57 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 125 | ||||
-rw-r--r-- | plugins/jobs/config.go | 11 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 148 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go | 69 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 161 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 97 | ||||
-rw-r--r-- | plugins/jobs/structs/general.go | 19 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go | 51 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go | 5 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go | 20 |
14 files changed, 376 insertions, 390 deletions
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/config.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/consumer.go index 6c7108f6..e31e3b25 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -2,13 +2,12 @@ package ephemeral import ( "sync" + "time" - "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" - "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { @@ -25,39 +24,47 @@ func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (*string, error) { +func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered if b, ok := j.queues.Load(job.Options.Pipeline); ok { if !b.(bool) { - return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } - if job.Options.Priority == nil { - job.Options.Priority = utils.AsUint64Ptr(10) + + // handle timeouts + if job.Options.Timeout > 0 { + go func(jj *structs.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // send the item after timeout expired + j.pq.Insert(From(job)) + }(job) + + return nil } - job.Options.ID = utils.AsStringPtr(uuid.NewString()) - j.pq.Insert(job) + j.pq.Insert(From(job)) - return job.Options.ID, nil + return nil } - return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } -func (j *JobBroker) Register(pipeline string) error { +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues.Load(pipeline); ok { + if _, ok := j.queues.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues.Store(pipeline, true) + j.queues.Store(pipeline.Name(), true) return nil } -func (j *JobBroker) Stop(pipeline string) { +func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -66,13 +73,6 @@ func (j *JobBroker) Stop(pipeline string) { } } -func (j *JobBroker) StopAll() { - j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, false) - return true - }) -} - func (j *JobBroker) Resume(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == false { @@ -82,17 +82,14 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) ResumeAll() { +func (j *JobBroker) List() []*pipeline.Pipeline { + out := make([]*pipeline.Pipeline, 0, 2) + j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, true) + pipe := key.(*pipeline.Pipeline) + out = append(out, pipe) return true }) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return out } diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go new file mode 100644 index 00000000..e2caa53a --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -0,0 +1,125 @@ +package ephemeral + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" +) + +func From(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: conv(*job.Options), + } +} + +func conv(jo structs.Options) Options { + return Options(jo) +} + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority uint64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay uint64 `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts uint64 `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay uint64 `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout uint64 `json:"timeout,omitempty"` +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt uint64) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() uint64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (j *Item) Ack() { + // noop for the in-memory +} + +func (j *Item) Nack() { + // noop for the in-memory +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 07e2ef38..aa2da2dc 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -7,12 +7,18 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" ) +const ( + // name used to set pipeline name + pipelineName string = "name" +) + // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { // NumPollers configures number of priority queue pollers // Should be no more than 255 // Default - num logical cores NumPollers uint8 `mapstructure:"num_pollers"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` @@ -32,5 +38,10 @@ func (c *Config) InitDefaults() { c.NumPollers = uint8(runtime.NumCPU()) } + for k := range c.Pipelines { + // set the pipeline name + c.Pipelines[k].With(pipelineName, k) + } + c.Pool.InitDefaults() } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 987f6826..e87204f9 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -1,106 +1,27 @@ package pipeline -import ( - "time" - - "github.com/spiral/errors" -) - -// Pipelines is list of Pipeline. - -type Pipelines []*Pipeline - -func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { - const op = errors.Op("pipeline_init") - out := make(Pipelines, 0) - - for name, pipe := range pipes { - if pipe.Driver() == "" { - return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) - } - - p := pipe.With("name", name) - out = append(out, &p) - } - - return out, nil -} - -// Reverse returns pipelines in reversed order. -func (ps Pipelines) Reverse() Pipelines { - out := make(Pipelines, len(ps)) - - for i, p := range ps { - out[len(ps)-i-1] = p - } - - return out -} - -// Broker return pipelines associated with specific broker. -func (ps Pipelines) Broker(broker string) Pipelines { - out := make(Pipelines, 0) - - for _, p := range ps { - if p.Driver() != broker { - continue - } - - out = append(out, p) - } - - return out -} - -// Names returns only pipelines with specified names. -func (ps Pipelines) Names(only ...string) Pipelines { - out := make(Pipelines, 0) - - for _, name := range only { - for _, p := range ps { - if p.Name() == name { - out = append(out, p) - } - } - } - - return out -} - -// Get returns pipeline by it'svc name. -func (ps Pipelines) Get(name string) *Pipeline { - // possibly optimize - for _, p := range ps { - if p.Name() == name { - return p - } - } - - return nil -} - // Pipeline defines pipeline options. type Pipeline map[string]interface{} -// With pipeline value. Immutable. -func (p Pipeline) With(name string, value interface{}) Pipeline { - out := make(map[string]interface{}) - for k, v := range p { - out[k] = v - } - out[name] = value +const ( + priority string = "priority" + driver string = "driver" + name string = "name" +) - return out +// With pipeline value +func (p *Pipeline) With(name string, value interface{}) { + (*p)[name] = value } // Name returns pipeline name. func (p Pipeline) Name() string { - return p.String("name", "") + return p.String(name, "") } // Driver associated with the pipeline. func (p Pipeline) Driver() string { - return p.String("driver", "") + return p.String(driver, "") } // Has checks if value presented in pipeline. @@ -112,32 +33,6 @@ func (p Pipeline) Has(name string) bool { return false } -// Map must return nested map value or empty config. -func (p Pipeline) Map(name string) Pipeline { - out := make(map[string]interface{}) - - if value, ok := p[name]; ok { - if m, ok := value.(map[string]interface{}); ok { - for k, v := range m { - out[k] = v - } - } - } - - return out -} - -// Bool must return option value as string or return default value. -func (p Pipeline) Bool(name string, d bool) bool { - if value, ok := p[name]; ok { - if b, ok := value.(bool); ok { - return b - } - } - - return d -} - // String must return option value as string or return default value. func (p Pipeline) String(name string, d string) string { if value, ok := p[name]; ok { @@ -149,24 +44,13 @@ func (p Pipeline) String(name string, d string) string { return d } -// Integer must return option value as string or return default value. -func (p Pipeline) Integer(name string, d int) int { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return str +// Priority returns default pipeline priority +func (p Pipeline) Priority() uint64 { + if value, ok := p[priority]; ok { + if v, ok := value.(uint64); ok { + return v } } - return d -} - -// Duration must return option value as time.Duration (seconds) or return default value. -func (p Pipeline) Duration(name string, d time.Duration) time.Duration { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return time.Second * time.Duration(str) - } - } - - return d + return 10 } diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go index 77acf96e..4482c70d 100644 --- a/plugins/jobs/pipeline/pipeline_test.go +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -2,32 +2,10 @@ package pipeline import ( "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestPipeline_Map(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} - - assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) - assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) -} - -func TestPipeline_MapString(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} - - assert.Equal(t, "default", pipe.Map("options").String("alias", "")) - assert.Equal(t, "", pipe.Map("other").String("alias", "")) -} - -func TestPipeline_Bool(t *testing.T) { - pipe := Pipeline{"value": true} - - assert.Equal(t, true, pipe.Bool("value", false)) - assert.Equal(t, true, pipe.Bool("other", true)) -} - func TestPipeline_String(t *testing.T) { pipe := Pipeline{"value": "value"} @@ -35,56 +13,9 @@ func TestPipeline_String(t *testing.T) { assert.Equal(t, "value", pipe.String("other", "value")) } -func TestPipeline_Integer(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, 1, pipe.Integer("value", 0)) - assert.Equal(t, 1, pipe.Integer("other", 1)) -} - -func TestPipeline_Duration(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, time.Second, pipe.Duration("value", 0)) - assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) -} - func TestPipeline_Has(t *testing.T) { pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} assert.Equal(t, true, pipe.Has("options")) assert.Equal(t, false, pipe.Has("other")) } - -func TestPipeline_FilterBroker(t *testing.T) { - pipes := Pipelines{ - &Pipeline{"name": "first", "driver": "a"}, - &Pipeline{"name": "second", "driver": "a"}, - &Pipeline{"name": "third", "driver": "b"}, - &Pipeline{"name": "forth", "driver": "b"}, - } - - filtered := pipes.Names("first", "third") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[0].Driver()) - assert.Equal(t, "b", filtered[1].Driver()) - - filtered = pipes.Names("first", "third").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[1].Driver()) - assert.Equal(t, "b", filtered[0].Driver()) - - filtered = pipes.Broker("a") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[0].Name()) - assert.Equal(t, "second", filtered[1].Name()) - - filtered = pipes.Broker("a").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[1].Name()) - assert.Equal(t, "second", filtered[0].Name()) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c3f766b9..d603dce6 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" @@ -16,13 +17,14 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" ) const ( // RrJobs env variable RrJobs string = "rr_jobs" PluginName string = "jobs" + + pipelines string = "pipelines" ) type Plugin struct { @@ -42,8 +44,8 @@ type Plugin struct { // priority queue implementation queue priorityqueue.Queue - // parent config for broken options. - pipelines pipeline.Pipelines + // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline + pipelines sync.Map } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,9 +67,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines - p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) - if err != nil { - return errors.E(op, err) + for i := range p.cfg.Pipelines { + p.pipelines.Store(i, p.cfg.Pipelines[i]) } // initialize priority queue @@ -81,28 +82,42 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.jobConstructors { - jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) - if err != nil { - errCh <- err - return errCh - } - - p.consumers[name] = jb - } - // register initial pipelines - for i := 0; i < len(p.pipelines); i++ { - pipe := p.pipelines[i] + p.pipelines.Range(func(key, value interface{}) bool { + // pipeline name (ie test-local, sqs-aws, etc) + name := key.(string) + + // pipeline associated with the name + pipe := value.(*pipeline.Pipeline) + // driver for the pipeline (ie amqp, ephemeral, etc) + dr := pipe.Driver() + + // jobConstructors contains constructors for the drivers + // we need here to initialize these drivers for the pipelines + if c, ok := p.jobConstructors[dr]; ok { + // config key for the particular sub-driver jobs.pipelines.test-local + configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) + + // init the driver + initializedDriver, err := c.JobsConstruct(configKey, p.queue) + if err != nil { + errCh <- errors.E(op, err) + return false + } - if jb, ok := p.consumers[pipe.Driver()]; ok { - err := jb.Register(pipe.Name()) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers[name] = initializedDriver + + // register pipeline for the initialized driver + err = initializedDriver.Register(pipe) if err != nil { errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) - return errCh + return false } } - } + + return true + }) var err error p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) @@ -119,12 +134,18 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() + ctx, err := job.Context() + if err != nil { + job.Nack() + p.log.Error("job marshal context", "error", err) + } + exec := payload.Payload{ - Context: job.Context(), + Context: ctx, Body: job.Body(), } - _, err := p.workersPool.Exec(exec) + _, err = p.workersPool.Exec(exec) if err != nil { job.Nack() p.log.Error("job execute", "error", err) @@ -160,41 +181,105 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (*string, error) { +func (p *Plugin) Push(j *structs.Job) error { const op = errors.Op("jobs_plugin_push") - pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline)) } - id, err := broker.Push(j) + // type conversion + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j.Options.Priority == 0 { + j.Options.Priority = ppl.Priority() + } + + err := d.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - return id, nil + return nil } -func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { +func (p *Plugin) PushBatch(j []*structs.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { - pipe := p.pipelines.Get(j[i].Options.Pipeline) + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline)) + } + + ppl := pipe.(*pipeline.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + d, ok := p.consumers[ppl.Name()] if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j[i].Options.Priority == 0 { + j[i].Options.Priority = ppl.Priority() } - _, err := broker.Push(j[i]) + err := d.Push(j[i]) if err != nil { - return nil, errors.E(op, err) + return errors.E(op, err) } } - return utils.AsStringPtr("test"), nil + return nil +} + +func (p *Plugin) Pause(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Pause(ppl.Name()) + } +} + +func (p *Plugin) Resume(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) + } } func (p *Plugin) RPC() interface{} { diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0d4cc099..6718b99a 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -19,45 +19,32 @@ List of the RPC methods: 3. Reset - managed by the Resetter plugin -4. Stop - stop pipeline processing -5. StopAll - stop all pipelines processing -6. Resume - resume pipeline processing -7. ResumeAll - resume stopped pipelines +4. Pause - pauses set of pipelines +5. Resume - resumes set of pipelines -8. Workers - managed by the Informer plugin. -9. Stat - jobs statistic +6. Workers - managed by the Informer plugin. +7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJob().Job, - Payload: j.GetJob().Payload, - Options: &structs.Options{ - Priority: &j.GetJob().Options.Priority, - ID: &j.GetJob().Options.Id, - Pipeline: j.GetJob().Options.Pipeline, - Delay: j.GetJob().Options.Delay, - Attempts: j.GetJob().Options.Attempts, - RetryDelay: j.GetJob().Options.RetryDelay, - Timeout: j.GetJob().Options.Timeout, - }, + + if j.GetJob().GetId() == "" { + return errors.E(op, errors.Str("empty ID field not allowed")) } - id, err := r.p.Push(jb) + err := r.p.Push(r.from(j.GetJob())) if err != nil { return errors.E(op, err) } - resp.Id = *id - return nil } -func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -67,24 +54,10 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJobs()[i].Job, - Payload: j.GetJobs()[i].Payload, - Options: &structs.Options{ - Priority: &j.GetJobs()[i].Options.Priority, - ID: &j.GetJobs()[i].Options.Id, - Pipeline: j.GetJobs()[i].Options.Pipeline, - Delay: j.GetJobs()[i].Options.Delay, - Attempts: j.GetJobs()[i].Options.Attempts, - RetryDelay: j.GetJobs()[i].Options.RetryDelay, - Timeout: j.GetJobs()[i].Options.Timeout, - }, - } - - batch[i] = jb + batch[i] = r.from(j.GetJobs()[i]) } - _, err := r.p.PushBatch(batch) + err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } @@ -92,18 +65,50 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e return nil } -func (r *rpc) Stop(pipeline string, w *string) error { - return nil -} +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } -func (r *rpc) StopAll(_ bool, w *string) error { + r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(pipeline string, w *string) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } + + r.p.Resume(pipelines) return nil } -func (r *rpc) ResumeAll(_ bool, w *string) error { - return nil +// from converts from transport entity to domain +func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { + headers := map[string][]string{} + + for k, v := range j.GetHeaders() { + headers[k] = v.GetValue() + } + + jb := &structs.Job{ + Job: j.GetJob(), + Headers: headers, + Ident: j.GetId(), + Payload: j.GetPayload(), + Options: &structs.Options{ + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), + Attempts: j.GetOptions().GetAttempts(), + RetryDelay: j.GetOptions().GetRetryDelay(), + Timeout: j.GetOptions().GetTimeout(), + }, + } + + return jb } diff --git a/plugins/jobs/structs/general.go b/plugins/jobs/structs/general.go new file mode 100644 index 00000000..ae754286 --- /dev/null +++ b/plugins/jobs/structs/general.go @@ -0,0 +1,19 @@ +package structs + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-value pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go deleted file mode 100644 index 1ef4d2ca..00000000 --- a/plugins/jobs/structs/job.go +++ /dev/null @@ -1,51 +0,0 @@ -package structs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/utils" -) - -// Job carries information about single job. -type Job struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -func (j *Job) ID() *string { - return j.Options.ID -} - -func (j *Job) Priority() *uint64 { - return j.Options.Priority -} - -// Body packs job payload into binary payload. -func (j *Job) Body() []byte { - return utils.AsBytes(j.Payload) -} - -// Context packs job context (job, id) into binary payload. -func (j *Job) Context() []byte { - ctx, _ := json.Marshal( - struct { - ID *string `json:"id"` - Job string `json:"job"` - }{ID: j.Options.ID, Job: j.Job}, - ) - - return ctx -} - -func (j *Job) Ack() { - -} - -func (j *Job) Nack() { - -} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 3e1ada85..d48e2c56 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -6,10 +6,7 @@ import "time" type Options struct { // Priority is job priority, default - 10 // pointer to distinguish 0 as a priority and nil as priority not set - Priority *uint64 `json:"priority"` - - // ID - generated ID for the job - ID *string `json:"id"` + Priority uint64 `json:"priority"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go deleted file mode 100644 index 0aa5b177..00000000 --- a/plugins/jobs/structs/job_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package structs - -import ( - "testing" - - "github.com/spiral/roadrunner/v2/utils" - "github.com/stretchr/testify/assert" -) - -func TestJob_Body(t *testing.T) { - j := &Job{Payload: "hello"} - - assert.Equal(t, []byte("hello"), j.Body()) -} - -func TestJob_Context(t *testing.T) { - j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}} - - assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) -} |