diff options
author | Valery Piashchynski <[email protected]> | 2021-06-30 11:08:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-30 11:08:40 +0300 |
commit | 2ac3b240b118961c1a30cc18dd22d08b7fac6516 (patch) | |
tree | 25f48908286a05ea78e4049d89f88450d0541f99 | |
parent | c0f808bb8c7077e18aa197f024628b9912def58b (diff) |
- Update arch diagrams
- Update ephemeral plugin
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | pkg/priority_queue/interface.go | 3 | ||||
-rw-r--r-- | pkg/priority_queue/queue.go | 8 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 14 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 42 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/entry.go | 21 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/queue.go | 7 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 9 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 9 | ||||
-rw-r--r-- | plugins/jobs/pq_plugin/plugin.go | 34 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 2 |
11 files changed, 136 insertions, 15 deletions
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 5945a013..00998d78 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,7 +1,6 @@ package priorityqueue type Queue interface { - Push() + Push(item interface{}) Pop() interface{} - BLPop() } diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go index 79afab18..c12acbf6 100644 --- a/pkg/priority_queue/queue.go +++ b/pkg/priority_queue/queue.go @@ -1,15 +1,17 @@ package priorityqueue +import "fmt" + type QueueImpl struct { } func NewPriorityQueue() *QueueImpl { - return nil + return &QueueImpl{} } // Push the task -func (q *QueueImpl) Push() { - +func (q *QueueImpl) Push(item interface{}) { + fmt.Println(item) } func (q *QueueImpl) Pop() interface{} { diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 3925ef64..f096a0af 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -38,3 +38,17 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error { return nil } + +// sort.Sort + +func (w *WorkerList) Len() int { + return len(w.Workers) +} + +func (w *WorkerList) Less(i, j int) bool { + return w.Workers[i].Pid < w.Workers[j].Pid +} + +func (w *WorkerList) Swap(i, j int) { + w.Workers[i], w.Workers[j] = w.Workers[j], w.Workers[i] +} diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 905f5409..95f476a6 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,20 +1,38 @@ package ephemeral import ( + "github.com/google/uuid" + "github.com/spiral/errors" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { + jobs chan *entry + queues map[*pipeline.Pipeline]*queue + pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { - return &JobBroker{}, nil + jb := &JobBroker{ + jobs: make(chan *entry, 10), + pq: q, + } + + go jb.serve() + + return jb, nil } -func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) { - panic("implement me") +func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { + id := uuid.NewString() + + j.jobs <- &entry{ + id: id, + } + + return id, nil } func (j *JobBroker) Stat() { @@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) { - panic("implement me") +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { + const op = errors.Op("ephemeral_register") + if _, ok := j.queues[pipeline]; !ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + } + + j.queues[pipeline] = newQueue() + + return nil +} + +func (j *JobBroker) serve() { + for item := range j.jobs { + // item should satisfy + j.pq.Push(item) + } } diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go new file mode 100644 index 00000000..bf8796d5 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/entry.go @@ -0,0 +1,21 @@ +package ephemeral + +type entry struct { + id string +} + +func (e *entry) ID() string { + return e.id +} + +func (e *entry) Ask() { + // no-op +} + +func (e *entry) Nack() { + // no-op +} + +func (e *entry) Payload() []byte { + panic("implement me") +} diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go new file mode 100644 index 00000000..1c6d865b --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/queue.go @@ -0,0 +1,7 @@ +package ephemeral + +type queue struct{} + +func newQueue() *queue { + return &queue{} +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index d452d345..be941255 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-23T11:05:51.495Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="_SQi43bjgSX-nT5V1Ksg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7VvrU9s4EP9rMkOZIeO3nY+E8Givvebgblrum2IriVvbcmUlJPz1t7Llp5yQgpNAr1BKtHpYu/vbh1amp1+Eq2uK4vkn4uGgpyneqqePepqmDiwFfnHKWlBUS88oM+p7glYS7vxHLIhi4mzhezipDWSEBMyP60SXRBF2WY2GKCUP9WFTEtSfGqMZlgh3Lgpk6hffY/N8d/BV9txgfzZnUleI8vGCkMyRRx4qJP2yp19QQlj2KVxd4IALMBdNNu9qQ2+xN4ojtsuEr1G0ulFJiJePLLKv/8L/Ku6ZZmTLLFGwEEyL3bJ1LgVKFpGH+SpKTx8+zH2G72Lk8t4HUDzQ5iwMoKXCx6kfBBckIDSdq3sIO1MX6Amj5Duu9FiugydT6JH5EKwtMWV4VSEJvq4xMMHoGoaIXt0RMhZAG9g5gh5KtWn5oHlVYbl2kIDKrFi8FCV8ENL8GclKcsQegEs0CWVzMiMRCi5L6rAu6XLMR0JiId9vmLG1sBS0YKQufRAhXX/l8/tm3ryv9o1WYvGstRatjTpIyIK6eBubgk+G6AyzLQPFglwIW1VKcYCYv6xbYJt2xNQx8WHPBRQMw6xDwbQbGs52KqZV7UVaqQEqc9BYKROOtNI5pWhdGRbzAYkEqILn52NMPwrGVj6rQAxa9zmm4HMJMN5YV9F2UFyqO8LSOBQsG2DSrd1g2RVUcp9a+vnbMayknMHP+J+7G1g0+wUbWkdut0HAxI5ntAUBR5voltVRELAbIjY0OQioRksQ0PcVAwZHtc+qdRa2+nP2qezNPo1dw4ZyGAPVmugpcopDGehvZ/5ysFgH8ub6oO8MLNNUbc1QdXtQh47WTCr3DB05h/+EmDvnnv39+PLj+z8vO3XnWAWHbre584Fl66grd97M6fU2d661uPO9pfT2MS1Urblz+7X5c3NHE+3cQl+kUefN+Vy1ps5Su0dTaOfnuhcp1JRcYU+zAtj/MK7p2fqx4HWXYQjs+VFPP4deJV7B/6nQlIx+xrhKeZ9R6QMHxs5Q4M/EPBdki2m928MuocAvEWM4ZGjgR7h8NHyaid/pBic54QOZgFjO7xhduGxBcT4ABDJpTgJa3KTNaZPyLN4DPGVZp8M7m9uGXaYzRuna1I9mBSPF88doHRDkPTnuc8wllRTjinYbkw2D5RHFd1FwnilklKpsKNQzypgYEhg1DdLqGw9mPKaRiAkLVTXRvkKhH3DLucHBEvNVt8bBqcm/W2tb6ZdYtULPvrqJj5baLE8oLfHRaomPzTJGZ8Zn/QLGl0Pv1dvf2I9xxtMTxjUCx7t+ctQ5YziMWWmDaa66mcl9sHTLUVjfbmUXE0msh9rX336IyYI9Z1O/nVXa60i11F2dlaruy1upqqSeo6XzO2fz2qHKp8qO6V/n9dOX6VSRQtAtWC7upwfik29k8q7To/DUcbHber01cUzD3KqD3a1HNZtH4cGOR+G9VTZVTRI0xRAso2r21hA0j811aeaOTwRxfdj0j6HveZnd4cR/RJN0KS5VcacC65rDnjnia4GpJcJHSgqJSIS70YUmlSUcWReDQ1YlVLn8E0Nu0NN4eZ/8yrowmnahWbIu2m5996cL+fxZydPOIC9AIfcoIjuA/CMGoYmczBz5XPRT7nvsYc8e/V+01mJB6mFN6KiVvddde89r6k8XgpyuU4FnFd/VZszb972NfOoNMTyRi7s0/U1++A3W4dXjJx/HvVd93ebq7GiuucaObK7Kge/KcvlUzPUa8+0OuQ3xQtKUkpALcI5T5md+ApEV8yLmJB3CSyRLHwn77mfzTjo+V0ynWvu5wrMmlrkv01baInGbaTdfXunurTn5ADdeJPOTMqPNznAKWCc8vqK3DqV/nJcWncGu0jf2Jn35LaEN4v01MlGriX9D1oB5yERUk4/VkujJgvGk4qJ451mRa3fwfcUfO5xR5Pm47BPCawC+GN4qaQ8l88KScq1+RBMcjEFLWZl9NCGMgd+U1V4vcRboqFpfS3Uzj+X8kSiJM0an/orvIwMMppdLnOEmxcgcxXxCuJrxl9H76CEx+oskfVYXUFHUOlRsXYaK7chQyWndQ0WXPeXNGAhfPt/+cXkrwSYXUEyJi5Pkaac4Qe73WepGP2eI2xCqDlICM/WGqdqG0zd3dJcdVBsfran3oDtXxvfw/P56sRrbg5szWQOHu+9qucjKSguB3CrLCo36Q6O14UajuPQCl5MswjRJOslyoXdHuhtrFZV0mbNNjukoSYp3DLGT07xkw/OMk1NIOJI+78jyD0wpqfHdzRXUvjhKs6fTsgjFWTj9kOVQJ/ll4BvkS2Cxpqw3svVbcaaoA62nX4l/oBjwogHEPH7USEiwxMkG7jb4eJrGy6ozl+8U27x/B45aLVzwtsuK4l3tmp/uIq9tddS/6wUbRdPymlfruM6Ley9SaEuZPf3jhZccArsAv9UEv9F2plNawN9FsWyb4iqi+tHnIY3nEnMEZ4jgFYit+S7TPi84oVn+0WVWVir/fFW//A8=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index bb0e8c50..a0aed50b 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -11,9 +11,16 @@ type Consumer interface { Push(*pipeline.Pipeline, *structs.Job) (string, error) Stat() Consume(*pipeline.Pipeline) - Register(*pipeline.Pipeline) + Register(*pipeline.Pipeline) error } type Broker interface { InitJobBroker(queue priorityqueue.Queue) (Consumer, error) } + +type Item interface { + ID() string + Ask() + Nack() + Payload() []byte +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8c5a665e..ab7222ae 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -41,7 +41,7 @@ func testListener(data interface{}) { fmt.Println(data) } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -64,7 +64,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]Consumer) // initialize priority queue - p.queue = priorityqueue.NewPriorityQueue() + p.queue = pq p.log = log return nil @@ -157,5 +157,8 @@ func (p *Plugin) Push(j *structs.Job) (string, error) { } func (p *Plugin) RPC() interface{} { - return &rpc{log: p.log} + return &rpc{ + log: p.log, + p: p, + } } diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go new file mode 100644 index 00000000..7df846ac --- /dev/null +++ b/plugins/jobs/pq_plugin/plugin.go @@ -0,0 +1,34 @@ +package pq_plugin //nolint:stylecheck + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "internal_pq" +) + +type Plugin struct { + log logger.Logger + pq priorityqueue.Queue +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pq = priorityqueue.NewPriorityQueue() + return nil +} + +func (p *Plugin) Push(item interface{}) { + p.pq.Push(item) + // no-op +} + +func (p *Plugin) Pop() interface{} { + return p.pq.Pop() +} + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index e8b4e83d..2c58c344 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/jobs/pq_plugin" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -34,6 +35,7 @@ func TestJobsInit(t *testing.T) { &logger.ZapLogger{}, &jobs.Plugin{}, &ephemeral.Plugin{}, + &pq_plugin.Plugin{}, ) assert.NoError(t, err) |