summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-30 11:08:40 +0300
committerValery Piashchynski <[email protected]>2021-06-30 11:08:40 +0300
commit2ac3b240b118961c1a30cc18dd22d08b7fac6516 (patch)
tree25f48908286a05ea78e4049d89f88450d0541f99
parentc0f808bb8c7077e18aa197f024628b9912def58b (diff)
- Update arch diagrams
- Update ephemeral plugin Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/priority_queue/interface.go3
-rw-r--r--pkg/priority_queue/queue.go8
-rw-r--r--plugins/informer/rpc.go14
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go42
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go21
-rw-r--r--plugins/jobs/brokers/ephemeral/queue.go7
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go9
-rw-r--r--plugins/jobs/plugin.go9
-rw-r--r--plugins/jobs/pq_plugin/plugin.go34
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go2
11 files changed, 136 insertions, 15 deletions
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 5945a013..00998d78 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -1,7 +1,6 @@
package priorityqueue
type Queue interface {
- Push()
+ Push(item interface{})
Pop() interface{}
- BLPop()
}
diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go
index 79afab18..c12acbf6 100644
--- a/pkg/priority_queue/queue.go
+++ b/pkg/priority_queue/queue.go
@@ -1,15 +1,17 @@
package priorityqueue
+import "fmt"
+
type QueueImpl struct {
}
func NewPriorityQueue() *QueueImpl {
- return nil
+ return &QueueImpl{}
}
// Push the task
-func (q *QueueImpl) Push() {
-
+func (q *QueueImpl) Push(item interface{}) {
+ fmt.Println(item)
}
func (q *QueueImpl) Pop() interface{} {
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 3925ef64..f096a0af 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -38,3 +38,17 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+
+// sort.Sort
+
+func (w *WorkerList) Len() int {
+ return len(w.Workers)
+}
+
+func (w *WorkerList) Less(i, j int) bool {
+ return w.Workers[i].Pid < w.Workers[j].Pid
+}
+
+func (w *WorkerList) Swap(i, j int) {
+ w.Workers[i], w.Workers[j] = w.Workers[j], w.Workers[i]
+}
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 905f5409..95f476a6 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,20 +1,38 @@
package ephemeral
import (
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
+ jobs chan *entry
+ queues map[*pipeline.Pipeline]*queue
+ pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
- return &JobBroker{}, nil
+ jb := &JobBroker{
+ jobs: make(chan *entry, 10),
+ pq: q,
+ }
+
+ go jb.serve()
+
+ return jb, nil
}
-func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) {
- panic("implement me")
+func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
+ id := uuid.NewString()
+
+ j.jobs <- &entry{
+ id: id,
+ }
+
+ return id, nil
}
func (j *JobBroker) Stat() {
@@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) {
- panic("implement me")
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.queues[pipeline]; !ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ }
+
+ j.queues[pipeline] = newQueue()
+
+ return nil
+}
+
+func (j *JobBroker) serve() {
+ for item := range j.jobs {
+ // item should satisfy
+ j.pq.Push(item)
+ }
}
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
new file mode 100644
index 00000000..bf8796d5
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/entry.go
@@ -0,0 +1,21 @@
+package ephemeral
+
+type entry struct {
+ id string
+}
+
+func (e *entry) ID() string {
+ return e.id
+}
+
+func (e *entry) Ask() {
+ // no-op
+}
+
+func (e *entry) Nack() {
+ // no-op
+}
+
+func (e *entry) Payload() []byte {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go
new file mode 100644
index 00000000..1c6d865b
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/queue.go
@@ -0,0 +1,7 @@
+package ephemeral
+
+type queue struct{}
+
+func newQueue() *queue {
+ return &queue{}
+}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index d452d345..be941255 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-23T11:05:51.495Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="_SQi43bjgSX-nT5V1Ksg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7VvrU9s4EP9rMkOZIeO3nY+E8Givvebgblrum2IriVvbcmUlJPz1t7Llp5yQgpNAr1BKtHpYu/vbh1amp1+Eq2uK4vkn4uGgpyneqqePepqmDiwFfnHKWlBUS88oM+p7glYS7vxHLIhi4mzhezipDWSEBMyP60SXRBF2WY2GKCUP9WFTEtSfGqMZlgh3Lgpk6hffY/N8d/BV9txgfzZnUleI8vGCkMyRRx4qJP2yp19QQlj2KVxd4IALMBdNNu9qQ2+xN4ojtsuEr1G0ulFJiJePLLKv/8L/Ku6ZZmTLLFGwEEyL3bJ1LgVKFpGH+SpKTx8+zH2G72Lk8t4HUDzQ5iwMoKXCx6kfBBckIDSdq3sIO1MX6Amj5Duu9FiugydT6JH5EKwtMWV4VSEJvq4xMMHoGoaIXt0RMhZAG9g5gh5KtWn5oHlVYbl2kIDKrFi8FCV8ENL8GclKcsQegEs0CWVzMiMRCi5L6rAu6XLMR0JiId9vmLG1sBS0YKQufRAhXX/l8/tm3ryv9o1WYvGstRatjTpIyIK6eBubgk+G6AyzLQPFglwIW1VKcYCYv6xbYJt2xNQx8WHPBRQMw6xDwbQbGs52KqZV7UVaqQEqc9BYKROOtNI5pWhdGRbzAYkEqILn52NMPwrGVj6rQAxa9zmm4HMJMN5YV9F2UFyqO8LSOBQsG2DSrd1g2RVUcp9a+vnbMayknMHP+J+7G1g0+wUbWkdut0HAxI5ntAUBR5voltVRELAbIjY0OQioRksQ0PcVAwZHtc+qdRa2+nP2qezNPo1dw4ZyGAPVmugpcopDGehvZ/5ysFgH8ub6oO8MLNNUbc1QdXtQh47WTCr3DB05h/+EmDvnnv39+PLj+z8vO3XnWAWHbre584Fl66grd97M6fU2d661uPO9pfT2MS1Urblz+7X5c3NHE+3cQl+kUefN+Vy1ps5Su0dTaOfnuhcp1JRcYU+zAtj/MK7p2fqx4HWXYQjs+VFPP4deJV7B/6nQlIx+xrhKeZ9R6QMHxs5Q4M/EPBdki2m928MuocAvEWM4ZGjgR7h8NHyaid/pBic54QOZgFjO7xhduGxBcT4ABDJpTgJa3KTNaZPyLN4DPGVZp8M7m9uGXaYzRuna1I9mBSPF88doHRDkPTnuc8wllRTjinYbkw2D5RHFd1FwnilklKpsKNQzypgYEhg1DdLqGw9mPKaRiAkLVTXRvkKhH3DLucHBEvNVt8bBqcm/W2tb6ZdYtULPvrqJj5baLE8oLfHRaomPzTJGZ8Zn/QLGl0Pv1dvf2I9xxtMTxjUCx7t+ctQ5YziMWWmDaa66mcl9sHTLUVjfbmUXE0msh9rX336IyYI9Z1O/nVXa60i11F2dlaruy1upqqSeo6XzO2fz2qHKp8qO6V/n9dOX6VSRQtAtWC7upwfik29k8q7To/DUcbHber01cUzD3KqD3a1HNZtH4cGOR+G9VTZVTRI0xRAso2r21hA0j811aeaOTwRxfdj0j6HveZnd4cR/RJN0KS5VcacC65rDnjnia4GpJcJHSgqJSIS70YUmlSUcWReDQ1YlVLn8E0Nu0NN4eZ/8yrowmnahWbIu2m5996cL+fxZydPOIC9AIfcoIjuA/CMGoYmczBz5XPRT7nvsYc8e/V+01mJB6mFN6KiVvddde89r6k8XgpyuU4FnFd/VZszb972NfOoNMTyRi7s0/U1++A3W4dXjJx/HvVd93ebq7GiuucaObK7Kge/KcvlUzPUa8+0OuQ3xQtKUkpALcI5T5md+ApEV8yLmJB3CSyRLHwn77mfzTjo+V0ynWvu5wrMmlrkv01baInGbaTdfXunurTn5ADdeJPOTMqPNznAKWCc8vqK3DqV/nJcWncGu0jf2Jn35LaEN4v01MlGriX9D1oB5yERUk4/VkujJgvGk4qJ451mRa3fwfcUfO5xR5Pm47BPCawC+GN4qaQ8l88KScq1+RBMcjEFLWZl9NCGMgd+U1V4vcRboqFpfS3Uzj+X8kSiJM0an/orvIwMMppdLnOEmxcgcxXxCuJrxl9H76CEx+oskfVYXUFHUOlRsXYaK7chQyWndQ0WXPeXNGAhfPt/+cXkrwSYXUEyJi5Pkaac4Qe73WepGP2eI2xCqDlICM/WGqdqG0zd3dJcdVBsfran3oDtXxvfw/P56sRrbg5szWQOHu+9qucjKSguB3CrLCo36Q6O14UajuPQCl5MswjRJOslyoXdHuhtrFZV0mbNNjukoSYp3DLGT07xkw/OMk1NIOJI+78jyD0wpqfHdzRXUvjhKs6fTsgjFWTj9kOVQJ/ll4BvkS2Cxpqw3svVbcaaoA62nX4l/oBjwogHEPH7USEiwxMkG7jb4eJrGy6ozl+8U27x/B45aLVzwtsuK4l3tmp/uIq9tddS/6wUbRdPymlfruM6Ley9SaEuZPf3jhZccArsAv9UEv9F2plNawN9FsWyb4iqi+tHnIY3nEnMEZ4jgFYit+S7TPi84oVn+0WVWVir/fFW//A8=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index bb0e8c50..a0aed50b 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -11,9 +11,16 @@ type Consumer interface {
Push(*pipeline.Pipeline, *structs.Job) (string, error)
Stat()
Consume(*pipeline.Pipeline)
- Register(*pipeline.Pipeline)
+ Register(*pipeline.Pipeline) error
}
type Broker interface {
InitJobBroker(queue priorityqueue.Queue) (Consumer, error)
}
+
+type Item interface {
+ ID() string
+ Ask()
+ Nack()
+ Payload() []byte
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8c5a665e..ab7222ae 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -41,7 +41,7 @@ func testListener(data interface{}) {
fmt.Println(data)
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -64,7 +64,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]Consumer)
// initialize priority queue
- p.queue = priorityqueue.NewPriorityQueue()
+ p.queue = pq
p.log = log
return nil
@@ -157,5 +157,8 @@ func (p *Plugin) Push(j *structs.Job) (string, error) {
}
func (p *Plugin) RPC() interface{} {
- return &rpc{log: p.log}
+ return &rpc{
+ log: p.log,
+ p: p,
+ }
}
diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go
new file mode 100644
index 00000000..7df846ac
--- /dev/null
+++ b/plugins/jobs/pq_plugin/plugin.go
@@ -0,0 +1,34 @@
+package pq_plugin //nolint:stylecheck
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "internal_pq"
+)
+
+type Plugin struct {
+ log logger.Logger
+ pq priorityqueue.Queue
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pq = priorityqueue.NewPriorityQueue()
+ return nil
+}
+
+func (p *Plugin) Push(item interface{}) {
+ p.pq.Push(item)
+ // no-op
+}
+
+func (p *Plugin) Pop() interface{} {
+ return p.pq.Pop()
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index e8b4e83d..2c58c344 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pq_plugin"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -34,6 +35,7 @@ func TestJobsInit(t *testing.T) {
&logger.ZapLogger{},
&jobs.Plugin{},
&ephemeral.Plugin{},
+ &pq_plugin.Plugin{},
)
assert.NoError(t, err)