summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/worker_watcher/container/channel/vec.go (renamed from pkg/worker_watcher/container/vec.go)8
-rw-r--r--pkg/worker_watcher/container/queue/queue.go103
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go59
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go1
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go1
-rw-r--r--plugins/jobs/drivers/beanstalk/plugin.go29
-rw-r--r--plugins/jobs/plugin.go2
-rw-r--r--plugins/jobs/rpc.go28
-rw-r--r--proto/jobs/v1beta/jobs.pb.go275
-rw-r--r--proto/jobs/v1beta/jobs.proto17
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-test.yaml2
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go5
-rw-r--r--tests/psr-worker-bench.php75
14 files changed, 375 insertions, 232 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 24b5fa6d..eafbfb07 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -1,4 +1,4 @@
-package container
+package channel
import (
"context"
@@ -22,11 +22,13 @@ func NewVector(initialNumOfWorkers uint64) *Vec {
return vec
}
-func (v *Vec) Enqueue(w worker.BaseProcess) {
+func (v *Vec) Push(w worker.BaseProcess) {
v.workers <- w
}
-func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
/*
if *addr == old {
*addr = new
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..a792d7c1
--- /dev/null
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -0,0 +1,103 @@
+package queue
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+const (
+ initialSize = 1
+ maxInitialSize = 8
+ maxInternalSliceSize = 10
+)
+
+type Node struct {
+ w []worker.BaseProcess
+ // LL
+ n *Node
+}
+
+type Queue struct {
+ mu sync.Mutex
+
+ head *Node
+ tail *Node
+
+ recvCh chan worker.BaseProcess
+ curr uint64
+ len uint64
+
+ sliceSize uint64
+}
+
+func NewQueue() *Queue {
+ q := &Queue{
+ mu: sync.Mutex{},
+ // w/o buffering
+ recvCh: make(chan worker.BaseProcess),
+ head: nil,
+ tail: nil,
+ curr: 0,
+ len: 0,
+ sliceSize: 0,
+ }
+
+ return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ h := newNode(initialSize)
+ q.head = h
+ q.tail = h
+ q.sliceSize = maxInitialSize
+ } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+ n := newNode(maxInternalSliceSize)
+ q.tail.n = n
+ q.tail = n
+ q.sliceSize = maxInternalSliceSize
+ }
+
+ q.tail.w = append(q.tail.w, w)
+
+ atomic.AddUint64(&q.len, 1)
+
+ q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ return nil, nil
+ }
+
+ w := q.head.w[q.curr]
+ q.head.w[q.curr] = nil
+ atomic.AddUint64(&q.len, ^uint64(0))
+ atomic.AddUint64(&q.curr, 1)
+
+ if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+ n := q.head.n
+ q.head.n = nil
+ q.head = n
+ q.curr = 0
+ }
+
+ q.mu.Unlock()
+
+ return w, nil
+}
+
+func (q *Queue) Remove(_ int64) {}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+ return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b2d61d48..6e343fff 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -8,45 +8,51 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)
// Vector interface represents vector container
type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Push used to put worker to the vector
+ Push(worker.BaseProcess)
+ // Pop used to get worker from the vector
+ Pop(ctx context.Context) (worker.BaseProcess, error)
+ // Remove worker with provided pid
+ Remove(pid int64) // TODO replace
// Destroy used to stop releasing the workers
Destroy()
}
+type workerWatcher struct {
+ sync.RWMutex
+ container Vector
+ // used to control the Destroy stage (that all workers are in the container)
+ numWorkers uint64
+
+ workers []worker.BaseProcess
+
+ allocator worker.Allocator
+ events events.Handler
+}
+
// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
- container: container.NewVector(numWorkers),
+ container: channel.NewVector(numWorkers),
numWorkers: numWorkers,
- workers: make([]worker.BaseProcess, 0, numWorkers),
- allocator: allocator,
- events: events,
+
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+
+ allocator: allocator,
+ events: events,
}
return ww
}
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control the Destroy stage (that all workers are in the container)
- numWorkers uint64
- workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
-}
-
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.container.Enqueue(workers[i])
+ ww.container.Push(workers[i])
// add worker to watch slice
ww.workers = append(ww.workers, workers[i])
@@ -62,7 +68,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
- w, err := ww.container.Dequeue(ctx)
+ w, err := ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
}
@@ -78,11 +84,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
// =========================================================
// SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
+ _ = w.Kill()
+ // no free workers in the container or worker not in the ReadyState (TTL-ed)
// try to continuously get free one
for {
- w, err = ww.container.Dequeue(ctx)
+ w, err = ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
@@ -98,7 +104,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
case worker.StateReady:
return w, nil
case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
+ ww.container.Push(w) // put it back, let worker finish the work
continue
case
// all the possible wrong states
@@ -162,7 +168,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
func (ww *workerWatcher) Push(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
- ww.container.Enqueue(w)
+ ww.container.Push(w)
default:
_ = w.Kill()
}
@@ -232,6 +238,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
return
}
+ w.State().Set(worker.StateStopped)
ww.Remove(w)
err = ww.Allocate()
if err != nil {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 20dcef2a..6def138e 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -141,7 +141,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
delayCache: make(map[string]struct{}, 100),
}
- // if no global section
+ // only global section
if !cfg.Has(pluginName) {
return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
new file mode 100644
index 00000000..d034d65c
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -0,0 +1 @@
+package beanstalk
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
new file mode 100644
index 00000000..d034d65c
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -0,0 +1 @@
+package beanstalk
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go
new file mode 100644
index 00000000..2fea1c31
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/plugin.go
@@ -0,0 +1,29 @@
+package beanstalk
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return nil, nil
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return nil, nil
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index f71c2718..98e7ebf8 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -432,6 +432,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
}
// if pipeline initialized to be consumed, call Run on it
+ // but likely for the dynamic pipelines it should be started manually
if _, ok := p.consume[pipeline.Name()]; ok {
err = initializedDriver.Run(pipeline)
if err != nil {
@@ -440,6 +441,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
}
}
+ // save the pipeline
p.pipelines.Store(pipeline.Name(), pipeline)
return nil
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index a2bd9c6d..4333c587 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -3,6 +3,7 @@ package jobs
import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
@@ -65,7 +66,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
return nil
}
-func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
+func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
pipelines := make([]string, len(req.GetPipelines()))
for i := 0; i < len(pipelines); i++ {
@@ -76,7 +77,7 @@ func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) err
return nil
}
-func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
+func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
pipelines := make([]string, len(req.GetPipelines()))
for i := 0; i < len(pipelines); i++ {
@@ -87,11 +88,32 @@ func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) er
return nil
}
-func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error {
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error {
resp.Pipelines = r.p.List()
return nil
}
+// Declare pipeline used to dynamically declare any type of the pipeline
+// Mandatory fields:
+// 1. Driver
+// 2. Pipeline name
+// 3. Options related to the particular pipeline
+func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("rcp_declare_pipeline")
+ pipe := &pipeline.Pipeline{}
+
+ for i := range req.GetPipeline() {
+ (*pipe)[i] = req.GetPipeline()[i]
+ }
+
+ err := r.p.Declare(pipe)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index b445ca3f..529cd972 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -116,8 +116,8 @@ func (x *PushBatchRequest) GetJobs() []*Job {
return nil
}
-// request to pause/resume
-type MaintenanceRequest struct {
+// request to pause/resume/list
+type Maintenance struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -125,8 +125,8 @@ type MaintenanceRequest struct {
Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"`
}
-func (x *MaintenanceRequest) Reset() {
- *x = MaintenanceRequest{}
+func (x *Maintenance) Reset() {
+ *x = Maintenance{}
if protoimpl.UnsafeEnabled {
mi := &file_jobs_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -134,13 +134,13 @@ func (x *MaintenanceRequest) Reset() {
}
}
-func (x *MaintenanceRequest) String() string {
+func (x *Maintenance) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*MaintenanceRequest) ProtoMessage() {}
+func (*Maintenance) ProtoMessage() {}
-func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message {
+func (x *Maintenance) ProtoReflect() protoreflect.Message {
mi := &file_jobs_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -152,12 +152,12 @@ func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use MaintenanceRequest.ProtoReflect.Descriptor instead.
-func (*MaintenanceRequest) Descriptor() ([]byte, []int) {
+// Deprecated: Use Maintenance.ProtoReflect.Descriptor instead.
+func (*Maintenance) Descriptor() ([]byte, []int) {
return file_jobs_proto_rawDescGZIP(), []int{2}
}
-func (x *MaintenanceRequest) GetPipelines() []string {
+func (x *Maintenance) GetPipelines() []string {
if x != nil {
return x.Pipelines
}
@@ -165,7 +165,7 @@ func (x *MaintenanceRequest) GetPipelines() []string {
}
// some endpoints receives nothing
-// all endpoints returns nothing
+// all endpoints returns nothing, except error
type Empty struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -204,16 +204,16 @@ func (*Empty) Descriptor() ([]byte, []int) {
return file_jobs_proto_rawDescGZIP(), []int{3}
}
-type List struct {
+type DeclareRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"`
+ Pipeline map[string]string `protobuf:"bytes,1,rep,name=pipeline,proto3" json:"pipeline,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
-func (x *List) Reset() {
- *x = List{}
+func (x *DeclareRequest) Reset() {
+ *x = DeclareRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_jobs_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -221,13 +221,13 @@ func (x *List) Reset() {
}
}
-func (x *List) String() string {
+func (x *DeclareRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*List) ProtoMessage() {}
+func (*DeclareRequest) ProtoMessage() {}
-func (x *List) ProtoReflect() protoreflect.Message {
+func (x *DeclareRequest) ProtoReflect() protoreflect.Message {
mi := &file_jobs_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -239,14 +239,14 @@ func (x *List) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use List.ProtoReflect.Descriptor instead.
-func (*List) Descriptor() ([]byte, []int) {
+// Deprecated: Use DeclareRequest.ProtoReflect.Descriptor instead.
+func (*DeclareRequest) Descriptor() ([]byte, []int) {
return file_jobs_proto_rawDescGZIP(), []int{4}
}
-func (x *List) GetPipelines() []string {
+func (x *DeclareRequest) GetPipeline() map[string]string {
if x != nil {
- return x.Pipelines
+ return x.Pipeline
}
return nil
}
@@ -330,53 +330,6 @@ func (x *Job) GetOptions() *Options {
return nil
}
-type HeaderValue struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"`
-}
-
-func (x *HeaderValue) Reset() {
- *x = HeaderValue{}
- if protoimpl.UnsafeEnabled {
- mi := &file_jobs_proto_msgTypes[6]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *HeaderValue) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*HeaderValue) ProtoMessage() {}
-
-func (x *HeaderValue) ProtoReflect() protoreflect.Message {
- mi := &file_jobs_proto_msgTypes[6]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead.
-func (*HeaderValue) Descriptor() ([]byte, []int) {
- return file_jobs_proto_rawDescGZIP(), []int{6}
-}
-
-func (x *HeaderValue) GetValue() []string {
- if x != nil {
- return x.Value
- }
- return nil
-}
-
type Options struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -393,7 +346,7 @@ type Options struct {
func (x *Options) Reset() {
*x = Options{}
if protoimpl.UnsafeEnabled {
- mi := &file_jobs_proto_msgTypes[7]
+ mi := &file_jobs_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -406,7 +359,7 @@ func (x *Options) String() string {
func (*Options) ProtoMessage() {}
func (x *Options) ProtoReflect() protoreflect.Message {
- mi := &file_jobs_proto_msgTypes[7]
+ mi := &file_jobs_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -419,7 +372,7 @@ func (x *Options) ProtoReflect() protoreflect.Message {
// Deprecated: Use Options.ProtoReflect.Descriptor instead.
func (*Options) Descriptor() ([]byte, []int) {
- return file_jobs_proto_rawDescGZIP(), []int{7}
+ return file_jobs_proto_rawDescGZIP(), []int{6}
}
func (x *Options) GetPriority() int64 {
@@ -464,6 +417,53 @@ func (x *Options) GetTimeout() int64 {
return 0
}
+type HeaderValue struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *HeaderValue) Reset() {
+ *x = HeaderValue{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HeaderValue) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HeaderValue) ProtoMessage() {}
+
+func (x *HeaderValue) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[7]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead.
+func (*HeaderValue) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{7}
+}
+
+func (x *HeaderValue) GetValue() []string {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
var File_jobs_proto protoreflect.FileDescriptor
var file_jobs_proto_rawDesc = []byte{
@@ -475,44 +475,51 @@ var file_jobs_proto_rawDesc = []byte{
0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10,
0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62,
- 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x32, 0x0a, 0x12, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65,
- 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09,
- 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52,
- 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d,
- 0x70, 0x74, 0x79, 0x22, 0x24, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70,
- 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09,
- 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f,
- 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
- 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a,
- 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d,
- 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62,
- 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68,
- 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
- 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f,
- 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72,
- 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
- 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75,
- 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, 0x0a, 0x0b,
- 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76,
- 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
- 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a,
- 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
- 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70,
- 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70,
- 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x03,
- 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61,
- 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x61,
- 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79,
- 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
- 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
- 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
- 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62,
- 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x2b, 0x0a, 0x0b, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65,
+ 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e,
+ 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69,
+ 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x94, 0x01, 0x0a,
+ 0x0e, 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
+ 0x45, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28,
+ 0x0b, 0x32, 0x29, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
+ 0x44, 0x65, 0x63, 0x6c, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50,
+ 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x69,
+ 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x1a, 0x3b, 0x0a, 0x0d, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69,
+ 0x6e, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
+ 0x02, 0x38, 0x01, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a,
+ 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a,
+ 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a,
+ 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
+ 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65,
+ 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e,
+ 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65,
+ 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
+ 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
+ 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
+ 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
+ 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e,
+ 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f,
+ 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a,
+ 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65,
+ 0x6c, 0x61, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79,
+ 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01,
+ 0x28, 0x03, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b,
+ 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28,
+ 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a,
+ 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07,
+ 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x23, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65,
+ 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
+ 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d,
+ 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -527,29 +534,31 @@ func file_jobs_proto_rawDescGZIP() []byte {
return file_jobs_proto_rawDescData
}
-var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
+var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_jobs_proto_goTypes = []interface{}{
- (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
- (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
- (*MaintenanceRequest)(nil), // 2: jobs.v1beta.MaintenanceRequest
- (*Empty)(nil), // 3: jobs.v1beta.Empty
- (*List)(nil), // 4: jobs.v1beta.List
- (*Job)(nil), // 5: jobs.v1beta.Job
- (*HeaderValue)(nil), // 6: jobs.v1beta.HeaderValue
- (*Options)(nil), // 7: jobs.v1beta.Options
- nil, // 8: jobs.v1beta.Job.HeadersEntry
+ (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
+ (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
+ (*Maintenance)(nil), // 2: jobs.v1beta.Maintenance
+ (*Empty)(nil), // 3: jobs.v1beta.Empty
+ (*DeclareRequest)(nil), // 4: jobs.v1beta.DeclareRequest
+ (*Job)(nil), // 5: jobs.v1beta.Job
+ (*Options)(nil), // 6: jobs.v1beta.Options
+ (*HeaderValue)(nil), // 7: jobs.v1beta.HeaderValue
+ nil, // 8: jobs.v1beta.DeclareRequest.PipelineEntry
+ nil, // 9: jobs.v1beta.Job.HeadersEntry
}
var file_jobs_proto_depIdxs = []int32{
5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
- 8, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
- 7, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
- 6, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
- 5, // [5:5] is the sub-list for method output_type
- 5, // [5:5] is the sub-list for method input_type
- 5, // [5:5] is the sub-list for extension type_name
- 5, // [5:5] is the sub-list for extension extendee
- 0, // [0:5] is the sub-list for field type_name
+ 8, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry
+ 9, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
+ 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
+ 7, // 5: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
+ 6, // [6:6] is the sub-list for method output_type
+ 6, // [6:6] is the sub-list for method input_type
+ 6, // [6:6] is the sub-list for extension type_name
+ 6, // [6:6] is the sub-list for extension extendee
+ 0, // [0:6] is the sub-list for field type_name
}
func init() { file_jobs_proto_init() }
@@ -583,7 +592,7 @@ func file_jobs_proto_init() {
}
}
file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*MaintenanceRequest); i {
+ switch v := v.(*Maintenance); i {
case 0:
return &v.state
case 1:
@@ -607,7 +616,7 @@ func file_jobs_proto_init() {
}
}
file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*List); i {
+ switch v := v.(*DeclareRequest); i {
case 0:
return &v.state
case 1:
@@ -631,7 +640,7 @@ func file_jobs_proto_init() {
}
}
file_jobs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*HeaderValue); i {
+ switch v := v.(*Options); i {
case 0:
return &v.state
case 1:
@@ -643,7 +652,7 @@ func file_jobs_proto_init() {
}
}
file_jobs_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Options); i {
+ switch v := v.(*HeaderValue); i {
case 0:
return &v.state
case 1:
@@ -661,7 +670,7 @@ func file_jobs_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_jobs_proto_rawDesc,
NumEnums: 0,
- NumMessages: 9,
+ NumMessages: 10,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 1bcddf4f..7a2bcd13 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -13,17 +13,17 @@ message PushBatchRequest {
repeated Job jobs = 1;
}
-// request to pause/resume
-message MaintenanceRequest {
+// request to pause/resume/list
+message Maintenance {
repeated string pipelines = 1;
}
// some endpoints receives nothing
-// all endpoints returns nothing
+// all endpoints returns nothing, except error
message Empty {}
-message List {
- repeated string pipelines = 1;
+message DeclareRequest {
+ map<string, string> pipeline = 1;
}
message Job {
@@ -34,10 +34,6 @@ message Job {
Options options = 4;
}
-message HeaderValue {
- repeated string value = 1;
-}
-
message Options {
int64 priority = 1;
string pipeline = 2;
@@ -47,3 +43,6 @@ message Options {
int64 timeout = 6;
}
+message HeaderValue {
+ repeated string value = 1;
+}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
index ee72c2b7..6e2733dd 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -36,7 +36,7 @@ jobs:
pipeline_size: 100000
# worker pool configuration
pool:
- num_workers: 10
+ num_workers: 20
max_jobs: 0
allocate_timeout: 60s
destroy_timeout: 60s
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 9a13435b..59f55f3d 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -123,7 +123,6 @@ func TestJobsInit(t *testing.T) {
mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -337,7 +336,7 @@ func ephemeralPause(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)}
+ pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)}
pipe.GetPipelines()[0] = "test-local"
er := &jobsv1beta.Empty{}
@@ -350,7 +349,7 @@ func ephemeralResume(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)}
+ pipe := &jobsv1beta.Maintenance{Pipelines: make([]string, 1)}
pipe.GetPipelines()[0] = "test-local"
er := &jobsv1beta.Empty{}
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index b4a028d4..80fc435c 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -1,59 +1,28 @@
<?php
-
-declare(strict_types=1);
-
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
use Spiral\RoadRunner;
-use Nyholm\Psr7\Factory;
ini_set('display_errors', 'stderr');
-include "vendor/autoload.php";
-
-$env = \Spiral\RoadRunner\Environment::fromGlobals();
-
-if ($env->getMode() === 'http') {
- $worker = new RoadRunner\Http\PSR7Worker(
- RoadRunner\Worker::create(),
- new Factory\Psr17Factory(),
- new Factory\Psr17Factory(),
- new Factory\Psr17Factory()
- );
-
- while ($req = $worker->waitRequest()) {
- try {
- $rsp = new \Nyholm\Psr7\Response();
- $rsp->getBody()->write("hello world");
- $worker->respond($rsp);
- } catch (\Throwable $e) {
- $worker->getWorker()->error((string)$e);
- }
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = RoadRunner\Worker::create();
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ $resp->getBody()->write("hello world");
+
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
}
-} else {
- /**
- * @param string $dir
- * @return array<string>
- */
- $getClasses = static function (string $dir): iterable {
- $files = glob($dir . '/*.php');
-
- foreach ($files as $file) {
- yield substr(basename($file), 0, -4);
- }
- };
-
- $factory = \Temporal\WorkerFactory::create();
-
- $worker = $factory->newWorker('default');
-
- // register all workflows
- foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) {
- $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
- }
-
- // register all activity
- foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
- $class = 'Temporal\\Tests\\Activity\\' . $name;
- $worker->registerActivityImplementations(new $class);
- }
-
- $factory->run();
}