diff options
Diffstat (limited to 'plugins/jobs/oooold/rpc.go')
-rw-r--r-- | plugins/jobs/oooold/rpc.go | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go new file mode 100644 index 00000000..42730a68 --- /dev/null +++ b/plugins/jobs/oooold/rpc.go @@ -0,0 +1,151 @@ +package oooold + +import ( + "fmt" + "github.com/spiral/roadrunner/util" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []*util.State `json:"workers"` +} + +// PipelineList contains list of pipeline stats. +type PipelineList struct { + // Pipelines is list of pipeline stats. + Pipelines []*Stat `json:"pipelines"` +} + +// Push job to the testQueue. +func (rpc *rpcServer) Push(j *Job, id *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *id, err = rpc.svc.Push(j) + return +} + +// Push job to the testQueue. +func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *ok = true + go rpc.svc.Push(j) + + return +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, w *string) error { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *w = "OK" + return rpc.svc.rr.Reset() +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) { + if rpc.svc == nil || rpc.svc.rr == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Workers returns list of pipelines workers and their stats. +func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + w.Workers, err = util.ServerState(rpc.svc.rr) + return err +} + +// Stat returns list of pipelines workers and their stats. +func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *l = PipelineList{} + for _, p := range rpc.svc.cfg.pipelines { + stat, err := rpc.svc.Stat(p) + if err != nil { + return err + } + + l.Pipelines = append(l.Pipelines, stat) + } + + return err +} |