summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
committerValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
commit2c78e93222cc9d3b88456175348e42f7f40c449b (patch)
treebe4fc671db33ceb8700019a5ede900c8d900d7c0 /plugins/jobs/rpc.go
parent207739f7346c98e16087547bc510e1f909671260 (diff)
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from spinning in the for loop and receiving nil Items until the Queue will be filled. Add num_pollers option to the configuration to specify number of pollers from the queue. Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral. Remove map and use sync.Map in the ephemeral broker. Add protobuf schema. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go95
1 files changed, 92 insertions, 3 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index e77cda59..c6bd1645 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,8 +1,12 @@
package jobs
import (
+ "sync"
+
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
type rpc struct {
@@ -10,11 +14,96 @@ type rpc struct {
p *Plugin
}
-func (r *rpc) Push(j *structs.Job, idRet *string) error {
+var jobsPool = &sync.Pool{
+ New: func() interface{} {
+ return &structs.Job{
+ Options: &structs.Options{},
+ }
+ },
+}
+
+func pubJob(j *structs.Job) {
+ // clear
+ j.Job = ""
+ j.Payload = ""
+ j.Options = &structs.Options{}
+ jobsPool.Put(j)
+}
+
+func getJob() *structs.Job {
+ return jobsPool.Get().(*structs.Job)
+}
+
+/*
+List of the RPC methods:
+1. Push - single job push
+2. PushBatch - push job batch
+
+3. Reset - managed by the Resetter plugin
+
+4. Stop - stop pipeline processing
+5. StopAll - stop all pipelines processing
+6. Resume - resume pipeline processing
+7. ResumeAll - resume stopped pipelines
+
+8. Workers - managed by the Informer plugin.
+9. Stat - jobs statistic
+*/
+
+func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ // convert transport entity into domain
+ // how we can do this quickly
+ jb := getJob()
+ defer pubJob(jb)
+
+ jb = &structs.Job{
+ Job: j.GetJob().Job,
+ Payload: j.GetJob().Payload,
+ Options: &structs.Options{
+ Priority: &j.GetJob().Options.Priority,
+ ID: &j.GetJob().Options.Id,
+ Pipeline: j.GetJob().Options.Pipeline,
+ Delay: j.GetJob().Options.Delay,
+ Attempts: j.GetJob().Options.Attempts,
+ RetryDelay: j.GetJob().Options.RetryDelay,
+ Timeout: j.GetJob().Options.Timeout,
+ },
+ }
+ id, err := r.p.Push(jb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ resp.Id = *id
+
+ return nil
+}
+
+func (r *rpc) PushBatch(j *structs.Job, idRet *string) error {
+ const op = errors.Op("jobs_rpc_push")
id, err := r.p.Push(j)
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
- *idRet = id
+
+ *idRet = *id
+ return nil
+}
+
+func (r *rpc) Stop(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) StopAll(_ bool, w *string) error {
+ return nil
+}
+
+func (r *rpc) Resume(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) ResumeAll(_ bool, w *string) error {
return nil
}