diff options
author | Valery Piashchynski <[email protected]> | 2021-07-12 12:45:53 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-12 12:45:53 +0300 |
commit | aa1437d24ac215bec7fe053b06fa4773c9b1b1ad (patch) | |
tree | 7a6868867877f34ac5e2d490bfb589b3dce02917 | |
parent | 87971c4d310fe3d353197fc96b9b6f9106f01e57 (diff) |
Update JOBS interface, remove List() method, implemented on the root RPC
level.
AMQP consumer replace sync.Map with atomic.Value, because we associate
only 1 pipeline with a driver. So, we can store pipeline in the
atomic.Value.
Implement events handler, add job events. Use job events to push
information to the logger.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | common/jobs/interface.go | 7 | ||||
-rw-r--r-- | pkg/events/jobs_events.go | 35 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 67 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 9 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit_init.go | 10 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 22 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 53 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 9 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 69 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 13 | ||||
-rw-r--r-- | plugins/server/plugin.go | 11 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.pb.go | 198 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 7 |
13 files changed, 342 insertions, 168 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go index 426d5606..e94e97b1 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -1,6 +1,7 @@ package jobs import ( + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -12,14 +13,12 @@ type Consumer interface { Register(pipeline *pipeline.Pipeline) error Run(pipeline *pipeline.Pipeline) error Stop() error - // List of the pipelines - List() []string Pause(pipeline string) Resume(pipeline string) } type Constructor interface { - JobsConstruct(configKey string, queue priorityqueue.Queue) (Consumer, error) - FromPipeline(pipe *pipeline.Pipeline, queue priorityqueue.Queue) (Consumer, error) + JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (Consumer, error) + FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (Consumer, error) } diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go index ed07c7da..9a7116ff 100644 --- a/pkg/events/jobs_events.go +++ b/pkg/events/jobs_events.go @@ -20,23 +20,22 @@ const ( // EventJobError thrown on all job related errors. See JobError as context. EventJobError - // EventPipeConsume when pipeline pipelines has been requested. - EventPipeConsume + // EventPipeRun when pipeline pipelines has been requested. + EventPipeRun + + EventInitialized // EventPipeActive when pipeline has started. EventPipeActive - // EventPipeStop when pipeline has begun stopping. - EventPipeStop - // EventPipeStopped when pipeline has been stopped. EventPipeStopped // EventPipeError when pipeline specific error happen. EventPipeError - // EventBrokerReady thrown when broken is ready to accept/serve tasks. - EventBrokerReady + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady ) type J int64 @@ -53,18 +52,18 @@ func (ev J) String() string { return "EventJobOK" case EventJobError: return "EventJobError" - case EventPipeConsume: - return "EventPipeConsume" + case EventPipeRun: + return "EventPipeRun" + case EventInitialized: + return "EventInitialized" case EventPipeActive: return "EventPipeActive" - case EventPipeStop: - return "EventPipeStop" case EventPipeStopped: return "EventPipeStopped" case EventPipeError: return "EventPipeError" - case EventBrokerReady: - return "EventBrokerReady" + case EventDriverReady: + return "EventDriverReady" } return UnknownEventType } @@ -75,8 +74,14 @@ type JobEvent struct { // String is job id. ID string - // Job is failed job. - Job interface{} // this is *jobs.Job, but interface used to avoid package import + // Pipeline name + Pipeline string + + // Associated driver name (amqp, ephemeral, etc) + Driver string + + // Error for the jobs/pipes errors + Error error // event timings Start time.Time diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index a7916f7e..481e102a 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -3,10 +3,12 @@ package amqp import ( "fmt" "sync" + "sync/atomic" "time" "github.com/google/uuid" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -48,8 +50,9 @@ type JobsConsumer struct { sync.RWMutex log logger.Logger pq priorityqueue.Queue + eh events.Handler - pipelines sync.Map + pipeline atomic.Value // amqp connection conn *amqp.Connection @@ -71,7 +74,7 @@ type JobsConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -79,6 +82,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, jb := &JobsConsumer{ log: log, pq: pq, + eh: e, consumeID: uuid.NewString(), stopCh: make(chan struct{}), // TODO to config @@ -146,13 +150,14 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ log: log, + eh: e, pq: pq, consumeID: uuid.NewString(), stopCh: make(chan struct{}), @@ -210,8 +215,11 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered - if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != job.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } // lock needed here to protect redial concurrent operation @@ -303,20 +311,16 @@ func (j *JobsConsumer) Push(job *structs.Job) error { } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { - const op = errors.Op("rabbitmq_register") - if _, ok := j.pipelines.Load(pipeline.Name()); ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) - } - - j.pipelines.Store(pipeline.Name(), struct{}{}) - + j.pipeline.Store(pipeline) return nil } -func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { +func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") - if _, ok := j.pipelines.Load(pipeline.Name()); !ok { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } // protect connection (redial) @@ -354,22 +358,10 @@ func (j *JobsConsumer) Run(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobsConsumer) List() []string { - out := make([]string, 0, 2) - - j.pipelines.Range(func(key, value interface{}) bool { - pipe := key.(string) - out = append(out, pipe) - return true - }) - - return out -} - -func (j *JobsConsumer) Pause(pipeline string) { - if _, ok := j.pipelines.Load(pipeline); !ok { - j.log.Error("no such pipeline", "requested pause on", pipeline) - return +func (j *JobsConsumer) Pause(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) } // protect connection (redial) @@ -386,10 +378,10 @@ func (j *JobsConsumer) Pause(pipeline string) { } } -func (j *JobsConsumer) Resume(pipeline string) { - if _, ok := j.pipelines.Load(pipeline); !ok { - j.log.Error("no such pipeline", "requested pause on", pipeline) - return +func (j *JobsConsumer) Resume(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) } // protect connection (redial) @@ -430,11 +422,6 @@ func (j *JobsConsumer) Resume(pipeline string) { func (j *JobsConsumer) Stop() error { j.stopCh <- struct{}{} - j.pipelines.Range(func(key, _ interface{}) bool { - j.pipelines.Delete(key) - return true - }) - return nil } diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index ca972c5b..624f4405 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -2,6 +2,7 @@ package amqp import ( "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -29,11 +30,11 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} -func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewAMQPConsumer(configKey, p.log, p.cfg, pq) +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) } // FromPipeline constructs AMQP driver from pipeline -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, pq) +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, e, pq) } diff --git a/plugins/jobs/brokers/amqp/rabbit_init.go b/plugins/jobs/brokers/amqp/rabbit_init.go index cb9f2dc4..e3e5f8da 100644 --- a/plugins/jobs/brokers/amqp/rabbit_init.go +++ b/plugins/jobs/brokers/amqp/rabbit_init.go @@ -1,11 +1,14 @@ package amqp import ( + "time" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" ) func (j *JobsConsumer) initRabbitMQ() error { - const op = errors.Op("rabbit_initmq") + const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. @@ -53,5 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() error { return errors.E(op, err) } + j.eh.Push(events.JobEvent{ + Event: events.EventInitialized, + Driver: "amqp", + Start: time.Now(), + }) return channel.Close() } diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 277e75b7..571ee548 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -2,9 +2,12 @@ package amqp import ( "fmt" + "time" "github.com/cenkalti/backoff/v4" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/streadway/amqp" ) @@ -22,6 +25,17 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit j.Lock() + t := time.Now() + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeError, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Error: err, + Start: time.Now(), + Elapsed: 0, + }) + j.log.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) @@ -85,6 +99,14 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit return } + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + j.Unlock() case <-j.stopCh: diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index b51af322..559cb2e9 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -5,6 +5,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -23,19 +24,21 @@ type Config struct { type JobBroker struct { cfg *Config log logger.Logger - queues sync.Map + eh events.Handler + pipeline sync.Map pq priorityqueue.Queue localQueue chan *Item stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ log: log, pq: pq, + eh: eh, stopCh: make(chan struct{}, 1), } @@ -57,10 +60,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, pq priorityqueue.Queue) (*JobBroker, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ log: log, pq: pq, + eh: eh, stopCh: make(chan struct{}, 1), } @@ -79,7 +83,7 @@ func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if b, ok := j.pipeline.Load(job.Options.Pipeline); ok { if !b.(bool) { return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } @@ -119,37 +123,51 @@ func (j *JobBroker) consume() { func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues.Load(pipeline.Name()); ok { + if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues.Store(pipeline.Name(), true) + j.pipeline.Store(pipeline.Name(), true) return nil } func (j *JobBroker) Pause(pipeline string) { - if q, ok := j.queues.Load(pipeline); ok { + if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off - j.queues.Store(pipeline, false) + j.pipeline.Store(pipeline, false) } } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) } func (j *JobBroker) Resume(pipeline string) { - if q, ok := j.queues.Load(pipeline); ok { + if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned off - j.queues.Store(pipeline, true) + j.pipeline.Store(pipeline, true) } } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) } func (j *JobBroker) List() []string { out := make([]string, 0, 2) - j.queues.Range(func(key, value interface{}) bool { + j.pipeline.Range(func(key, value interface{}) bool { pipe := key.(string) out = append(out, pipe) return true @@ -164,13 +182,22 @@ func (j *JobBroker) Run(_ *pipeline.Pipeline) error { } func (j *JobBroker) Stop() error { - j.queues.Range(func(key, _ interface{}) bool { - j.queues.Delete(key) + var pipe string + j.pipeline.Range(func(key, _ interface{}) bool { + pipe = key.(string) + j.pipeline.Delete(key) return true }) // return from the consumer j.stopCh <- struct{}{} + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + return nil } diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index bfe2d6ac..28495abb 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -2,6 +2,7 @@ package ephemeral import ( "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -30,11 +31,11 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} // JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, pq) +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, e, pq) } // FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, pq) +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipeline, p.log, e, pq) } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index aab23f1c..86289aba 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -70,7 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.cfg.InitDefaults() p.server = server + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectJobsEvents) + p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) @@ -117,6 +120,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { + t := time.Now() // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) @@ -132,7 +136,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.queue) + initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false @@ -155,9 +159,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit errCh <- errors.E(op, err) return false } + + p.events.Push(events.JobEvent{ + Event: events.EventPipeRun, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: t.Sub(t), + }) + return true } + + return true } + p.events.Push(events.JobEvent{ + Event: events.EventDriverReady, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: t.Sub(t), + }) return true }) @@ -279,7 +301,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents) if err != nil { return errors.E(op, err) } @@ -404,7 +426,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // we need here to initialize these drivers for the pipelines if c, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.queue) + initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } @@ -451,9 +473,50 @@ func (p *Plugin) Destroy(pp string) error { return d.Stop() } +func (p *Plugin) List() []string { + out := make([]string, 0, 10) + + p.pipelines.Range(func(key, _ interface{}) bool { + // we can safely convert value here as we know that we store keys as strings + out = append(out, key.(string)) + return true + }) + + return out +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, p: p, } } + +func (p *Plugin) collectJobsEvents(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventJobOK: + p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPushOK: + p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPushError: + p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventJobError: + p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeRun: + p.log.Info("pipeline started", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeActive: + p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeStopped: + p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeError: + p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventDriverReady: + p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventInitialized: + p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC()) + } + } +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 6718b99a..0bb94fa4 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -26,7 +26,7 @@ List of the RPC methods: 7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain @@ -44,7 +44,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error return nil } -func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -65,7 +65,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyRespo return nil } -func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -76,7 +76,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyRespo return nil } -func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -87,6 +87,11 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResp return nil } +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { + resp.Pipelines = r.p.List() + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { headers := map[string][]string{} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 038d83d4..1694cdf1 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -266,14 +266,3 @@ func (server *Plugin) collectWorkerEvents(event interface{}) { } } } - -func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused - if jev, ok := event.(events.JobEvent); ok { - switch jev.Event { - case events.EventJobStart: - server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed) - case events.EventJobOK: - server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed) - } - } -} diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index 81d35bf8..38711806 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -7,11 +7,10 @@ package jobsv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( @@ -165,15 +164,16 @@ func (x *MaintenanceRequest) GetPipelines() []string { return nil } +// some endpoints receives nothing // all endpoints returns nothing -type EmptyResponse struct { +type Empty struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } -func (x *EmptyResponse) Reset() { - *x = EmptyResponse{} +func (x *Empty) Reset() { + *x = Empty{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -181,13 +181,13 @@ func (x *EmptyResponse) Reset() { } } -func (x *EmptyResponse) String() string { +func (x *Empty) String() string { return protoimpl.X.MessageStringOf(x) } -func (*EmptyResponse) ProtoMessage() {} +func (*Empty) ProtoMessage() {} -func (x *EmptyResponse) ProtoReflect() protoreflect.Message { +func (x *Empty) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -199,11 +199,58 @@ func (x *EmptyResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use EmptyResponse.ProtoReflect.Descriptor instead. -func (*EmptyResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{3} } +type List struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"` +} + +func (x *List) Reset() { + *x = List{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *List) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*List) ProtoMessage() {} + +func (x *List) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use List.ProtoReflect.Descriptor instead. +func (*List) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{4} +} + +func (x *List) GetPipelines() []string { + if x != nil { + return x.Pipelines + } + return nil +} + type Job struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -219,7 +266,7 @@ type Job struct { func (x *Job) Reset() { *x = Job{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[4] + mi := &file_jobs_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -232,7 +279,7 @@ func (x *Job) String() string { func (*Job) ProtoMessage() {} func (x *Job) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[4] + mi := &file_jobs_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -245,7 +292,7 @@ func (x *Job) ProtoReflect() protoreflect.Message { // Deprecated: Use Job.ProtoReflect.Descriptor instead. func (*Job) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{4} + return file_jobs_proto_rawDescGZIP(), []int{5} } func (x *Job) GetJob() string { @@ -294,7 +341,7 @@ type HeaderValue struct { func (x *HeaderValue) Reset() { *x = HeaderValue{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[5] + mi := &file_jobs_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -307,7 +354,7 @@ func (x *HeaderValue) String() string { func (*HeaderValue) ProtoMessage() {} func (x *HeaderValue) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[5] + mi := &file_jobs_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -320,7 +367,7 @@ func (x *HeaderValue) ProtoReflect() protoreflect.Message { // Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead. func (*HeaderValue) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{5} + return file_jobs_proto_rawDescGZIP(), []int{6} } func (x *HeaderValue) GetValue() []string { @@ -346,7 +393,7 @@ type Options struct { func (x *Options) Reset() { *x = Options{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[6] + mi := &file_jobs_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -359,7 +406,7 @@ func (x *Options) String() string { func (*Options) ProtoMessage() {} func (x *Options) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[6] + mi := &file_jobs_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -372,7 +419,7 @@ func (x *Options) ProtoReflect() protoreflect.Message { // Deprecated: Use Options.ProtoReflect.Descriptor instead. func (*Options) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{6} + return file_jobs_proto_rawDescGZIP(), []int{7} } func (x *Options) GetPriority() uint64 { @@ -431,39 +478,41 @@ var file_jobs_proto_rawDesc = []byte{ 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x32, 0x0a, 0x12, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x80, 0x02, 0x0a, 0x03, - 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, - 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, - 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, - 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, - 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, - 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, - 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, - 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, - 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, - 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, - 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, - 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, - 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, - 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, - 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, - 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, - 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x24, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, + 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, + 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, + 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, + 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, 0x0a, 0x0b, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, + 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, + 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, + 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, + 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, + 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, + 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, + 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -478,23 +527,24 @@ func file_jobs_proto_rawDescGZIP() []byte { return file_jobs_proto_rawDescData } -var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_jobs_proto_goTypes = []interface{}{ (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest (*MaintenanceRequest)(nil), // 2: jobs.v1beta.MaintenanceRequest - (*EmptyResponse)(nil), // 3: jobs.v1beta.EmptyResponse - (*Job)(nil), // 4: jobs.v1beta.Job - (*HeaderValue)(nil), // 5: jobs.v1beta.HeaderValue - (*Options)(nil), // 6: jobs.v1beta.Options - nil, // 7: jobs.v1beta.Job.HeadersEntry + (*Empty)(nil), // 3: jobs.v1beta.Empty + (*List)(nil), // 4: jobs.v1beta.List + (*Job)(nil), // 5: jobs.v1beta.Job + (*HeaderValue)(nil), // 6: jobs.v1beta.HeaderValue + (*Options)(nil), // 7: jobs.v1beta.Options + nil, // 8: jobs.v1beta.Job.HeadersEntry } var file_jobs_proto_depIdxs = []int32{ - 4, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job - 4, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job - 7, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry - 6, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options - 5, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue + 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job + 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job + 8, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry + 7, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 6, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue 5, // [5:5] is the sub-list for method output_type 5, // [5:5] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name @@ -545,7 +595,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*EmptyResponse); i { + switch v := v.(*Empty); i { case 0: return &v.state case 1: @@ -557,7 +607,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Job); i { + switch v := v.(*List); i { case 0: return &v.state case 1: @@ -569,7 +619,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HeaderValue); i { + switch v := v.(*Job); i { case 0: return &v.state case 1: @@ -581,6 +631,18 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HeaderValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Options); i { case 0: return &v.state @@ -599,7 +661,7 @@ func file_jobs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_jobs_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index eb920fb8..9ff967d4 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -18,8 +18,13 @@ message MaintenanceRequest { repeated string pipelines = 1; } +// some endpoints receives nothing // all endpoints returns nothing -message EmptyResponse {} +message Empty {} + +message List { + repeated string pipelines = 1; +} message Job { string job = 1; |