summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-12 12:45:53 +0300
committerValery Piashchynski <[email protected]>2021-07-12 12:45:53 +0300
commitaa1437d24ac215bec7fe053b06fa4773c9b1b1ad (patch)
tree7a6868867877f34ac5e2d490bfb589b3dce02917 /plugins/jobs/rpc.go
parent87971c4d310fe3d353197fc96b9b6f9106f01e57 (diff)
Update JOBS interface, remove List() method, implemented on the root RPC
level. AMQP consumer replace sync.Map with atomic.Value, because we associate only 1 pipeline with a driver. So, we can store pipeline in the atomic.Value. Implement events handler, add job events. Use job events to push information to the logger. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go13
1 files changed, 9 insertions, 4 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 6718b99a..0bb94fa4 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -26,7 +26,7 @@ List of the RPC methods:
7. Stat - jobs statistic
*/
-func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error {
+func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
const op = errors.Op("jobs_rpc_push")
// convert transport entity into domain
@@ -44,7 +44,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error
return nil
}
-func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error {
+func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
const op = errors.Op("jobs_rpc_push")
l := len(j.GetJobs())
@@ -65,7 +65,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyRespo
return nil
}
-func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error {
+func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
pipelines := make([]string, len(req.GetPipelines()))
for i := 0; i < len(pipelines); i++ {
@@ -76,7 +76,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyRespo
return nil
}
-func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error {
+func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
pipelines := make([]string, len(req.GetPipelines()))
for i := 0; i < len(pipelines); i++ {
@@ -87,6 +87,11 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResp
return nil
}
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error {
+ resp.Pipelines = r.p.List()
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *structs.Job {
headers := map[string][]string{}