summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
committerValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
commitd4c92e48bada7593b6fbec612a742c599de6e736 (patch)
tree53b6fb81987953b71a77ae094e579a0a7daa407c /plugins/jobs/rpc.go
parent9dc98d43b0c0de3e1e1bd8fdc97c122c7c7c594f (diff)
- Jobs plugin initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go151
1 files changed, 151 insertions, 0 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..cc1ecd99
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,151 @@
+package jobs
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/util"
+)
+
+type rpcServer struct{ svc *Service }
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []*util.State `json:"workers"`
+}
+
+// PipelineList contains list of pipeline stats.
+type PipelineList struct {
+ // Pipelines is list of pipeline stats.
+ Pipelines []*Stat `json:"pipelines"`
+}
+
+// Push job to the testQueue.
+func (rpc *rpcServer) Push(j *Job, id *string) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ *id, err = rpc.svc.Push(j)
+ return
+}
+
+// Push job to the testQueue.
+func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ *ok = true
+ go rpc.svc.Push(j)
+
+ return
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, w *string) error {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ *w = "OK"
+ return rpc.svc.rr.Reset()
+}
+
+// Destroy job pipelines for a given pipeline.
+func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ pipe := rpc.svc.cfg.pipelines.Get(pipeline)
+ if pipe == nil {
+ return fmt.Errorf("undefined pipeline `%s`", pipeline)
+ }
+
+ if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
+ return err
+ }
+
+ *w = "OK"
+ return nil
+}
+
+// Resume job pipelines for a given pipeline.
+func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ pipe := rpc.svc.cfg.pipelines.Get(pipeline)
+ if pipe == nil {
+ return fmt.Errorf("undefined pipeline `%s`", pipeline)
+ }
+
+ if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
+ return err
+ }
+
+ *w = "OK"
+ return nil
+}
+
+// Destroy job pipelines for a given pipeline.
+func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) {
+ if rpc.svc == nil || rpc.svc.rr == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ for _, pipe := range rpc.svc.cfg.pipelines {
+ if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
+ return err
+ }
+ }
+
+ *w = "OK"
+ return nil
+}
+
+// Resume job pipelines for a given pipeline.
+func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ for _, pipe := range rpc.svc.cfg.pipelines {
+ if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
+ return err
+ }
+ }
+
+ *w = "OK"
+ return nil
+}
+
+// Workers returns list of pipelines workers and their stats.
+func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ w.Workers, err = util.ServerState(rpc.svc.rr)
+ return err
+}
+
+// Stat returns list of pipelines workers and their stats.
+func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) {
+ if rpc.svc == nil {
+ return fmt.Errorf("jobs server is not running")
+ }
+
+ *l = PipelineList{}
+ for _, p := range rpc.svc.cfg.pipelines {
+ stat, err := rpc.svc.Stat(p)
+ if err != nil {
+ return err
+ }
+
+ l.Pipelines = append(l.Pipelines, stat)
+ }
+
+ return err
+}