diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 /proto | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'proto')
-rw-r--r-- | proto/jobs/v1beta/jobs.pb.go | 476 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 36 | ||||
-rw-r--r-- | proto/kv/v1beta/kv.pb.go | 9 | ||||
-rw-r--r-- | proto/websockets/v1beta/websockets.pb.go | 9 |
4 files changed, 509 insertions, 21 deletions
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go new file mode 100644 index 00000000..9d8427be --- /dev/null +++ b/proto/jobs/v1beta/jobs.pb.go @@ -0,0 +1,476 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: jobs.proto + +package jobsv1beta + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// single job request +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetJob() *Job { + if x != nil { + return x.Job + } + return nil +} + +// batch jobs request +type BatchRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"` +} + +func (x *BatchRequest) Reset() { + *x = BatchRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchRequest) ProtoMessage() {} + +func (x *BatchRequest) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchRequest.ProtoReflect.Descriptor instead. +func (*BatchRequest) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchRequest) GetJobs() []*Job { + if x != nil { + return x.Jobs + } + return nil +} + +// RPC response +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Options *Options `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. +func (*Job) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{3} +} + +func (x *Job) GetJob() string { + if x != nil { + return x.Job + } + return "" +} + +func (x *Job) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + +func (x *Job) GetOptions() *Options { + if x != nil { + return x.Options + } + return nil +} + +type Options struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Priority uint64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Pipeline string `protobuf:"bytes,3,opt,name=pipeline,proto3" json:"pipeline,omitempty"` + Delay uint64 `protobuf:"varint,4,opt,name=delay,proto3" json:"delay,omitempty"` + Attempts uint64 `protobuf:"varint,5,opt,name=attempts,proto3" json:"attempts,omitempty"` + RetryDelay uint64 `protobuf:"varint,6,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"` + Timeout uint64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Options) Reset() { + *x = Options{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Options) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Options) ProtoMessage() {} + +func (x *Options) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Options.ProtoReflect.Descriptor instead. +func (*Options) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{4} +} + +func (x *Options) GetPriority() uint64 { + if x != nil { + return x.Priority + } + return 0 +} + +func (x *Options) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Options) GetPipeline() string { + if x != nil { + return x.Pipeline + } + return "" +} + +func (x *Options) GetDelay() uint64 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *Options) GetAttempts() uint64 { + if x != nil { + return x.Attempts + } + return 0 +} + +func (x *Options) GetRetryDelay() uint64 { + if x != nil { + return x.RetryDelay + } + return 0 +} + +func (x *Options) GetTimeout() uint64 { + if x != nil { + return x.Timeout + } + return 0 +} + +var File_jobs_proto protoreflect.FileDescriptor + +var file_jobs_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f, + 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x2d, 0x0a, 0x07, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x34, 0x0a, 0x0c, 0x42, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, + 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x1a, + 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x61, 0x0a, 0x03, 0x4a, 0x6f, + 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6a, 0x6f, 0x62, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x2e, 0x0a, + 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, + 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xbe, 0x01, + 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x70, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, + 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, + 0x61, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, + 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0f, + 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_jobs_proto_rawDescOnce sync.Once + file_jobs_proto_rawDescData = file_jobs_proto_rawDesc +) + +func file_jobs_proto_rawDescGZIP() []byte { + file_jobs_proto_rawDescOnce.Do(func() { + file_jobs_proto_rawDescData = protoimpl.X.CompressGZIP(file_jobs_proto_rawDescData) + }) + return file_jobs_proto_rawDescData +} + +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_jobs_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: jobs.v1beta.Request + (*BatchRequest)(nil), // 1: jobs.v1beta.BatchRequest + (*Response)(nil), // 2: jobs.v1beta.Response + (*Job)(nil), // 3: jobs.v1beta.Job + (*Options)(nil), // 4: jobs.v1beta.Options +} +var file_jobs_proto_depIdxs = []int32{ + 3, // 0: jobs.v1beta.Request.job:type_name -> jobs.v1beta.Job + 3, // 1: jobs.v1beta.BatchRequest.jobs:type_name -> jobs.v1beta.Job + 4, // 2: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_jobs_proto_init() } +func file_jobs_proto_init() { + if File_jobs_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Options); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_jobs_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_jobs_proto_goTypes, + DependencyIndexes: file_jobs_proto_depIdxs, + MessageInfos: file_jobs_proto_msgTypes, + }.Build() + File_jobs_proto = out.File + file_jobs_proto_rawDesc = nil + file_jobs_proto_goTypes = nil + file_jobs_proto_depIdxs = nil +} diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 46434fa8..13fd5595 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -1,22 +1,36 @@ syntax = "proto3"; -package kv.v1beta; +package jobs.v1beta; option go_package = "./;jobsv1beta"; +// single job request message Request { - // could be an enum in the future - string storage = 1; - repeated Item items = 2; + Job job = 1; } -message Item { - string key = 1; - bytes value = 2; - // RFC 3339 - string timeout = 3; +// batch jobs request +message BatchRequest { + repeated Job jobs = 1; } -// KV response for the KV RPC methods +// RPC response message Response { - repeated Item items = 1; + string id = 1; } + +message Job { + string job = 1; + string payload = 2; + Options options = 3; +} + +message Options { + uint64 priority = 1; + string id = 2; + string pipeline = 3; + uint64 delay = 4; + uint64 attempts = 5; + uint64 retry_delay = 6; + uint64 timeout = 7; +} + diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 75578bff..19621735 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: kv.proto package kvv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index a2868118..188dcf08 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -1,17 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 +// protoc-gen-go v1.27.1 +// protoc v3.17.3 // source: websockets.proto package websocketsv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( |