diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
commit | 60c229c8506df465586434309af5acd1f84e2406 (patch) | |
tree | 18fdf380b7e032415d656e84bcc3c7a057f194a8 | |
parent | 127186a72d4b8d30f6ada72ade661d8713490728 (diff) |
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines
handling mechanism.
Add delayed jobs for the ephemeral plugin.
Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with
a slice of the pipelines.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | common/jobs/interface.go | 12 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 6 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 13 | ||||
-rw-r--r-- | pkg/priorityqueue/interface.go | 18 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go (renamed from plugins/jobs/brokers/ephemeral/broker.go) | 57 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 125 | ||||
-rw-r--r-- | plugins/jobs/config.go | 11 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 148 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go | 69 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 161 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 97 | ||||
-rw-r--r-- | plugins/jobs/structs/general.go | 19 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go | 51 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go | 5 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go | 20 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.pb.go | 339 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 34 |
20 files changed, 663 insertions, 525 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go index 786eca0e..deb90cde 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -8,16 +8,12 @@ import ( // Consumer todo naming type Consumer interface { - Push(job *structs.Job) (*string, error) - Consume(job *pipeline.Pipeline) + Push(job *structs.Job) error + Register(pipeline *pipeline.Pipeline) error + List() []*pipeline.Pipeline - Stop(pipeline string) - StopAll() + Pause(pipeline string) Resume(pipeline string) - ResumeAll() - - Register(pipe string) error - Stat() } type Constructor interface { diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index f3d8f95b..47fdf5e5 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -31,7 +31,7 @@ func (bh *BinHeap) fixUp() { for k > 0 { cur, par := (bh.items)[k], (bh.items)[p] - if *cur.Priority() < *par.Priority() { + if cur.Priority() < par.Priority() { bh.swap(k, p) k = p p = (k - 1) >> 1 @@ -55,10 +55,10 @@ func (bh *BinHeap) fixDown(curr, end int) { idxToSwap := cOneIdx // oh my, so unsafe - if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() { + if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { idxToSwap = cTwoIdx } - if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { + if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { bh.swap(uint64(curr), uint64(idxToSwap)) curr = idxToSwap cOneIdx = (curr << 1) + 1 diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 4c234dc5..b02017b6 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/require" ) @@ -23,16 +22,16 @@ func (t Test) Body() []byte { return nil } -func (t Test) Context() []byte { - return nil +func (t Test) Context() ([]byte, error) { + return nil, nil } -func (t Test) ID() *string { - return utils.AsStringPtr("none") +func (t Test) ID() string { + return "none" } -func (t Test) Priority() *uint64 { - return utils.AsUint64Ptr(uint64(t)) +func (t Test) Priority() uint64 { + return uint64(t) } func TestBinHeap_Init(t *testing.T) { diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go index 7ac2e449..100aa667 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -5,11 +5,23 @@ type Queue interface { GetMax() Item } +// Item represents binary heap item type Item interface { - ID() *string - Priority() *uint64 + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() uint64 + + // Body is the Item payload Body() []byte - Context() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing Ack() + + // Nack - discard the Item Nack() } diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/config.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/consumer.go index 6c7108f6..e31e3b25 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -2,13 +2,12 @@ package ephemeral import ( "sync" + "time" - "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" - "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { @@ -25,39 +24,47 @@ func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (*string, error) { +func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered if b, ok := j.queues.Load(job.Options.Pipeline); ok { if !b.(bool) { - return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } - if job.Options.Priority == nil { - job.Options.Priority = utils.AsUint64Ptr(10) + + // handle timeouts + if job.Options.Timeout > 0 { + go func(jj *structs.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // send the item after timeout expired + j.pq.Insert(From(job)) + }(job) + + return nil } - job.Options.ID = utils.AsStringPtr(uuid.NewString()) - j.pq.Insert(job) + j.pq.Insert(From(job)) - return job.Options.ID, nil + return nil } - return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } -func (j *JobBroker) Register(pipeline string) error { +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues.Load(pipeline); ok { + if _, ok := j.queues.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues.Store(pipeline, true) + j.queues.Store(pipeline.Name(), true) return nil } -func (j *JobBroker) Stop(pipeline string) { +func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -66,13 +73,6 @@ func (j *JobBroker) Stop(pipeline string) { } } -func (j *JobBroker) StopAll() { - j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, false) - return true - }) -} - func (j *JobBroker) Resume(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == false { @@ -82,17 +82,14 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) ResumeAll() { +func (j *JobBroker) List() []*pipeline.Pipeline { + out := make([]*pipeline.Pipeline, 0, 2) + j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, true) + pipe := key.(*pipeline.Pipeline) + out = append(out, pipe) return true }) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return out } diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go new file mode 100644 index 00000000..e2caa53a --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -0,0 +1,125 @@ +package ephemeral + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" +) + +func From(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: conv(*job.Options), + } +} + +func conv(jo structs.Options) Options { + return Options(jo) +} + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority uint64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay uint64 `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts uint64 `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay uint64 `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout uint64 `json:"timeout,omitempty"` +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt uint64) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() uint64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (j *Item) Ack() { + // noop for the in-memory +} + +func (j *Item) Nack() { + // noop for the in-memory +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 07e2ef38..aa2da2dc 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -7,12 +7,18 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" ) +const ( + // name used to set pipeline name + pipelineName string = "name" +) + // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { // NumPollers configures number of priority queue pollers // Should be no more than 255 // Default - num logical cores NumPollers uint8 `mapstructure:"num_pollers"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` @@ -32,5 +38,10 @@ func (c *Config) InitDefaults() { c.NumPollers = uint8(runtime.NumCPU()) } + for k := range c.Pipelines { + // set the pipeline name + c.Pipelines[k].With(pipelineName, k) + } + c.Pool.InitDefaults() } diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go index 987f6826..e87204f9 100644 --- a/plugins/jobs/pipeline/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -1,106 +1,27 @@ package pipeline -import ( - "time" - - "github.com/spiral/errors" -) - -// Pipelines is list of Pipeline. - -type Pipelines []*Pipeline - -func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { - const op = errors.Op("pipeline_init") - out := make(Pipelines, 0) - - for name, pipe := range pipes { - if pipe.Driver() == "" { - return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) - } - - p := pipe.With("name", name) - out = append(out, &p) - } - - return out, nil -} - -// Reverse returns pipelines in reversed order. -func (ps Pipelines) Reverse() Pipelines { - out := make(Pipelines, len(ps)) - - for i, p := range ps { - out[len(ps)-i-1] = p - } - - return out -} - -// Broker return pipelines associated with specific broker. -func (ps Pipelines) Broker(broker string) Pipelines { - out := make(Pipelines, 0) - - for _, p := range ps { - if p.Driver() != broker { - continue - } - - out = append(out, p) - } - - return out -} - -// Names returns only pipelines with specified names. -func (ps Pipelines) Names(only ...string) Pipelines { - out := make(Pipelines, 0) - - for _, name := range only { - for _, p := range ps { - if p.Name() == name { - out = append(out, p) - } - } - } - - return out -} - -// Get returns pipeline by it'svc name. -func (ps Pipelines) Get(name string) *Pipeline { - // possibly optimize - for _, p := range ps { - if p.Name() == name { - return p - } - } - - return nil -} - // Pipeline defines pipeline options. type Pipeline map[string]interface{} -// With pipeline value. Immutable. -func (p Pipeline) With(name string, value interface{}) Pipeline { - out := make(map[string]interface{}) - for k, v := range p { - out[k] = v - } - out[name] = value +const ( + priority string = "priority" + driver string = "driver" + name string = "name" +) - return out +// With pipeline value +func (p *Pipeline) With(name string, value interface{}) { + (*p)[name] = value } // Name returns pipeline name. func (p Pipeline) Name() string { - return p.String("name", "") + return p.String(name, "") } // Driver associated with the pipeline. func (p Pipeline) Driver() string { - return p.String("driver", "") + return p.String(driver, "") } // Has checks if value presented in pipeline. @@ -112,32 +33,6 @@ func (p Pipeline) Has(name string) bool { return false } -// Map must return nested map value or empty config. -func (p Pipeline) Map(name string) Pipeline { - out := make(map[string]interface{}) - - if value, ok := p[name]; ok { - if m, ok := value.(map[string]interface{}); ok { - for k, v := range m { - out[k] = v - } - } - } - - return out -} - -// Bool must return option value as string or return default value. -func (p Pipeline) Bool(name string, d bool) bool { - if value, ok := p[name]; ok { - if b, ok := value.(bool); ok { - return b - } - } - - return d -} - // String must return option value as string or return default value. func (p Pipeline) String(name string, d string) string { if value, ok := p[name]; ok { @@ -149,24 +44,13 @@ func (p Pipeline) String(name string, d string) string { return d } -// Integer must return option value as string or return default value. -func (p Pipeline) Integer(name string, d int) int { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return str +// Priority returns default pipeline priority +func (p Pipeline) Priority() uint64 { + if value, ok := p[priority]; ok { + if v, ok := value.(uint64); ok { + return v } } - return d -} - -// Duration must return option value as time.Duration (seconds) or return default value. -func (p Pipeline) Duration(name string, d time.Duration) time.Duration { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return time.Second * time.Duration(str) - } - } - - return d + return 10 } diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go index 77acf96e..4482c70d 100644 --- a/plugins/jobs/pipeline/pipeline_test.go +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -2,32 +2,10 @@ package pipeline import ( "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestPipeline_Map(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} - - assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) - assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) -} - -func TestPipeline_MapString(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} - - assert.Equal(t, "default", pipe.Map("options").String("alias", "")) - assert.Equal(t, "", pipe.Map("other").String("alias", "")) -} - -func TestPipeline_Bool(t *testing.T) { - pipe := Pipeline{"value": true} - - assert.Equal(t, true, pipe.Bool("value", false)) - assert.Equal(t, true, pipe.Bool("other", true)) -} - func TestPipeline_String(t *testing.T) { pipe := Pipeline{"value": "value"} @@ -35,56 +13,9 @@ func TestPipeline_String(t *testing.T) { assert.Equal(t, "value", pipe.String("other", "value")) } -func TestPipeline_Integer(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, 1, pipe.Integer("value", 0)) - assert.Equal(t, 1, pipe.Integer("other", 1)) -} - -func TestPipeline_Duration(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, time.Second, pipe.Duration("value", 0)) - assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) -} - func TestPipeline_Has(t *testing.T) { pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} assert.Equal(t, true, pipe.Has("options")) assert.Equal(t, false, pipe.Has("other")) } - -func TestPipeline_FilterBroker(t *testing.T) { - pipes := Pipelines{ - &Pipeline{"name": "first", "driver": "a"}, - &Pipeline{"name": "second", "driver": "a"}, - &Pipeline{"name": "third", "driver": "b"}, - &Pipeline{"name": "forth", "driver": "b"}, - } - - filtered := pipes.Names("first", "third") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[0].Driver()) - assert.Equal(t, "b", filtered[1].Driver()) - - filtered = pipes.Names("first", "third").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[1].Driver()) - assert.Equal(t, "b", filtered[0].Driver()) - - filtered = pipes.Broker("a") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[0].Name()) - assert.Equal(t, "second", filtered[1].Name()) - - filtered = pipes.Broker("a").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[1].Name()) - assert.Equal(t, "second", filtered[0].Name()) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c3f766b9..d603dce6 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" @@ -16,13 +17,14 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" ) const ( // RrJobs env variable RrJobs string = "rr_jobs" PluginName string = "jobs" + + pipelines string = "pipelines" ) type Plugin struct { @@ -42,8 +44,8 @@ type Plugin struct { // priority queue implementation queue priorityqueue.Queue - // parent config for broken options. - pipelines pipeline.Pipelines + // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline + pipelines sync.Map } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,9 +67,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines - p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) - if err != nil { - return errors.E(op, err) + for i := range p.cfg.Pipelines { + p.pipelines.Store(i, p.cfg.Pipelines[i]) } // initialize priority queue @@ -81,28 +82,42 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.jobConstructors { - jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) - if err != nil { - errCh <- err - return errCh - } - - p.consumers[name] = jb - } - // register initial pipelines - for i := 0; i < len(p.pipelines); i++ { - pipe := p.pipelines[i] + p.pipelines.Range(func(key, value interface{}) bool { + // pipeline name (ie test-local, sqs-aws, etc) + name := key.(string) + + // pipeline associated with the name + pipe := value.(*pipeline.Pipeline) + // driver for the pipeline (ie amqp, ephemeral, etc) + dr := pipe.Driver() + + // jobConstructors contains constructors for the drivers + // we need here to initialize these drivers for the pipelines + if c, ok := p.jobConstructors[dr]; ok { + // config key for the particular sub-driver jobs.pipelines.test-local + configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) + + // init the driver + initializedDriver, err := c.JobsConstruct(configKey, p.queue) + if err != nil { + errCh <- errors.E(op, err) + return false + } - if jb, ok := p.consumers[pipe.Driver()]; ok { - err := jb.Register(pipe.Name()) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers[name] = initializedDriver + + // register pipeline for the initialized driver + err = initializedDriver.Register(pipe) if err != nil { errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) - return errCh + return false } } - } + + return true + }) var err error p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) @@ -119,12 +134,18 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() + ctx, err := job.Context() + if err != nil { + job.Nack() + p.log.Error("job marshal context", "error", err) + } + exec := payload.Payload{ - Context: job.Context(), + Context: ctx, Body: job.Body(), } - _, err := p.workersPool.Exec(exec) + _, err = p.workersPool.Exec(exec) if err != nil { job.Nack() p.log.Error("job execute", "error", err) @@ -160,41 +181,105 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (*string, error) { +func (p *Plugin) Push(j *structs.Job) error { const op = errors.Op("jobs_plugin_push") - pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline)) } - id, err := broker.Push(j) + // type conversion + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j.Options.Priority == 0 { + j.Options.Priority = ppl.Priority() + } + + err := d.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - return id, nil + return nil } -func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { +func (p *Plugin) PushBatch(j []*structs.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { - pipe := p.pipelines.Get(j[i].Options.Pipeline) + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline)) + } + + ppl := pipe.(*pipeline.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + d, ok := p.consumers[ppl.Name()] if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j[i].Options.Priority == 0 { + j[i].Options.Priority = ppl.Priority() } - _, err := broker.Push(j[i]) + err := d.Push(j[i]) if err != nil { - return nil, errors.E(op, err) + return errors.E(op, err) } } - return utils.AsStringPtr("test"), nil + return nil +} + +func (p *Plugin) Pause(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Pause(ppl.Name()) + } +} + +func (p *Plugin) Resume(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) + } } func (p *Plugin) RPC() interface{} { diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0d4cc099..6718b99a 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -19,45 +19,32 @@ List of the RPC methods: 3. Reset - managed by the Resetter plugin -4. Stop - stop pipeline processing -5. StopAll - stop all pipelines processing -6. Resume - resume pipeline processing -7. ResumeAll - resume stopped pipelines +4. Pause - pauses set of pipelines +5. Resume - resumes set of pipelines -8. Workers - managed by the Informer plugin. -9. Stat - jobs statistic +6. Workers - managed by the Informer plugin. +7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJob().Job, - Payload: j.GetJob().Payload, - Options: &structs.Options{ - Priority: &j.GetJob().Options.Priority, - ID: &j.GetJob().Options.Id, - Pipeline: j.GetJob().Options.Pipeline, - Delay: j.GetJob().Options.Delay, - Attempts: j.GetJob().Options.Attempts, - RetryDelay: j.GetJob().Options.RetryDelay, - Timeout: j.GetJob().Options.Timeout, - }, + + if j.GetJob().GetId() == "" { + return errors.E(op, errors.Str("empty ID field not allowed")) } - id, err := r.p.Push(jb) + err := r.p.Push(r.from(j.GetJob())) if err != nil { return errors.E(op, err) } - resp.Id = *id - return nil } -func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -67,24 +54,10 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJobs()[i].Job, - Payload: j.GetJobs()[i].Payload, - Options: &structs.Options{ - Priority: &j.GetJobs()[i].Options.Priority, - ID: &j.GetJobs()[i].Options.Id, - Pipeline: j.GetJobs()[i].Options.Pipeline, - Delay: j.GetJobs()[i].Options.Delay, - Attempts: j.GetJobs()[i].Options.Attempts, - RetryDelay: j.GetJobs()[i].Options.RetryDelay, - Timeout: j.GetJobs()[i].Options.Timeout, - }, - } - - batch[i] = jb + batch[i] = r.from(j.GetJobs()[i]) } - _, err := r.p.PushBatch(batch) + err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } @@ -92,18 +65,50 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e return nil } -func (r *rpc) Stop(pipeline string, w *string) error { - return nil -} +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } -func (r *rpc) StopAll(_ bool, w *string) error { + r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(pipeline string, w *string) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } + + r.p.Resume(pipelines) return nil } -func (r *rpc) ResumeAll(_ bool, w *string) error { - return nil +// from converts from transport entity to domain +func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { + headers := map[string][]string{} + + for k, v := range j.GetHeaders() { + headers[k] = v.GetValue() + } + + jb := &structs.Job{ + Job: j.GetJob(), + Headers: headers, + Ident: j.GetId(), + Payload: j.GetPayload(), + Options: &structs.Options{ + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), + Attempts: j.GetOptions().GetAttempts(), + RetryDelay: j.GetOptions().GetRetryDelay(), + Timeout: j.GetOptions().GetTimeout(), + }, + } + + return jb } diff --git a/plugins/jobs/structs/general.go b/plugins/jobs/structs/general.go new file mode 100644 index 00000000..ae754286 --- /dev/null +++ b/plugins/jobs/structs/general.go @@ -0,0 +1,19 @@ +package structs + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-value pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go deleted file mode 100644 index 1ef4d2ca..00000000 --- a/plugins/jobs/structs/job.go +++ /dev/null @@ -1,51 +0,0 @@ -package structs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/utils" -) - -// Job carries information about single job. -type Job struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -func (j *Job) ID() *string { - return j.Options.ID -} - -func (j *Job) Priority() *uint64 { - return j.Options.Priority -} - -// Body packs job payload into binary payload. -func (j *Job) Body() []byte { - return utils.AsBytes(j.Payload) -} - -// Context packs job context (job, id) into binary payload. -func (j *Job) Context() []byte { - ctx, _ := json.Marshal( - struct { - ID *string `json:"id"` - Job string `json:"job"` - }{ID: j.Options.ID, Job: j.Job}, - ) - - return ctx -} - -func (j *Job) Ack() { - -} - -func (j *Job) Nack() { - -} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 3e1ada85..d48e2c56 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -6,10 +6,7 @@ import "time" type Options struct { // Priority is job priority, default - 10 // pointer to distinguish 0 as a priority and nil as priority not set - Priority *uint64 `json:"priority"` - - // ID - generated ID for the job - ID *string `json:"id"` + Priority uint64 `json:"priority"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go deleted file mode 100644 index 0aa5b177..00000000 --- a/plugins/jobs/structs/job_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package structs - -import ( - "testing" - - "github.com/spiral/roadrunner/v2/utils" - "github.com/stretchr/testify/assert" -) - -func TestJob_Body(t *testing.T) { - j := &Job{Payload: "hello"} - - assert.Equal(t, []byte("hello"), j.Body()) -} - -func TestJob_Context(t *testing.T) { - j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}} - - assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) -} diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index 9d8427be..2b474de9 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -21,7 +21,7 @@ const ( ) // single job request -type Request struct { +type PushRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -29,8 +29,8 @@ type Request struct { Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` } -func (x *Request) Reset() { - *x = Request{} +func (x *PushRequest) Reset() { + *x = PushRequest{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -38,13 +38,13 @@ func (x *Request) Reset() { } } -func (x *Request) String() string { +func (x *PushRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Request) ProtoMessage() {} +func (*PushRequest) ProtoMessage() {} -func (x *Request) ProtoReflect() protoreflect.Message { +func (x *PushRequest) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -56,12 +56,12 @@ func (x *Request) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Request.ProtoReflect.Descriptor instead. -func (*Request) Descriptor() ([]byte, []int) { +// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead. +func (*PushRequest) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{0} } -func (x *Request) GetJob() *Job { +func (x *PushRequest) GetJob() *Job { if x != nil { return x.Job } @@ -69,7 +69,7 @@ func (x *Request) GetJob() *Job { } // batch jobs request -type BatchRequest struct { +type PushBatchRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -77,8 +77,8 @@ type BatchRequest struct { Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"` } -func (x *BatchRequest) Reset() { - *x = BatchRequest{} +func (x *PushBatchRequest) Reset() { + *x = PushBatchRequest{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -86,13 +86,13 @@ func (x *BatchRequest) Reset() { } } -func (x *BatchRequest) String() string { +func (x *PushBatchRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*BatchRequest) ProtoMessage() {} +func (*PushBatchRequest) ProtoMessage() {} -func (x *BatchRequest) ProtoReflect() protoreflect.Message { +func (x *PushBatchRequest) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -104,29 +104,29 @@ func (x *BatchRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use BatchRequest.ProtoReflect.Descriptor instead. -func (*BatchRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use PushBatchRequest.ProtoReflect.Descriptor instead. +func (*PushBatchRequest) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{1} } -func (x *BatchRequest) GetJobs() []*Job { +func (x *PushBatchRequest) GetJobs() []*Job { if x != nil { return x.Jobs } return nil } -// RPC response -type Response struct { +// request to pause/resume +type MaintenanceRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"` } -func (x *Response) Reset() { - *x = Response{} +func (x *MaintenanceRequest) Reset() { + *x = MaintenanceRequest{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -134,13 +134,13 @@ func (x *Response) Reset() { } } -func (x *Response) String() string { +func (x *MaintenanceRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Response) ProtoMessage() {} +func (*MaintenanceRequest) ProtoMessage() {} -func (x *Response) ProtoReflect() protoreflect.Message { +func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -152,16 +152,55 @@ func (x *Response) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Response.ProtoReflect.Descriptor instead. -func (*Response) Descriptor() ([]byte, []int) { +// Deprecated: Use MaintenanceRequest.ProtoReflect.Descriptor instead. +func (*MaintenanceRequest) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{2} } -func (x *Response) GetId() string { +func (x *MaintenanceRequest) GetPipelines() []string { if x != nil { - return x.Id + return x.Pipelines + } + return nil +} + +// all endpoints returns nothing +type EmptyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *EmptyResponse) Reset() { + *x = EmptyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return "" +} + +func (x *EmptyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmptyResponse) ProtoMessage() {} + +func (x *EmptyResponse) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmptyResponse.ProtoReflect.Descriptor instead. +func (*EmptyResponse) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{3} } type Job struct { @@ -169,15 +208,17 @@ type Job struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` - Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` - Options *Options `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Payload string `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + Headers map[string]*HeaderValue `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Options *Options `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` } func (x *Job) Reset() { *x = Job{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[3] + mi := &file_jobs_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -190,7 +231,7 @@ func (x *Job) String() string { func (*Job) ProtoMessage() {} func (x *Job) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[3] + mi := &file_jobs_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -203,7 +244,7 @@ func (x *Job) ProtoReflect() protoreflect.Message { // Deprecated: Use Job.ProtoReflect.Descriptor instead. func (*Job) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{3} + return file_jobs_proto_rawDescGZIP(), []int{4} } func (x *Job) GetJob() string { @@ -213,6 +254,13 @@ func (x *Job) GetJob() string { return "" } +func (x *Job) GetId() string { + if x != nil { + return x.Id + } + return "" +} + func (x *Job) GetPayload() string { if x != nil { return x.Payload @@ -220,6 +268,13 @@ func (x *Job) GetPayload() string { return "" } +func (x *Job) GetHeaders() map[string]*HeaderValue { + if x != nil { + return x.Headers + } + return nil +} + func (x *Job) GetOptions() *Options { if x != nil { return x.Options @@ -227,24 +282,70 @@ func (x *Job) GetOptions() *Options { return nil } +type HeaderValue struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"` +} + +func (x *HeaderValue) Reset() { + *x = HeaderValue{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeaderValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderValue) ProtoMessage() {} + +func (x *HeaderValue) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead. +func (*HeaderValue) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{5} +} + +func (x *HeaderValue) GetValue() []string { + if x != nil { + return x.Value + } + return nil +} + type Options struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` - Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` - Pipeline string `protobuf:"bytes,3,opt,name=pipeline,proto3" json:"pipeline,omitempty"` - Delay uint64 `protobuf:"varint,4,opt,name=delay,proto3" json:"delay,omitempty"` - Attempts uint64 `protobuf:"varint,5,opt,name=attempts,proto3" json:"attempts,omitempty"` - RetryDelay uint64 `protobuf:"varint,6,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"` - Timeout uint64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"` + Pipeline string `protobuf:"bytes,2,opt,name=pipeline,proto3" json:"pipeline,omitempty"` + Delay uint64 `protobuf:"varint,3,opt,name=delay,proto3" json:"delay,omitempty"` + Attempts uint64 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"` + RetryDelay uint64 `protobuf:"varint,5,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"` + Timeout uint64 `protobuf:"varint,6,opt,name=timeout,proto3" json:"timeout,omitempty"` } func (x *Options) Reset() { *x = Options{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[4] + mi := &file_jobs_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -257,7 +358,7 @@ func (x *Options) String() string { func (*Options) ProtoMessage() {} func (x *Options) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[4] + mi := &file_jobs_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -270,7 +371,7 @@ func (x *Options) ProtoReflect() protoreflect.Message { // Deprecated: Use Options.ProtoReflect.Descriptor instead. func (*Options) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{4} + return file_jobs_proto_rawDescGZIP(), []int{6} } func (x *Options) GetPriority() uint64 { @@ -280,13 +381,6 @@ func (x *Options) GetPriority() uint64 { return 0 } -func (x *Options) GetId() string { - if x != nil { - return x.Id - } - return "" -} - func (x *Options) GetPipeline() string { if x != nil { return x.Pipeline @@ -326,35 +420,49 @@ var File_jobs_proto protoreflect.FileDescriptor var file_jobs_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f, - 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, - 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x34, 0x0a, 0x0c, 0x42, 0x61, 0x74, 0x63, - 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, - 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x1a, - 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x61, 0x0a, 0x03, 0x4a, 0x6f, - 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6a, 0x6f, 0x62, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a, - 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, - 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xbe, 0x01, - 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, - 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, - 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, - 0x70, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, - 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, - 0x61, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, - 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, - 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, - 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x31, 0x0a, 0x0b, 0x50, 0x75, 0x73, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, + 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x38, 0x0a, 0x10, + 0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, + 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, + 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x32, 0x0a, 0x12, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65, + 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, + 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x80, 0x02, 0x0a, 0x03, + 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, + 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, + 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, + 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, + 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, + 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, + 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, + 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, + 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, + 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -369,23 +477,28 @@ func file_jobs_proto_rawDescGZIP() []byte { return file_jobs_proto_rawDescData } -var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_jobs_proto_goTypes = []interface{}{ - (*Request)(nil), // 0: jobs.v1beta.Request - (*BatchRequest)(nil), // 1: jobs.v1beta.BatchRequest - (*Response)(nil), // 2: jobs.v1beta.Response - (*Job)(nil), // 3: jobs.v1beta.Job - (*Options)(nil), // 4: jobs.v1beta.Options + (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest + (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest + (*MaintenanceRequest)(nil), // 2: jobs.v1beta.MaintenanceRequest + (*EmptyResponse)(nil), // 3: jobs.v1beta.EmptyResponse + (*Job)(nil), // 4: jobs.v1beta.Job + (*HeaderValue)(nil), // 5: jobs.v1beta.HeaderValue + (*Options)(nil), // 6: jobs.v1beta.Options + nil, // 7: jobs.v1beta.Job.HeadersEntry } var file_jobs_proto_depIdxs = []int32{ - 3, // 0: jobs.v1beta.Request.job:type_name -> jobs.v1beta.Job - 3, // 1: jobs.v1beta.BatchRequest.jobs:type_name -> jobs.v1beta.Job - 4, // 2: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 4, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job + 4, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job + 7, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry + 6, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 5, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_jobs_proto_init() } @@ -395,7 +508,7 @@ func file_jobs_proto_init() { } if !protoimpl.UnsafeEnabled { file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Request); i { + switch v := v.(*PushRequest); i { case 0: return &v.state case 1: @@ -407,7 +520,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*BatchRequest); i { + switch v := v.(*PushBatchRequest); i { case 0: return &v.state case 1: @@ -419,7 +532,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Response); i { + switch v := v.(*MaintenanceRequest); i { case 0: return &v.state case 1: @@ -431,7 +544,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Job); i { + switch v := v.(*EmptyResponse); i { case 0: return &v.state case 1: @@ -443,6 +556,30 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HeaderValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Options); i { case 0: return &v.state @@ -461,7 +598,7 @@ func file_jobs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_jobs_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 8, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 13fd5595..eb920fb8 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -4,33 +4,41 @@ package jobs.v1beta; option go_package = "./;jobsv1beta"; // single job request -message Request { +message PushRequest { Job job = 1; } // batch jobs request -message BatchRequest { +message PushBatchRequest { repeated Job jobs = 1; } -// RPC response -message Response { - string id = 1; +// request to pause/resume +message MaintenanceRequest { + repeated string pipelines = 1; } +// all endpoints returns nothing +message EmptyResponse {} + message Job { string job = 1; - string payload = 2; - Options options = 3; + string id = 2; + string payload = 3; + map<string, HeaderValue> headers = 5; + Options options = 4; +} + +message HeaderValue { + repeated string value = 1; } message Options { uint64 priority = 1; - string id = 2; - string pipeline = 3; - uint64 delay = 4; - uint64 attempts = 5; - uint64 retry_delay = 6; - uint64 timeout = 7; + string pipeline = 2; + uint64 delay = 3; + uint64 attempts = 4; + uint64 retry_delay = 5; + uint64 timeout = 6; } |