diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
commit | 60c229c8506df465586434309af5acd1f84e2406 (patch) | |
tree | 18fdf380b7e032415d656e84bcc3c7a057f194a8 /plugins/jobs/brokers | |
parent | 127186a72d4b8d30f6ada72ade661d8713490728 (diff) |
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines
handling mechanism.
Add delayed jobs for the ephemeral plugin.
Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with
a slice of the pipelines.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers')
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go (renamed from plugins/jobs/brokers/ephemeral/broker.go) | 57 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 125 |
5 files changed, 155 insertions, 30 deletions
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/config.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/consumer.go index 6c7108f6..e31e3b25 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -2,13 +2,12 @@ package ephemeral import ( "sync" + "time" - "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" - "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { @@ -25,39 +24,47 @@ func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (*string, error) { +func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered if b, ok := j.queues.Load(job.Options.Pipeline); ok { if !b.(bool) { - return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } - if job.Options.Priority == nil { - job.Options.Priority = utils.AsUint64Ptr(10) + + // handle timeouts + if job.Options.Timeout > 0 { + go func(jj *structs.Job) { + time.Sleep(jj.Options.TimeoutDuration()) + + // send the item after timeout expired + j.pq.Insert(From(job)) + }(job) + + return nil } - job.Options.ID = utils.AsStringPtr(uuid.NewString()) - j.pq.Insert(job) + j.pq.Insert(From(job)) - return job.Options.ID, nil + return nil } - return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } -func (j *JobBroker) Register(pipeline string) error { +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues.Load(pipeline); ok { + if _, ok := j.queues.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues.Store(pipeline, true) + j.queues.Store(pipeline.Name(), true) return nil } -func (j *JobBroker) Stop(pipeline string) { +func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -66,13 +73,6 @@ func (j *JobBroker) Stop(pipeline string) { } } -func (j *JobBroker) StopAll() { - j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, false) - return true - }) -} - func (j *JobBroker) Resume(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == false { @@ -82,17 +82,14 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) ResumeAll() { +func (j *JobBroker) List() []*pipeline.Pipeline { + out := make([]*pipeline.Pipeline, 0, 2) + j.queues.Range(func(key, value interface{}) bool { - j.queues.Store(key, true) + pipe := key.(*pipeline.Pipeline) + out = append(out, pipe) return true }) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return out } diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go new file mode 100644 index 00000000..e2caa53a --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -0,0 +1,125 @@ +package ephemeral + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" +) + +func From(job *structs.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: conv(*job.Options), + } +} + +func conv(jo structs.Options) Options { + return Options(jo) +} + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority uint64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay uint64 `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts uint64 `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay uint64 `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout uint64 `json:"timeout,omitempty"` +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt uint64) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() uint64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (j *Item) Ack() { + // noop for the in-memory +} + +func (j *Item) Nack() { + // noop for the in-memory +} |