From d4c92e48bada7593b6fbec612a742c599de6e736 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 15 Jun 2021 22:12:32 +0300 Subject: - Jobs plugin initial commit Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 plugins/jobs/rpc.go (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..cc1ecd99 --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,151 @@ +package jobs + +import ( + "fmt" + "github.com/spiral/roadrunner/util" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []*util.State `json:"workers"` +} + +// PipelineList contains list of pipeline stats. +type PipelineList struct { + // Pipelines is list of pipeline stats. + Pipelines []*Stat `json:"pipelines"` +} + +// Push job to the testQueue. +func (rpc *rpcServer) Push(j *Job, id *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *id, err = rpc.svc.Push(j) + return +} + +// Push job to the testQueue. +func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *ok = true + go rpc.svc.Push(j) + + return +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, w *string) error { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *w = "OK" + return rpc.svc.rr.Reset() +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) { + if rpc.svc == nil || rpc.svc.rr == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Workers returns list of pipelines workers and their stats. +func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + w.Workers, err = util.ServerState(rpc.svc.rr) + return err +} + +// Stat returns list of pipelines workers and their stats. +func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *l = PipelineList{} + for _, p := range rpc.svc.cfg.pipelines { + stat, err := rpc.svc.Stat(p) + if err != nil { + return err + } + + l.Pipelines = append(l.Pipelines, stat) + } + + return err +} -- cgit v1.2.3 From cee4bc46097506d6e892b6af194751434700621a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 16 Jun 2021 12:56:02 +0300 Subject: - Update jobs sources - Update Arch diagramm Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 151 ---------------------------------------------------- 1 file changed, 151 deletions(-) delete mode 100644 plugins/jobs/rpc.go (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go deleted file mode 100644 index cc1ecd99..00000000 --- a/plugins/jobs/rpc.go +++ /dev/null @@ -1,151 +0,0 @@ -package jobs - -import ( - "fmt" - "github.com/spiral/roadrunner/util" -) - -type rpcServer struct{ svc *Service } - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []*util.State `json:"workers"` -} - -// PipelineList contains list of pipeline stats. -type PipelineList struct { - // Pipelines is list of pipeline stats. - Pipelines []*Stat `json:"pipelines"` -} - -// Push job to the testQueue. -func (rpc *rpcServer) Push(j *Job, id *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *id, err = rpc.svc.Push(j) - return -} - -// Push job to the testQueue. -func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *ok = true - go rpc.svc.Push(j) - - return -} - -// Reset resets underlying RR worker pool and restarts all of it's workers. -func (rpc *rpcServer) Reset(reset bool, w *string) error { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *w = "OK" - return rpc.svc.rr.Reset() -} - -// Destroy job pipelines for a given pipeline. -func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - pipe := rpc.svc.cfg.pipelines.Get(pipeline) - if pipe == nil { - return fmt.Errorf("undefined pipeline `%s`", pipeline) - } - - if err := rpc.svc.Consume(pipe, nil, nil); err != nil { - return err - } - - *w = "OK" - return nil -} - -// Resume job pipelines for a given pipeline. -func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - pipe := rpc.svc.cfg.pipelines.Get(pipeline) - if pipe == nil { - return fmt.Errorf("undefined pipeline `%s`", pipeline) - } - - if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { - return err - } - - *w = "OK" - return nil -} - -// Destroy job pipelines for a given pipeline. -func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) { - if rpc.svc == nil || rpc.svc.rr == nil { - return fmt.Errorf("jobs server is not running") - } - - for _, pipe := range rpc.svc.cfg.pipelines { - if err := rpc.svc.Consume(pipe, nil, nil); err != nil { - return err - } - } - - *w = "OK" - return nil -} - -// Resume job pipelines for a given pipeline. -func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - for _, pipe := range rpc.svc.cfg.pipelines { - if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { - return err - } - } - - *w = "OK" - return nil -} - -// Workers returns list of pipelines workers and their stats. -func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - w.Workers, err = util.ServerState(rpc.svc.rr) - return err -} - -// Stat returns list of pipelines workers and their stats. -func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *l = PipelineList{} - for _, p := range rpc.svc.cfg.pipelines { - stat, err := rpc.svc.Stat(p) - if err != nil { - return err - } - - l.Pipelines = append(l.Pipelines, stat) - } - - return err -} -- cgit v1.2.3 From 41bb9fa5938125217a075c60f1e39dc3a9a27537 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 21 Jun 2021 17:01:39 +0300 Subject: - Rework dispatcher, pipeline, job (not completely) Create a config sample with RR2 support. Progress on root JOBS plugin. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 plugins/jobs/rpc.go (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..5a0bbf4e --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,8 @@ +package jobs + +import "github.com/spiral/roadrunner/v2/plugins/logger" + +type rpc struct { + log logger.Logger +} + -- cgit v1.2.3 From 1a2a1f4735e40675abf6cd9767c99374359ec2bb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 22 Jun 2021 11:44:22 +0300 Subject: - Remove all old code, reformat, fix linters, return GA Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 5a0bbf4e..dbe7f808 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -5,4 +5,3 @@ import "github.com/spiral/roadrunner/v2/plugins/logger" type rpc struct { log logger.Logger } - -- cgit v1.2.3 From 035e432af9a059e9e5187bd03f2e7864ed94c054 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 22 Jun 2021 17:33:55 +0300 Subject: - Folders struct - Initial ephemeral broker commit - Initial RPC Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index dbe7f808..e77cda59 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,7 +1,20 @@ package jobs -import "github.com/spiral/roadrunner/v2/plugins/logger" +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) type rpc struct { log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *structs.Job, idRet *string) error { + id, err := r.p.Push(j) + if err != nil { + panic(err) + } + *idRet = id + return nil } -- cgit v1.2.3 From 2c78e93222cc9d3b88456175348e42f7f40c449b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 6 Jul 2021 17:30:31 +0300 Subject: Rework ephemeral and binary heaps Implemented a sync.Cond for binary heap algo to save processor from spinning in the for loop and receiving nil Items until the Queue will be filled. Add num_pollers option to the configuration to specify number of pollers from the queue. Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral. Remove map and use sync.Map in the ephemeral broker. Add protobuf schema. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 3 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index e77cda59..c6bd1645 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,12 @@ package jobs import ( + "sync" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) type rpc struct { @@ -10,11 +14,96 @@ type rpc struct { p *Plugin } -func (r *rpc) Push(j *structs.Job, idRet *string) error { +var jobsPool = &sync.Pool{ + New: func() interface{} { + return &structs.Job{ + Options: &structs.Options{}, + } + }, +} + +func pubJob(j *structs.Job) { + // clear + j.Job = "" + j.Payload = "" + j.Options = &structs.Options{} + jobsPool.Put(j) +} + +func getJob() *structs.Job { + return jobsPool.Get().(*structs.Job) +} + +/* +List of the RPC methods: +1. Push - single job push +2. PushBatch - push job batch + +3. Reset - managed by the Resetter plugin + +4. Stop - stop pipeline processing +5. StopAll - stop all pipelines processing +6. Resume - resume pipeline processing +7. ResumeAll - resume stopped pipelines + +8. Workers - managed by the Informer plugin. +9. Stat - jobs statistic +*/ + +func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { + const op = errors.Op("jobs_rpc_push") + + // convert transport entity into domain + // how we can do this quickly + jb := getJob() + defer pubJob(jb) + + jb = &structs.Job{ + Job: j.GetJob().Job, + Payload: j.GetJob().Payload, + Options: &structs.Options{ + Priority: &j.GetJob().Options.Priority, + ID: &j.GetJob().Options.Id, + Pipeline: j.GetJob().Options.Pipeline, + Delay: j.GetJob().Options.Delay, + Attempts: j.GetJob().Options.Attempts, + RetryDelay: j.GetJob().Options.RetryDelay, + Timeout: j.GetJob().Options.Timeout, + }, + } + id, err := r.p.Push(jb) + if err != nil { + return errors.E(op, err) + } + + resp.Id = *id + + return nil +} + +func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { + const op = errors.Op("jobs_rpc_push") id, err := r.p.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - *idRet = id + + *idRet = *id + return nil +} + +func (r *rpc) Stop(pipeline string, w *string) error { + return nil +} + +func (r *rpc) StopAll(_ bool, w *string) error { + return nil +} + +func (r *rpc) Resume(pipeline string, w *string) error { + return nil +} + +func (r *rpc) ResumeAll(_ bool, w *string) error { return nil } -- cgit v1.2.3 From 36bf9228e60f59f569e84822e2860980d7ed698d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 6 Jul 2021 23:32:23 +0300 Subject: Update Jobs interface... Use bh.len everywhere in the binary heaps algo instead of direct len check. Add Ack/Nack to the main jobs loop. Add PushBatch method to the jobs rpc layer. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 58 ++++++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 29 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index c6bd1645..0d4cc099 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,6 @@ package jobs import ( - "sync" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,26 +12,6 @@ type rpc struct { p *Plugin } -var jobsPool = &sync.Pool{ - New: func() interface{} { - return &structs.Job{ - Options: &structs.Options{}, - } - }, -} - -func pubJob(j *structs.Job) { - // clear - j.Job = "" - j.Payload = "" - j.Options = &structs.Options{} - jobsPool.Put(j) -} - -func getJob() *structs.Job { - return jobsPool.Get().(*structs.Job) -} - /* List of the RPC methods: 1. Push - single job push @@ -55,10 +33,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { // convert transport entity into domain // how we can do this quickly - jb := getJob() - defer pubJob(jb) - - jb = &structs.Job{ + jb := &structs.Job{ Job: j.GetJob().Job, Payload: j.GetJob().Payload, Options: &structs.Options{ @@ -71,6 +46,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { Timeout: j.GetJob().Options.Timeout, }, } + id, err := r.p.Push(jb) if err != nil { return errors.E(op, err) @@ -81,14 +57,38 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { return nil } -func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { +func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { const op = errors.Op("jobs_rpc_push") - id, err := r.p.Push(j) + + l := len(j.GetJobs()) + + batch := make([]*structs.Job, l) + + for i := 0; i < l; i++ { + // convert transport entity into domain + // how we can do this quickly + jb := &structs.Job{ + Job: j.GetJobs()[i].Job, + Payload: j.GetJobs()[i].Payload, + Options: &structs.Options{ + Priority: &j.GetJobs()[i].Options.Priority, + ID: &j.GetJobs()[i].Options.Id, + Pipeline: j.GetJobs()[i].Options.Pipeline, + Delay: j.GetJobs()[i].Options.Delay, + Attempts: j.GetJobs()[i].Options.Attempts, + RetryDelay: j.GetJobs()[i].Options.RetryDelay, + Timeout: j.GetJobs()[i].Options.Timeout, + }, + } + + batch[i] = jb + } + + _, err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } - *idRet = *id return nil } -- cgit v1.2.3 From 60c229c8506df465586434309af5acd1f84e2406 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 7 Jul 2021 18:33:04 +0300 Subject: Updated ephemeral plugin, PQ and protobuf... Implement core of the root jobs plugin with a proper drivers/pipelines handling mechanism. Add delayed jobs for the ephemeral plugin. Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with a slice of the pipelines. Other small improvements. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 97 ++++++++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 46 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0d4cc099..6718b99a 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -19,45 +19,32 @@ List of the RPC methods: 3. Reset - managed by the Resetter plugin -4. Stop - stop pipeline processing -5. StopAll - stop all pipelines processing -6. Resume - resume pipeline processing -7. ResumeAll - resume stopped pipelines +4. Pause - pauses set of pipelines +5. Resume - resumes set of pipelines -8. Workers - managed by the Informer plugin. -9. Stat - jobs statistic +6. Workers - managed by the Informer plugin. +7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJob().Job, - Payload: j.GetJob().Payload, - Options: &structs.Options{ - Priority: &j.GetJob().Options.Priority, - ID: &j.GetJob().Options.Id, - Pipeline: j.GetJob().Options.Pipeline, - Delay: j.GetJob().Options.Delay, - Attempts: j.GetJob().Options.Attempts, - RetryDelay: j.GetJob().Options.RetryDelay, - Timeout: j.GetJob().Options.Timeout, - }, + + if j.GetJob().GetId() == "" { + return errors.E(op, errors.Str("empty ID field not allowed")) } - id, err := r.p.Push(jb) + err := r.p.Push(r.from(j.GetJob())) if err != nil { return errors.E(op, err) } - resp.Id = *id - return nil } -func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -67,24 +54,10 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJobs()[i].Job, - Payload: j.GetJobs()[i].Payload, - Options: &structs.Options{ - Priority: &j.GetJobs()[i].Options.Priority, - ID: &j.GetJobs()[i].Options.Id, - Pipeline: j.GetJobs()[i].Options.Pipeline, - Delay: j.GetJobs()[i].Options.Delay, - Attempts: j.GetJobs()[i].Options.Attempts, - RetryDelay: j.GetJobs()[i].Options.RetryDelay, - Timeout: j.GetJobs()[i].Options.Timeout, - }, - } - - batch[i] = jb + batch[i] = r.from(j.GetJobs()[i]) } - _, err := r.p.PushBatch(batch) + err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } @@ -92,18 +65,50 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e return nil } -func (r *rpc) Stop(pipeline string, w *string) error { - return nil -} +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } -func (r *rpc) StopAll(_ bool, w *string) error { + r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(pipeline string, w *string) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } + + r.p.Resume(pipelines) return nil } -func (r *rpc) ResumeAll(_ bool, w *string) error { - return nil +// from converts from transport entity to domain +func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { + headers := map[string][]string{} + + for k, v := range j.GetHeaders() { + headers[k] = v.GetValue() + } + + jb := &structs.Job{ + Job: j.GetJob(), + Headers: headers, + Ident: j.GetId(), + Payload: j.GetPayload(), + Options: &structs.Options{ + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), + Attempts: j.GetOptions().GetAttempts(), + RetryDelay: j.GetOptions().GetRetryDelay(), + Timeout: j.GetOptions().GetTimeout(), + }, + } + + return jb } -- cgit v1.2.3 From aa1437d24ac215bec7fe053b06fa4773c9b1b1ad Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 12 Jul 2021 12:45:53 +0300 Subject: Update JOBS interface, remove List() method, implemented on the root RPC level. AMQP consumer replace sync.Map with atomic.Value, because we associate only 1 pipeline with a driver. So, we can store pipeline in the atomic.Value. Implement events handler, add job events. Use job events to push information to the logger. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 6718b99a..0bb94fa4 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -26,7 +26,7 @@ List of the RPC methods: 7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain @@ -44,7 +44,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error return nil } -func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -65,7 +65,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyRespo return nil } -func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -76,7 +76,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyRespo return nil } -func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -87,6 +87,11 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResp return nil } +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { + resp.Pipelines = r.p.List() + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { headers := map[string][]string{} -- cgit v1.2.3 From d099e47ab28dd044d34e18347a4c714b8af3d612 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 11:35:12 +0300 Subject: SQS driver. Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0bb94fa4..a2bd9c6d 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -2,7 +2,7 @@ package jobs import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/logger" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) @@ -49,7 +49,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err l := len(j.GetJobs()) - batch := make([]*structs.Job, l) + batch := make([]*job.Job, l) for i := 0; i < l; i++ { // convert transport entity into domain @@ -93,19 +93,19 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { } // from converts from transport entity to domain -func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { +func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} for k, v := range j.GetHeaders() { headers[k] = v.GetValue() } - jb := &structs.Job{ + jb := &job.Job{ Job: j.GetJob(), Headers: headers, Ident: j.GetId(), Payload: j.GetPayload(), - Options: &structs.Options{ + Options: &job.Options{ Priority: j.GetOptions().GetPriority(), Pipeline: j.GetOptions().GetPipeline(), Delay: j.GetOptions().GetDelay(), -- cgit v1.2.3 From 9c51360f9119a4114bdcc21c8e61f0908a3c876d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 18 Jul 2021 11:32:44 +0300 Subject: Started beanstalk driver. Add new Queue impl (not finished yet). Fix bugs in the AMQP, update proto-api Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index a2bd9c6d..4333c587 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -3,6 +3,7 @@ package jobs import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) @@ -65,7 +66,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -76,7 +77,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -87,11 +88,32 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) er return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { resp.Pipelines = r.p.List() return nil } +// Declare pipeline used to dynamically declare any type of the pipeline +// Mandatory fields: +// 1. Driver +// 2. Pipeline name +// 3. Options related to the particular pipeline +func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("rcp_declare_pipeline") + pipe := &pipeline.Pipeline{} + + for i := range req.GetPipeline() { + (*pipe)[i] = req.GetPipeline()[i] + } + + err := r.p.Declare(pipe) + if err != nil { + return errors.E(op, err) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} -- cgit v1.2.3 From 02fc3664f4ad97e03c8f3a641e7322362f78721c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 19 Jul 2021 09:49:16 +0300 Subject: Worker watcher interface update. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 4333c587..10158e74 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -15,7 +15,7 @@ type rpc struct { /* List of the RPC methods: -1. Push - single job push +1. Release - single job push 2. PushBatch - push job batch 3. Reset - managed by the Resetter plugin -- cgit v1.2.3 From e9713a1d08a93e2be70c889c600ed89f54822b54 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 18:05:31 +0300 Subject: Fix AMQP bugs, add more amqp tests Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 10158e74..0d15fb0f 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -66,29 +66,23 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Pause(req.GetPipelines()[i]) } - r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Resume(req.GetPipelines()[i]) } - r.p.Resume(pipelines) return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { resp.Pipelines = r.p.List() return nil } @@ -114,6 +108,24 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error return nil } +func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { + const op = errors.Op("rcp_declare_pipeline") + + var destroyed []string + for i := 0; i < len(req.GetPipelines()); i++ { + err := r.p.Destroy(req.GetPipelines()[i]) + if err != nil { + return errors.E(op, err) + } + destroyed = append(destroyed, req.GetPipelines()[i]) + } + + // return destroyed pipelines + resp.Pipelines = destroyed + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} -- cgit v1.2.3 From 2ceebd687fd17b6029ef3df0e979c39bb39abc7f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 18:10:33 +0300 Subject: Linters Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0d15fb0f..aeba499b 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -111,7 +111,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { const op = errors.Op("rcp_declare_pipeline") - var destroyed []string + var destroyed []string //nolint:prealloc for i := 0; i < len(req.GetPipelines()); i++ { err := r.p.Destroy(req.GetPipelines()[i]) if err != nil { -- cgit v1.2.3 From e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 13:44:41 +0300 Subject: Remove attempts from the proto, and general jobs options Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index aeba499b..717ce33b 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -143,7 +143,6 @@ func (r *rpc) from(j *jobsv1beta.Job) *job.Job { Priority: j.GetOptions().GetPriority(), Pipeline: j.GetOptions().GetPipeline(), Delay: j.GetOptions().GetDelay(), - Attempts: j.GetOptions().GetAttempts(), RetryDelay: j.GetOptions().GetRetryDelay(), Timeout: j.GetOptions().GetTimeout(), }, -- cgit v1.2.3 From 4169e8374f581ba2213f8cd1833cc6b9b84438e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 12 Aug 2021 11:28:45 +0300 Subject: Fix various bugs in the SQS. Implement SQS tests for the jobs_ok.php/jobs_err.php workers. Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 14 -------------- 1 file changed, 14 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 717ce33b..af1e12c0 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -13,20 +13,6 @@ type rpc struct { p *Plugin } -/* -List of the RPC methods: -1. Release - single job push -2. PushBatch - push job batch - -3. Reset - managed by the Resetter plugin - -4. Pause - pauses set of pipelines -5. Resume - resumes set of pipelines - -6. Workers - managed by the Informer plugin. -7. Stat - jobs statistic -*/ - func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { const op = errors.Op("jobs_rpc_push") -- cgit v1.2.3 From ecbfc5c5265a9895f4e371ce4388f64df8714e63 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 12 Aug 2021 13:25:36 +0300 Subject: Remove unneeded options, complete tests for the ephemeral, update proto Signed-off-by: Valery Piashchynski --- plugins/jobs/rpc.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'plugins/jobs/rpc.go') diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index af1e12c0..7f9859fb 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -126,11 +126,9 @@ func (r *rpc) from(j *jobsv1beta.Job) *job.Job { Ident: j.GetId(), Payload: j.GetPayload(), Options: &job.Options{ - Priority: j.GetOptions().GetPriority(), - Pipeline: j.GetOptions().GetPipeline(), - Delay: j.GetOptions().GetDelay(), - RetryDelay: j.GetOptions().GetRetryDelay(), - Timeout: j.GetOptions().GetTimeout(), + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), }, } -- cgit v1.2.3