summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go82
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go6
-rw-r--r--plugins/jobs/config.go13
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go24
-rw-r--r--plugins/jobs/plugin.go68
-rw-r--r--plugins/jobs/rpc.go95
-rw-r--r--plugins/jobs/structs/job.go10
-rw-r--r--plugins/jobs/structs/job_options.go12
-rw-r--r--plugins/jobs/structs/job_options_test.go16
-rw-r--r--plugins/jobs/structs/job_test.go3
11 files changed, 224 insertions, 107 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 3eb20c27..4bbb4095 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,70 +1,104 @@
package ephemeral
import (
+ "sync"
+
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobBroker struct {
- queues map[string]bool
+ queues sync.Map
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- queues: make(map[string]bool),
+ queues: sync.Map{},
pq: q,
}
return jb, nil
}
-func (j *JobBroker) Push(job *structs.Job) (string, error) {
+func (j *JobBroker) Push(job *structs.Job) (*string, error) {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.queues[job.Options.Pipeline]; ok {
- if !b {
- return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ if b, ok := j.queues.Load(job.Options.Pipeline); ok {
+ if !b.(bool) {
+ return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
}
if job.Options.Priority == nil {
- job.Options.Priority = intPtr(10)
+ job.Options.Priority = utils.AsUint64Ptr(10)
}
- job.Options.ID = uuid.NewString()
+ job.Options.ID = utils.AsStringPtr(uuid.NewString())
j.pq.Insert(job)
return job.Options.ID, nil
}
- return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
-}
-
-func (j *JobBroker) Stat() {
- panic("implement me")
-}
-
-func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
- panic("implement me")
+ return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; ok {
+ if _, ok := j.queues.Load(pipeline); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = true
+ j.queues.Store(pipeline, true)
return nil
}
-func intPtr(val uint64) *uint64 {
- if val == 0 {
- val = 10
+func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) {
+ // Use a batch response
+ // Add JobID to the payload to match responses
+ panic("todo")
+}
+
+func (j *JobBroker) Stop(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, false)
+ }
+ }
+}
+
+func (j *JobBroker) StopAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, false)
+ return true
+ })
+}
+
+func (j *JobBroker) Resume(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, true)
+ }
}
- return &val
+}
+
+func (j *JobBroker) ResumeAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, true)
+ return true
+ })
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
+ panic("implement me")
}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 146d1fdc..3d6a95b7 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,8 +1,8 @@
package ephemeral
import (
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -23,6 +23,6 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) {
+func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) {
return NewJobBroker(q)
}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 1cb2c2a2..07e2ef38 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -1,14 +1,19 @@
package jobs
import (
+ "runtime"
+
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
)
// Config defines settings for job broker, workers and job-pipeline mapping.
type Config struct {
- // Workers configures roadrunner server and worker busy.
- // Workers *roadrunner.ServerConfig
+ // NumPollers configures number of priority queue pollers
+ // Should be no more than 255
+ // Default - num logical cores
+ NumPollers uint8 `mapstructure:"num_pollers"`
+ // Pool configures roadrunner workers pool.
Pool *poolImpl.Config `mapstructure:"Pool"`
// Pipelines defines mapping between PHP job pipeline and associated job broker.
@@ -23,5 +28,9 @@ func (c *Config) InitDefaults() {
c.Pool = &poolImpl.Config{}
}
+ if c.NumPollers == 0 {
+ c.NumPollers = uint8(runtime.NumCPU())
+ }
+
c.Pool.InitDefaults()
}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index 4161d7ca..56a8839d 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
deleted file mode 100644
index a2cf6ed9..00000000
--- a/plugins/jobs/interface.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package jobs
-
-import (
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
-)
-
-// Consumer todo naming
-type Consumer interface {
- Push(*structs.Job) (string, error)
- Stat()
- Consume(*pipeline.Pipeline)
- Register(pipe string) error
-}
-
-type Broker interface {
- InitJobBroker(queue priorityqueue.Queue) (Consumer, error)
-}
-
-type Item interface {
- ID() string
- Payload() []byte
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 67077920..8a80479b 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,14 +2,15 @@ package jobs
import (
"context"
+ "sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -27,11 +28,13 @@ type Plugin struct {
cfg *Config `mapstructure:"jobs"`
log logger.Logger
+ sync.RWMutex
+
workersPool pool.Pool
server server.Server
- brokers map[string]Broker
- consumers map[string]Consumer
+ jobConstructors map[string]jobs.Constructor
+ consumers map[string]jobs.Consumer
events events.Handler
@@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.brokers = make(map[string]Broker)
- p.consumers = make(map[string]Consumer)
+ p.jobConstructors = make(map[string]jobs.Constructor)
+ p.consumers = make(map[string]jobs.Consumer)
// initial set of pipelines
p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
@@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue2.NewPriorityQueue()
+ p.queue = priorityqueue.NewBinHeap()
p.log = log
return nil
@@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- for name := range p.brokers {
- jb, err := p.brokers[name].InitJobBroker(p.queue)
+ for name := range p.jobConstructors {
+ jb, err := p.jobConstructors[name].JobsConstruct("", p.queue)
if err != nil {
errCh <- err
return errCh
@@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error {
// start listening
go func() {
- for {
- // get data JOB from the queue
- job := p.queue.GetMax()
-
- if job == nil {
- continue
- }
-
- exec := payload.Payload{
- Context: job.Context(),
- Body: job.Body(),
- }
-
- _, err = p.workersPool.Exec(exec)
- if err != nil {
- panic(err)
- }
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
+
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
+
+ _, err := p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
+ }
+ }
+ }()
}
}()
@@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
- p.brokers[name.Name()] = c
+func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
+ p.jobConstructors[name.Name()] = c
}
func (p *Plugin) Available() {}
@@ -152,12 +159,13 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Push(j *structs.Job) (string, error) {
+func (p *Plugin) Push(j *structs.Job) (*string, error) {
+ const op = errors.Op("jobs_plugin_push")
pipe := p.pipelines.Get(j.Options.Pipeline)
broker, ok := p.consumers[pipe.Driver()]
if !ok {
- panic("broker not found")
+ return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
}
id, err := broker.Push(j)
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index e77cda59..c6bd1645 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,8 +1,12 @@
package jobs
import (
+ "sync"
+
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)
type rpc struct {
@@ -10,11 +14,96 @@ type rpc struct {
p *Plugin
}
-func (r *rpc) Push(j *structs.Job, idRet *string) error {
+var jobsPool = &sync.Pool{
+ New: func() interface{} {
+ return &structs.Job{
+ Options: &structs.Options{},
+ }
+ },
+}
+
+func pubJob(j *structs.Job) {
+ // clear
+ j.Job = ""
+ j.Payload = ""
+ j.Options = &structs.Options{}
+ jobsPool.Put(j)
+}
+
+func getJob() *structs.Job {
+ return jobsPool.Get().(*structs.Job)
+}
+
+/*
+List of the RPC methods:
+1. Push - single job push
+2. PushBatch - push job batch
+
+3. Reset - managed by the Resetter plugin
+
+4. Stop - stop pipeline processing
+5. StopAll - stop all pipelines processing
+6. Resume - resume pipeline processing
+7. ResumeAll - resume stopped pipelines
+
+8. Workers - managed by the Informer plugin.
+9. Stat - jobs statistic
+*/
+
+func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ // convert transport entity into domain
+ // how we can do this quickly
+ jb := getJob()
+ defer pubJob(jb)
+
+ jb = &structs.Job{
+ Job: j.GetJob().Job,
+ Payload: j.GetJob().Payload,
+ Options: &structs.Options{
+ Priority: &j.GetJob().Options.Priority,
+ ID: &j.GetJob().Options.Id,
+ Pipeline: j.GetJob().Options.Pipeline,
+ Delay: j.GetJob().Options.Delay,
+ Attempts: j.GetJob().Options.Attempts,
+ RetryDelay: j.GetJob().Options.RetryDelay,
+ Timeout: j.GetJob().Options.Timeout,
+ },
+ }
+ id, err := r.p.Push(jb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ resp.Id = *id
+
+ return nil
+}
+
+func (r *rpc) PushBatch(j *structs.Job, idRet *string) error {
+ const op = errors.Op("jobs_rpc_push")
id, err := r.p.Push(j)
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
- *idRet = id
+
+ *idRet = *id
+ return nil
+}
+
+func (r *rpc) Stop(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) StopAll(_ bool, w *string) error {
+ return nil
+}
+
+func (r *rpc) Resume(pipeline string, w *string) error {
+ return nil
+}
+
+func (r *rpc) ResumeAll(_ bool, w *string) error {
return nil
}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
index 268444db..1ef4d2ca 100644
--- a/plugins/jobs/structs/job.go
+++ b/plugins/jobs/structs/job.go
@@ -17,12 +17,12 @@ type Job struct {
Options *Options `json:"options,omitempty"`
}
-func (j *Job) ID() string {
+func (j *Job) ID() *string {
return j.Options.ID
}
-func (j *Job) Priority() uint64 {
- return *j.Options.Priority
+func (j *Job) Priority() *uint64 {
+ return j.Options.Priority
}
// Body packs job payload into binary payload.
@@ -34,8 +34,8 @@ func (j *Job) Body() []byte {
func (j *Job) Context() []byte {
ctx, _ := json.Marshal(
struct {
- ID string `json:"id"`
- Job string `json:"job"`
+ ID *string `json:"id"`
+ Job string `json:"job"`
}{ID: j.Options.ID, Job: j.Job},
)
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
index 029a797d..3e1ada85 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/structs/job_options.go
@@ -9,23 +9,23 @@ type Options struct {
Priority *uint64 `json:"priority"`
// ID - generated ID for the job
- ID string `json:"id"`
+ ID *string `json:"id"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int `json:"delay,omitempty"`
+ Delay uint64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts int `json:"maxAttempts,omitempty"`
+ Attempts uint64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int `json:"retryDelay,omitempty"`
+ RetryDelay uint64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int `json:"timeout,omitempty"`
+ Timeout uint64 `json:"timeout,omitempty"`
}
// Merge merges job options.
@@ -52,7 +52,7 @@ func (o *Options) Merge(from *Options) {
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int) bool {
+func (o *Options) CanRetry(attempt uint64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go
index 18702394..a16f7dd0 100644
--- a/plugins/jobs/structs/job_options_test.go
+++ b/plugins/jobs/structs/job_options_test.go
@@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) {
})
assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, 1, opts.Attempts)
- assert.Equal(t, 2, opts.Delay)
- assert.Equal(t, 1, opts.Timeout)
- assert.Equal(t, 1, opts.RetryDelay)
+ assert.Equal(t, uint64(1), opts.Attempts)
+ assert.Equal(t, uint64(2), opts.Delay)
+ assert.Equal(t, uint64(1), opts.Timeout)
+ assert.Equal(t, uint64(1), opts.RetryDelay)
}
func TestOptions_MergeKeepOriginal(t *testing.T) {
@@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) {
})
assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, 10, opts.Attempts)
- assert.Equal(t, 10, opts.Delay)
- assert.Equal(t, 10, opts.Timeout)
- assert.Equal(t, 10, opts.RetryDelay)
+ assert.Equal(t, uint64(10), opts.Attempts)
+ assert.Equal(t, uint64(10), opts.Delay)
+ assert.Equal(t, uint64(10), opts.Timeout)
+ assert.Equal(t, uint64(10), opts.RetryDelay)
}
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
index 92f78081..0aa5b177 100644
--- a/plugins/jobs/structs/job_test.go
+++ b/plugins/jobs/structs/job_test.go
@@ -3,6 +3,7 @@ package structs
import (
"testing"
+ "github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/assert"
)
@@ -13,7 +14,7 @@ func TestJob_Body(t *testing.T) {
}
func TestJob_Context(t *testing.T) {
- j := &Job{Job: "job", Options: &Options{ID: "id"}}
+ j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}}
assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context())
}