diff options
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go (renamed from pkg/worker_watcher/container/vec.go) | 8 | ||||
-rw-r--r-- | pkg/worker_watcher/container/queue/queue.go | 103 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 59 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/plugin.go | 29 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 28 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.pb.go | 275 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 17 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-test.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 5 | ||||
-rw-r--r-- | tests/psr-worker-bench.php | 75 |
14 files changed, 375 insertions, 232 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/channel/vec.go index 24b5fa6d..eafbfb07 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -1,4 +1,4 @@ -package container +package channel import ( "context" @@ -22,11 +22,13 @@ func NewVector(initialNumOfWorkers uint64) *Vec { return vec } -func (v *Vec) Enqueue(w worker.BaseProcess) { +func (v *Vec) Push(w worker.BaseProcess) { v.workers <- w } -func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { +func (v *Vec) Remove(_ int64) {} + +func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { /* if *addr == old { *addr = new diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go new file mode 100644 index 00000000..a792d7c1 --- /dev/null +++ b/pkg/worker_watcher/container/queue/queue.go @@ -0,0 +1,103 @@ +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +const ( + initialSize = 1 + maxInitialSize = 8 + maxInternalSliceSize = 10 +) + +type Node struct { + w []worker.BaseProcess + // LL + n *Node +} + +type Queue struct { + mu sync.Mutex + + head *Node + tail *Node + + recvCh chan worker.BaseProcess + curr uint64 + len uint64 + + sliceSize uint64 +} + +func NewQueue() *Queue { + q := &Queue{ + mu: sync.Mutex{}, + // w/o buffering + recvCh: make(chan worker.BaseProcess), + head: nil, + tail: nil, + curr: 0, + len: 0, + sliceSize: 0, + } + + return q +} + +func (q *Queue) Push(w worker.BaseProcess) { + q.mu.Lock() + + if q.head == nil { + h := newNode(initialSize) + q.head = h + q.tail = h + q.sliceSize = maxInitialSize + } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { + n := newNode(maxInternalSliceSize) + q.tail.n = n + q.tail = n + q.sliceSize = maxInternalSliceSize + } + + q.tail.w = append(q.tail.w, w) + + atomic.AddUint64(&q.len, 1) + + q.mu.Unlock() +} + +func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { + q.mu.Lock() + + if q.head == nil { + return nil, nil + } + + w := q.head.w[q.curr] + q.head.w[q.curr] = nil + atomic.AddUint64(&q.len, ^uint64(0)) + atomic.AddUint64(&q.curr, 1) + + if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { + n := q.head.n + q.head.n = nil + q.head = n + q.curr = 0 + } + + q.mu.Unlock() + + return w, nil +} + +func (q *Queue) Remove(_ int64) {} + +func (q *Queue) Destroy() {} + +func newNode(capacity int) *Node { + return &Node{w: make([]worker.BaseProcess, 0, capacity)} +} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b2d61d48..6e343fff 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -8,45 +8,51 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" ) // Vector interface represents vector container type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Push used to put worker to the vector + Push(worker.BaseProcess) + // Pop used to get worker from the vector + Pop(ctx context.Context) (worker.BaseProcess, error) + // Remove worker with provided pid + Remove(pid int64) // TODO replace // Destroy used to stop releasing the workers Destroy() } +type workerWatcher struct { + sync.RWMutex + container Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + + workers []worker.BaseProcess + + allocator worker.Allocator + events events.Handler +} + // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ - container: container.NewVector(numWorkers), + container: channel.NewVector(numWorkers), numWorkers: numWorkers, - workers: make([]worker.BaseProcess, 0, numWorkers), - allocator: allocator, - events: events, + + workers: make([]worker.BaseProcess, 0, numWorkers), + + allocator: allocator, + events: events, } return ww } -type workerWatcher struct { - sync.RWMutex - container Vector - // used to control the Destroy stage (that all workers are in the container) - numWorkers uint64 - workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler -} - func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.container.Enqueue(workers[i]) + ww.container.Push(workers[i]) // add worker to watch slice ww.workers = append(ww.workers, workers[i]) @@ -62,7 +68,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation - w, err := ww.container.Dequeue(ctx) + w, err := ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) } @@ -78,11 +84,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { // ========================================================= // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container + _ = w.Kill() + // no free workers in the container or worker not in the ReadyState (TTL-ed) // try to continuously get free one for { - w, err = ww.container.Dequeue(ctx) + w, err = ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) @@ -98,7 +104,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { case worker.StateReady: return w, nil case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work + ww.container.Push(w) // put it back, let worker finish the work continue case // all the possible wrong states @@ -162,7 +168,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { func (ww *workerWatcher) Push(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: - ww.container.Enqueue(w) + ww.container.Push(w) default: _ = w.Kill() } @@ -232,6 +238,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { return } + w.State().Set(worker.StateStopped) ww.Remove(w) err = ww.Allocate() if err != nil { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 20dcef2a..6def138e 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -141,7 +141,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con delayCache: make(map[string]struct{}, 100), } - // if no global section + // only global section if !cfg.Has(pluginName) { return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go new file mode 100644 index 00000000..d034d65c --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -0,0 +1 @@ +package beanstalk diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go new file mode 100644 index 00000000..d034d65c --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -0,0 +1 @@ +package beanstalk diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go new file mode 100644 index 00000000..2fea1c31 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/plugin.go @@ -0,0 +1,29 @@ +package beanstalk + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return nil, nil +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return nil, nil +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index f71c2718..98e7ebf8 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -432,6 +432,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { } // if pipeline initialized to be consumed, call Run on it + // but likely for the dynamic pipelines it should be started manually if _, ok := p.consume[pipeline.Name()]; ok { err = initializedDriver.Run(pipeline) if err != nil { @@ -440,6 +441,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { } } + // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) return nil diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index a2bd9c6d..4333c587 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -3,6 +3,7 @@ package jobs import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) @@ -65,7 +66,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -76,7 +77,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error { +func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { pipelines := make([]string, len(req.GetPipelines())) for i := 0; i < len(pipelines); i++ { @@ -87,11 +88,32 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) er return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { resp.Pipelines = r.p.List() return nil } +// Declare pipeline used to dynamically declare any type of the pipeline +// Mandatory fields: +// 1. Driver +// 2. Pipeline name +// 3. Options related to the particular pipeline +func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("rcp_declare_pipeline") + pipe := &pipeline.Pipeline{} + + for i := range req.GetPipeline() { + (*pipe)[i] = req.GetPipeline()[i] + } + + err := r.p.Declare(pipe) + if err != nil { + return errors.E(op, err) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index b445ca3f..529cd972 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -116,8 +116,8 @@ func (x *PushBatchRequest) GetJobs() []*Job { return nil } -// request to pause/resume -type MaintenanceRequest struct { +// request to pause/resume/list +type Maintenance struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -125,8 +125,8 @@ type MaintenanceRequest struct { Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"` } -func (x *MaintenanceRequest) Reset() { - *x = MaintenanceRequest{} +func (x *Maintenance) Reset() { + *x = Maintenance{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -134,13 +134,13 @@ func (x *MaintenanceRequest) Reset() { } } -func (x *MaintenanceRequest) String() string { +func (x *Maintenance) String() string { return protoimpl.X.MessageStringOf(x) } -func (*MaintenanceRequest) ProtoMessage() {} +func (*Maintenance) ProtoMessage() {} -func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message { +func (x *Maintenance) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -152,12 +152,12 @@ func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use MaintenanceRequest.ProtoReflect.Descriptor instead. -func (*MaintenanceRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Maintenance.ProtoReflect.Descriptor instead. +func (*Maintenance) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{2} } -func (x *MaintenanceRequest) GetPipelines() []string { +func (x *Maintenance) GetPipelines() []string { if x != nil { return x.Pipelines } @@ -165,7 +165,7 @@ func (x *MaintenanceRequest) GetPipelines() []string { } // some endpoints receives nothing -// all endpoints returns nothing +// all endpoints returns nothing, except error type Empty struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -204,16 +204,16 @@ func (*Empty) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{3} } -type List struct { +type DeclareRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"` + Pipeline map[string]string `protobuf:"bytes,1,rep,name=pipeline,proto3" json:"pipeline,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (x *List) Reset() { - *x = List{} +func (x *DeclareRequest) Reset() { + *x = DeclareRequest{} if protoimpl.UnsafeEnabled { mi := &file_jobs_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -221,13 +221,13 @@ func (x *List) Reset() { } } -func (x *List) String() string { +func (x *DeclareRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*List) ProtoMessage() {} +func (*DeclareRequest) ProtoMessage() {} -func (x *List) ProtoReflect() protoreflect.Message { +func (x *DeclareRequest) ProtoReflect() protoreflect.Message { mi := &file_jobs_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -239,14 +239,14 @@ func (x *List) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use List.ProtoReflect.Descriptor instead. -func (*List) Descriptor() ([]byte, []int) { +// Deprecated: Use DeclareRequest.ProtoReflect.Descriptor instead. +func (*DeclareRequest) Descriptor() ([]byte, []int) { return file_jobs_proto_rawDescGZIP(), []int{4} } -func (x *List) GetPipelines() []string { +func (x *DeclareRequest) GetPipeline() map[string]string { if x != nil { - return x.Pipelines + return x.Pipeline } return nil } @@ -330,53 +330,6 @@ func (x *Job) GetOptions() *Options { return nil } -type HeaderValue struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"` -} - -func (x *HeaderValue) Reset() { - *x = HeaderValue{} - if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *HeaderValue) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HeaderValue) ProtoMessage() {} - -func (x *HeaderValue) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead. -func (*HeaderValue) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{6} -} - -func (x *HeaderValue) GetValue() []string { - if x != nil { - return x.Value - } - return nil -} - type Options struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -393,7 +346,7 @@ type Options struct { func (x *Options) Reset() { *x = Options{} if protoimpl.UnsafeEnabled { - mi := &file_jobs_proto_msgTypes[7] + mi := &file_jobs_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -406,7 +359,7 @@ func (x *Options) String() string { func (*Options) ProtoMessage() {} func (x *Options) ProtoReflect() protoreflect.Message { - mi := &file_jobs_proto_msgTypes[7] + mi := &file_jobs_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -419,7 +372,7 @@ func (x *Options) ProtoReflect() protoreflect.Message { // Deprecated: Use Options.ProtoReflect.Descriptor instead. func (*Options) Descriptor() ([]byte, []int) { - return file_jobs_proto_rawDescGZIP(), []int{7} + return file_jobs_proto_rawDescGZIP(), []int{6} } func (x *Options) GetPriority() int64 { @@ -464,6 +417,53 @@ func (x *Options) GetTimeout() int64 { return 0 } +type HeaderValue struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"` +} + +func (x *HeaderValue) Reset() { + *x = HeaderValue{} + if protoimpl.UnsafeEnabled { + mi := &file_jobs_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeaderValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderValue) ProtoMessage() {} + +func (x *HeaderValue) ProtoReflect() protoreflect.Message { + mi := &file_jobs_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead. +func (*HeaderValue) Descriptor() ([]byte, []int) { + return file_jobs_proto_rawDescGZIP(), []int{7} +} + +func (x *HeaderValue) GetValue() []string { + if x != nil { + return x.Value + } + return nil +} + var File_jobs_proto protoreflect.FileDescriptor var file_jobs_proto_rawDesc = []byte{ @@ -475,44 +475,51 @@ var file_jobs_proto_rawDesc = []byte{ 0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, - 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x32, 0x0a, 0x12, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65, - 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, - 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x22, 0x24, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70, - 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, - 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, - 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, - 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, - 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, - 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, 0x0a, 0x0b, - 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, - 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, - 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, - 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61, - 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x61, - 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79, - 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, - 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, - 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x2b, 0x0a, 0x0b, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65, + 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a, + 0x0e, 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x45, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50, + 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69, + 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, + 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, + 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, + 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, + 0x6c, 0x61, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, + 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, + 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, + 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, + 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -527,29 +534,31 @@ func file_jobs_proto_rawDescGZIP() []byte { return file_jobs_proto_rawDescData } -var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_jobs_proto_goTypes = []interface{}{ - (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest - (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest - (*MaintenanceRequest)(nil), // 2: jobs.v1beta.MaintenanceRequest - (*Empty)(nil), // 3: jobs.v1beta.Empty - (*List)(nil), // 4: jobs.v1beta.List - (*Job)(nil), // 5: jobs.v1beta.Job - (*HeaderValue)(nil), // 6: jobs.v1beta.HeaderValue - (*Options)(nil), // 7: jobs.v1beta.Options - nil, // 8: jobs.v1beta.Job.HeadersEntry + (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest + (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest + (*Maintenance)(nil), // 2: jobs.v1beta.Maintenance + (*Empty)(nil), // 3: jobs.v1beta.Empty + (*DeclareRequest)(nil), // 4: jobs.v1beta.DeclareRequest + (*Job)(nil), // 5: jobs.v1beta.Job + (*Options)(nil), // 6: jobs.v1beta.Options + (*HeaderValue)(nil), // 7: jobs.v1beta.HeaderValue + nil, // 8: jobs.v1beta.DeclareRequest.PipelineEntry + nil, // 9: jobs.v1beta.Job.HeadersEntry } var file_jobs_proto_depIdxs = []int32{ 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job - 8, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry - 7, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options - 6, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 8, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry + 9, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry + 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options + 7, // 5: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_jobs_proto_init() } @@ -583,7 +592,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MaintenanceRequest); i { + switch v := v.(*Maintenance); i { case 0: return &v.state case 1: @@ -607,7 +616,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*List); i { + switch v := v.(*DeclareRequest); i { case 0: return &v.state case 1: @@ -631,7 +640,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HeaderValue); i { + switch v := v.(*Options); i { case 0: return &v.state case 1: @@ -643,7 +652,7 @@ func file_jobs_proto_init() { } } file_jobs_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Options); i { + switch v := v.(*HeaderValue); i { case 0: return &v.state case 1: @@ -661,7 +670,7 @@ func file_jobs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_jobs_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 10, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 1bcddf4f..7a2bcd13 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -13,17 +13,17 @@ message PushBatchRequest { repeated Job jobs = 1; } -// request to pause/resume -message MaintenanceRequest { +// request to pause/resume/list +message Maintenance { repeated string pipelines = 1; } // some endpoints receives nothing -// all endpoints returns nothing +// all endpoints returns nothing, except error message Empty {} -message List { - repeated string pipelines = 1; +message DeclareRequest { + map<string, string> pipeline = 1; } message Job { @@ -34,10 +34,6 @@ message Job { Options options = 4; } -message HeaderValue { - repeated string value = 1; -} - message Options { int64 priority = 1; string pipeline = 2; @@ -47,3 +43,6 @@ message Options { int64 timeout = 6; } +message HeaderValue { + repeated string value = 1; +} diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml index ee72c2b7..6e2733dd 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml @@ -36,7 +36,7 @@ jobs: pipeline_size: 100000 # worker pool configuration pool: - num_workers: 10 + num_workers: 20 max_jobs: 0 allocate_timeout: 60s destroy_timeout: 60s diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index 9a13435b..59f55f3d 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -123,7 +123,6 @@ func TestJobsInit(t *testing.T) { mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -337,7 +336,7 @@ func ephemeralPause(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)} + pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)} pipe.GetPipelines()[0] = "test-local" er := &jobsv1beta.Empty{} @@ -350,7 +349,7 @@ func ephemeralResume(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)} + pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)} pipe.GetPipelines()[0] = "test-local" er := &jobsv1beta.Empty{} diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index b4a028d4..80fc435c 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -1,59 +1,28 @@ <?php - -declare(strict_types=1); - +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; use Spiral\RoadRunner; -use Nyholm\Psr7\Factory; ini_set('display_errors', 'stderr'); -include "vendor/autoload.php"; - -$env = \Spiral\RoadRunner\Environment::fromGlobals(); - -if ($env->getMode() === 'http') { - $worker = new RoadRunner\Http\PSR7Worker( - RoadRunner\Worker::create(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory(), - new Factory\Psr17Factory() - ); - - while ($req = $worker->waitRequest()) { - try { - $rsp = new \Nyholm\Psr7\Response(); - $rsp->getBody()->write("hello world"); - $worker->respond($rsp); - } catch (\Throwable $e) { - $worker->getWorker()->error((string)$e); - } +require __DIR__ . "/vendor/autoload.php"; + +$worker = RoadRunner\Worker::create(); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + +while ($req = $psr7->waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); } -} else { - /** - * @param string $dir - * @return array<string> - */ - $getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } - }; - - $factory = \Temporal\WorkerFactory::create(); - - $worker = $factory->newWorker('default'); - - // register all workflows - foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { - $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); - } - - // register all activity - foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { - $class = 'Temporal\\Tests\\Activity\\' . $name; - $worker->registerActivityImplementations(new $class); - } - - $factory->run(); } |