diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 /plugins/jobs | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 82 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 6 | ||||
-rw-r--r-- | plugins/jobs/config.go | 13 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 24 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 68 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 95 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go | 10 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go | 12 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options_test.go | 16 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go | 3 |
11 files changed, 224 insertions, 107 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 3eb20c27..4bbb4095 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,70 +1,104 @@ package ephemeral import ( + "sync" + "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { - queues map[string]bool + queues sync.Map pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - queues: make(map[string]bool), + queues: sync.Map{}, pq: q, } return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (string, error) { +func (j *JobBroker) Push(job *structs.Job) (*string, error) { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues[job.Options.Pipeline]; ok { - if !b { - return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } if job.Options.Priority == nil { - job.Options.Priority = intPtr(10) + job.Options.Priority = utils.AsUint64Ptr(10) } - job.Options.ID = uuid.NewString() + job.Options.ID = utils.AsStringPtr(uuid.NewString()) j.pq.Insert(job) return job.Options.ID, nil } - return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} - -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; ok { + if _, ok := j.queues.Load(pipeline); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = true + j.queues.Store(pipeline, true) return nil } -func intPtr(val uint64) *uint64 { - if val == 0 { - val = 10 +func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { + // Use a batch response + // Add JobID to the payload to match responses + panic("todo") +} + +func (j *JobBroker) Stop(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.queues.Store(pipeline, false) + } + } +} + +func (j *JobBroker) StopAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, false) + return true + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.queues.Store(pipeline, true) + } } - return &val +} + +func (j *JobBroker) ResumeAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, true) + return true + }) +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { + panic("implement me") } diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 146d1fdc..3d6a95b7 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,8 +1,8 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -23,6 +23,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { +func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(q) } diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 1cb2c2a2..07e2ef38 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -1,14 +1,19 @@ package jobs import ( + "runtime" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" ) // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { - // Workers configures roadrunner server and worker busy. - // Workers *roadrunner.ServerConfig + // NumPollers configures number of priority queue pollers + // Should be no more than 255 + // Default - num logical cores + NumPollers uint8 `mapstructure:"num_pollers"` + // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"Pool"` // Pipelines defines mapping between PHP job pipeline and associated job broker. @@ -23,5 +28,9 @@ func (c *Config) InitDefaults() { c.Pool = &poolImpl.Config{} } + if c.NumPollers == 0 { + c.NumPollers = uint8(runtime.NumCPU()) + } + c.Pool.InitDefaults() } diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index 4161d7ca..56a8839d 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-07-06T10:33:15.713Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.5 Safari/537.36" etag="zwPDkYwOeR-nPCysXFRw" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc9o6E/41zKSdCWNbvn7MrW36Jg0NSdOcb8YW4MYgapsk9Ne/kiwb2xJggm0ghHOmAfkm69mbdlerFjgbvX4N7MnwGrnQbymS+9oC5y1FkS1dwn9Iy4y1yCprGQSey9rmDV3vH2SNyWlTz4Vh7sQIIT/yJvlGB43H0IlybXYQoJf8aX3k5586sQeQa+g6ts+3PnhuNEx6hz/zI9+gNxhG3KGRnZzPGsKh7aKXTBO4aIGzAKEo/jZ6PYM+GcBkaOLrviw4mvYtgOOozAXdTqg/nl3P7O9IHcqDHz8unoLjBJBn25+yl769ubnDLd9vTrv4T+fq/uvlD/YG0SwZmQBNxy4kd5Za4PRl6EWwO7EdcvQFEwNuG0YjH/+S8dc+Gkdf7JHnEzrofsF3ukZjhA/YvjcY47YgHsFT3+5Bv4NCL/IQafdhnzQ/wyDyMCpXhcMRmmSOnrCb9VAUoRF5rOf7Z8hHAe0y6Peh7ji4PYwC9AQzR1zD6knkPfghZaNMngFfM01siL9CNIJRMMOnsKOqZChtLb4qIXtTZw0vcyJSdDVuG2bJR7UY8TLCHaT3nwOLvzBsxTj/Ho9fv8n4uud/0dj4+hP+JznHisrhvCGm+cF1bWj2hYOrOybs9RkVZNol+llGHVWgkUgSBoWp6TwSpiRAIuHa6pHgxh26WOiwnyiIhmiAxrZ/MW89zSMzP+cKEQ6gePyBUTRjEtSeRiiPFh7BYPabXI9pk/18zB47f2U3j3/N2K/1sQnRNHDgsvdnAxDZwQBGS05kNySjsxTqAPp25D3nRbYINnZpB3m4z3MSMfLcash6/hZxR9lVWfnK3ShPawYo3CgeGu5GJ0FgzzKnTcgJIUdn6Ru/nfTAVkjv1YsylId/PSakhr/P6Y78mGWJcDfIVS5JrWpT1JonMk0qR61VkVAigTPmQgffSTompsJ99xu+afwHd2g2dqrVMRo0XVWkY0ylB3R9WzpGKkCi8zomNXyzOkY1a1Ix1lb5PMvlKc+vx+dS83yultVKUjOMDopUpTTM6B/KokYi0hvSFrrVNi1d02QDm8LAsHIkBfRmSYqfglzbkTMkmuOyc3F1+eOiUnUBZawwDJG6sHQD2FtTF4UpiSpSF4pAXQBQk7owtsnpcpbP28be6AutJKvXw+mL5wxiQWCC4iRHzZLNyvN1WVp6vqVJy87HX+IeVypPzL3TUHKOaOc0vHtkW/nkeyMJpXGao6XoPnENTnIEoP+dEm/q6Qi/njdugRN8VJq84n/poElx+zH1G5JjauYYFt/RMXNIkmMOHlsY5A+70EGBHXsfyTmElgLfG8P5o/G3AftLO9hLGr6jHh6Wk24UTJ1oGsDkBDwgveJFuG1SbBsGxZY3vXvsUiUHTXKw2G3cS3rFOb134I0H6Yukz+/YMx/Z7srzbiZkpML0vPS36CULnFx06sau3sRfzPzCCJ/V96lPneh+xjGMdWVlGQctNhv6GvlPZDbo9MObDfGnGutAVwuCVBVYB7rAOrDq8lfq74D3EsrbefbreBMYv9MK3jrHcne28qyTKIKjSTRnQaq0F79kHa90S6gw391ML3rcsDbVrztvBNE0ekunPmRVbCQab5RVslyXsJJlDp2tzWVKT2WUbbstEjiad3JvBrYgdo05GrapW+HoD+p9qtSh0DcdKA4g90xN1SSe2xpxKMjFkAAQsCEQORTUuoBROGACiJXuOGsEFoAhOj4/+okAZcYAH+gfea4bMzAMvX92j96KjDabDeP7aqct7ZzcC/NsyGQtB+AYjWHNGAHJyGOkGDxGsigjwKgLI94bN8HGR0sh0Rx0kCCphpIDSZVMHqQEyGZA4ie+GQvxGFsk9ojILGaXYMtngkeTWYPauUcw6RPpZpy2jPNDhzNxoOaycJQm4dyqp3VPYypJrGS1y8qs2jp5S1BFMRqO0/Hz8xHETySjPRcViwT6AcRXgMgcEsZXajOHthuP31O2N0uyvdJQQH4528sNx1KT4cmw/VdIentKeJG4zvoBGpHxG0L67gMvxBodEq9tj55CnELPns3kRDu+7qjiGVO/ryxIudV7urYrIkJRSooIvS6/hcJPZTvTcHg0N8nj2ayEuRw/PoNzhWjtSQ6vXBYttTa0+Cy4BXC8cwvbNPPImPyESdMaNLAV3vHAQYKmETGKztIlIwK/Df7vC3ns6SCwXQ/OjyWDmmec9HQhAq4dDlOOXLSUIV20sNyZnFJNYXVF0Y+c2BbkkXY4iV+0772SfsSEBIOLZxjTE6WdoT0hF4xeB2QtT9t+CdX2NKTPqpWEgJwjIdPgmdsweRIy6sqdVAAvib91cMPDze3/Lm45ckoGbhIgB4bhaqHbs52nARXTNzElLlCdu+ls1PS8MDYttc0vbhGL4wr8wP/0vvsCzC/q0+jk8ev0tWNY3455xJqLUApCj7FLxud/zd0xBb9N4deCGFQapsSiK5yOqJF3FNtyn7YUzRQOFRd+WzaO9CxuFLuRHR19TlxdxO45+owNoLBNDsT2EAwClHvvaoKGdb0RteY+z5135BU+f49tuqMkfLuH78VoMQfWnnT9ls2J8oTWAl/Y/xgYLF19rDvJVClE/jMMF7zdAp0QUL2bFf58FFikLeoU4LpZsKZViQ9FAEMkv6uwp4UC/MM/stQ/IhwzQSKf8LzKnaIbIS2IZ9DVQhvNYuuMrepKnl0sNeGfXFBI5E6si1t4z8/fNlGOxCoZ2nhW4+/yeBYm8xawBOMpdM9WMJ7h7XlP+vP058F4+tm1f5896Hf3SYLbHkmfRtOIb637q1B77nZN/7/H30poP3hXolWRwrGtZ1XkutnvsmUUHBWaaWTphr9Cl8CmV+iKUaDNzXLglwGREQY/Tu66GwqAN8V5hBS2aIpahSixlOKACyyZNNWsasegkNy3uiLhbdlnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTSUonwBprZUBWAJtPEVmlyt0hADIe0O9cp7TL3LNPLOUa9pLTdgeFpc/4qqTR4xEFvN8c6sUd138hWep+wI+WpFYlRXEKO0+RUGaIJ8OYu9E3go8CIyPj/vL+6rXQWv25LU74usd2BI0tkZb6T36aduRwAvO0TWe11hfSEyO5SdWVKsNOqGXCYu9sx6VzgevL64vrl9rJTz9mbebMjbnjdru8N5y2dTNXAQ2BYHVeRKU+UVjjG1KOnVJuY4fFLAyfXPzmEyuGptm8H5xUY1VJktZDOakmQAETonmiSpggSaZgwfYJYxfETLZmtDR7Be73I0wSKVoOEM4QhyyLzvxEY5XS+ZenKSemhNrAUTg8RLNJaeQN42QM+eS5O6DwsoXSrUWk4T1HKrX/V6gBKW1OYneHIb/56DZdN7MMmXQW5R4pfrPRebwok9TtoUevf4ppdjL/IwwP/ooh+y3hbiwSQD5lGGpvn/ySNJV8akIYT0D8sLbbEs83jlQOdnJuUl+9hMs6CDK/oMSJ87tPqCl3bZzQ9HthdkYSOBNCLV4p7gLIyBo4c90uOxPYIhvVt6jYPGfW+AvxzZo7+TOLuqB+1xGNn+E0u2ipxP6ShQwMg9jilaaY5dNm1vnYEozYiFKudNs2FBC36hn4rYk/NCWpLFsadlCeSoWsFqLCF78roub4p0L25/XRyq4uPqD8kACOSpAZqUp1v2927ul5l7YurwywgHTTCrFJ7HQGveL7Os1xneZNy40bxg1e4TKeOun4JfBcslYZb5dE2wXYQ4JaoultuhlKi94ThB7d9lNL4jHMdPzHchGlGbeuN4DZhKOV6rYvK9TAJnEbgR+KmGaNSbllhexK/A7TuSaMD7/VXLhTj5WAUCllxAQNN4aSfakaU2APh8Wvz268u/8oZ9utYPT7tOyC5V5J4TOI5bmHw034bZWyPOb5d7RsVyjxFKMo3etIy+LGnF7Zi0kiU61vWNA8mU+GctzwASXaOaueLMG/vHhXBYHNXPIO89qpLs45WsddL8mkZA/TRferOIaolesTS9SFS6UZhQV1egQvzqHzbk+nQlKsQoPlHbKStS22oA941lN/cGbHU3Qr9ANjlVakjL9xEQXaMz79HCa1RFXXFNPSpRUE/018nVxjOg0ub1km2tFmyEVYVBzk2JdLNkIlYVBbPEQPA+2g4iAQBsMbAS4ITYSUVR4q8foTCOgaUT1/jCQ3PdKtzGo5rCO29BXWVGxVDyLr0H24sOFCMgWXIZjMxGIeJ9QL9ibCQvDqm5EFvqLg0SkqgZH6n0+qT/5IoxpGG8k7P/JS0OiZRJtkuaKfArAp2CAgGLK/xn43CZcGHa/3ceuqtCbFic/E/qFWbrpMqCGjy1heg05WPusL45KagIIDYnK9+SajPxw9cEiEst0bywm5urpswvgefUFHokgA4s4FZnfnFGtSWXNMC0ujDhndIXvy/O7u9IYPzob9tFpFBkLtliGgQtmmXi8yUk37dSVyyd87YYKl8Jr7ZSeGIIebf2eYCZmWbWdGHwTFA7yBwHUNy3RAZayoNZltMEOq8+vHZoRfEurj4R5/KUdaGAelwofJFgReJ8F1pxH/iaPa+CWrIx57fS5KbtKNRG0ilUWeUQUEUbEzWaUJFS9N5y9xYsWkUpyd3VlwDfDGyB+4pLqmA1H8e9cCKacu5xkoWIA4EuSJ9vNM1C4QOeB+yIUpVCXTNZk3iEGnVDAd5T2LHDMK5YT+MUrUyK+oGhVVx0Bwxh2V1NbhKwHYrwlqyBs32dBkqnCe5WhBfwTuIrUj2VegCSPQlcamYek3rjXt9zcMvfKRTI1prsTdcjBVfjeuovMIx4zVhih4OaFKCmCtLo69piQuz2O1yn6qLw6eKA6wYMXtYNC6zdYnBBadbby5vby7vHudEq1VitddX0sYRVWxPvGlLJfHxVrwsc3h97Cx2IicJlopcZsfFat/Y5fPbidX/ieB0ivoDvN6RW9tEIjlAw+8Q2fiQH7DEZEAeNybBRw6jmoB3mKXqLdGvu+HU+fUTtFluEcnEVeTLByXqcxWW86yJS3uNMqt63khBBQqJsFWcmXSMTuZ2sWn3w7i39wrzMSG6STeEwG7Xz+ZkzXaGQ32GMrBOuVS8wpi1ohZ7sun3h+gZZMoAF69MKlqyRnbvLKQazXVeoTl28+wkZ0kQ6fo8LhL949I49ithNp3Nx3sqF8dhS2Ik/JTsyzOVs9laHxpJAUwvIA54lFUtpkCXV7QaLshPvsvm2jVZAXJpGu4Xk6rfl2yo6nxqg6VniKXWNybaxrjV3VuX9A+U3fYn3bpHm/25jV6XLCBJtVveOSuK9aeLgl7R0UOKW4tBcntNUjGQLoXX33qm6Oyfh09EbtgCquhs/bGcn+tGxZz6yXQpRrNF6swiK+1UQ6LVtIrRqou04UFsw0a4w8QkADfATbT5rRlZFijXJEKxes/LbadMduXjj9t0uyDUNoWHb6JpclXd4vIdFuWtmzGxg6+glbZ3Sa4uqXparFRPhlTQk2VCejMr7K97DEljjg8RiEpNkjsSAVI7E1jXVFUvjzG7ctNRUF1yjSLK59Bqg6sXV4sVr6jHvNT6nLHYfZ6zvg/IKAMnkJ1qyIM2srhqRYpj42oOJQV5INypOh+JVOtQvNEbHdLFdnwYDQkQShFlo99DKSwJJsziUNUFqtyJI7QZLRNZmKPNz7fx85wPW9WG1TD7/qWFYBX72eTSEZVJItj/gDeHtw1UBKniGUSwNNF9YsapydRUi9TV8uZv+ev4Lrqz7x5uOZHx/GC2q5noSRXA0YdVC854m6dp+TQ8XGZIWa83Aehzr0DUQbSZ4WQGesqTlY1m6IdCPokLXVWz1KQRT4cB8QMETzYx34oKvBNEjVvwVf8WPHSKXeG/YDqu5mJcdhsjxbFZV15vsohFUAZCamo81q4YiYEsgchAUp3SVIbk46rUoH6HcpuCiHAXK8Mlu4VmlSTD36M7hYWu9VIW6uhrXgk7JN6bJnegZrfjcjagJ4qQdPHIRNU3IHMeFPozWTvqoq78qFdR0/DL9nacn7EQntaSThAZz3cMjC/v21CcDS/QlSe6ZhjH5wj4+292Vgdbbi2YnVaf97JHIJXVz8rYQMGWB0FU0gdC11he6+GeACFRzp0FgT4bXCDMlbvw/</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go deleted file mode 100644 index a2cf6ed9..00000000 --- a/plugins/jobs/interface.go +++ /dev/null @@ -1,24 +0,0 @@ -package jobs - -import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" -) - -// Consumer todo naming -type Consumer interface { - Push(*structs.Job) (string, error) - Stat() - Consume(*pipeline.Pipeline) - Register(pipe string) error -} - -type Broker interface { - InitJobBroker(queue priorityqueue.Queue) (Consumer, error) -} - -type Item interface { - ID() string - Payload() []byte -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 67077920..8a80479b 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,14 +2,15 @@ package jobs import ( "context" + "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -27,11 +28,13 @@ type Plugin struct { cfg *Config `mapstructure:"jobs"` log logger.Logger + sync.RWMutex + workersPool pool.Pool server server.Server - brokers map[string]Broker - consumers map[string]Consumer + jobConstructors map[string]jobs.Constructor + consumers map[string]jobs.Consumer events events.Handler @@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.brokers = make(map[string]Broker) - p.consumers = make(map[string]Consumer) + p.jobConstructors = make(map[string]jobs.Constructor) + p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) @@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue2.NewPriorityQueue() + p.queue = priorityqueue.NewBinHeap() p.log = log return nil @@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.brokers { - jb, err := p.brokers[name].InitJobBroker(p.queue) + for name := range p.jobConstructors { + jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) if err != nil { errCh <- err return errCh @@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error { // start listening go func() { - for { - // get data JOB from the queue - job := p.queue.GetMax() - - if job == nil { - continue - } - - exec := payload.Payload{ - Context: job.Context(), - Body: job.Body(), - } - - _, err = p.workersPool.Exec(exec) - if err != nil { - panic(err) - } + for i := uint8(0); i < p.cfg.NumPollers; i++ { + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } + + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } + + _, err := p.workersPool.Exec(exec) + if err != nil { + panic(err) + } + } + }() } }() @@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { - p.brokers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { + p.jobConstructors[name.Name()] = c } func (p *Plugin) Available() {} @@ -152,12 +159,13 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (string, error) { +func (p *Plugin) Push(j *structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Driver()] if !ok { - panic("broker not found") + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) } id, err := broker.Push(j) diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index e77cda59..c6bd1645 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,12 @@ package jobs import ( + "sync" + + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" ) type rpc struct { @@ -10,11 +14,96 @@ type rpc struct { p *Plugin } -func (r *rpc) Push(j *structs.Job, idRet *string) error { +var jobsPool = &sync.Pool{ + New: func() interface{} { + return &structs.Job{ + Options: &structs.Options{}, + } + }, +} + +func pubJob(j *structs.Job) { + // clear + j.Job = "" + j.Payload = "" + j.Options = &structs.Options{} + jobsPool.Put(j) +} + +func getJob() *structs.Job { + return jobsPool.Get().(*structs.Job) +} + +/* +List of the RPC methods: +1. Push - single job push +2. PushBatch - push job batch + +3. Reset - managed by the Resetter plugin + +4. Stop - stop pipeline processing +5. StopAll - stop all pipelines processing +6. Resume - resume pipeline processing +7. ResumeAll - resume stopped pipelines + +8. Workers - managed by the Informer plugin. +9. Stat - jobs statistic +*/ + +func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { + const op = errors.Op("jobs_rpc_push") + + // convert transport entity into domain + // how we can do this quickly + jb := getJob() + defer pubJob(jb) + + jb = &structs.Job{ + Job: j.GetJob().Job, + Payload: j.GetJob().Payload, + Options: &structs.Options{ + Priority: &j.GetJob().Options.Priority, + ID: &j.GetJob().Options.Id, + Pipeline: j.GetJob().Options.Pipeline, + Delay: j.GetJob().Options.Delay, + Attempts: j.GetJob().Options.Attempts, + RetryDelay: j.GetJob().Options.RetryDelay, + Timeout: j.GetJob().Options.Timeout, + }, + } + id, err := r.p.Push(jb) + if err != nil { + return errors.E(op, err) + } + + resp.Id = *id + + return nil +} + +func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { + const op = errors.Op("jobs_rpc_push") id, err := r.p.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - *idRet = id + + *idRet = *id + return nil +} + +func (r *rpc) Stop(pipeline string, w *string) error { + return nil +} + +func (r *rpc) StopAll(_ bool, w *string) error { + return nil +} + +func (r *rpc) Resume(pipeline string, w *string) error { + return nil +} + +func (r *rpc) ResumeAll(_ bool, w *string) error { return nil } diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go index 268444db..1ef4d2ca 100644 --- a/plugins/jobs/structs/job.go +++ b/plugins/jobs/structs/job.go @@ -17,12 +17,12 @@ type Job struct { Options *Options `json:"options,omitempty"` } -func (j *Job) ID() string { +func (j *Job) ID() *string { return j.Options.ID } -func (j *Job) Priority() uint64 { - return *j.Options.Priority +func (j *Job) Priority() *uint64 { + return j.Options.Priority } // Body packs job payload into binary payload. @@ -34,8 +34,8 @@ func (j *Job) Body() []byte { func (j *Job) Context() []byte { ctx, _ := json.Marshal( struct { - ID string `json:"id"` - Job string `json:"job"` + ID *string `json:"id"` + Job string `json:"job"` }{ID: j.Options.ID, Job: j.Job}, ) diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go index 029a797d..3e1ada85 100644 --- a/plugins/jobs/structs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -9,23 +9,23 @@ type Options struct { Priority *uint64 `json:"priority"` // ID - generated ID for the job - ID string `json:"id"` + ID *string `json:"id"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay int `json:"delay,omitempty"` + Delay uint64 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts int `json:"maxAttempts,omitempty"` + Attempts uint64 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int `json:"retryDelay,omitempty"` + RetryDelay uint64 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int `json:"timeout,omitempty"` + Timeout uint64 `json:"timeout,omitempty"` } // Merge merges job options. @@ -52,7 +52,7 @@ func (o *Options) Merge(from *Options) { } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int) bool { +func (o *Options) CanRetry(attempt uint64) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go index 18702394..a16f7dd0 100644 --- a/plugins/jobs/structs/job_options_test.go +++ b/plugins/jobs/structs/job_options_test.go @@ -79,10 +79,10 @@ func TestOptions_Merge(t *testing.T) { }) assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, 1, opts.Attempts) - assert.Equal(t, 2, opts.Delay) - assert.Equal(t, 1, opts.Timeout) - assert.Equal(t, 1, opts.RetryDelay) + assert.Equal(t, uint64(1), opts.Attempts) + assert.Equal(t, uint64(2), opts.Delay) + assert.Equal(t, uint64(1), opts.Timeout) + assert.Equal(t, uint64(1), opts.RetryDelay) } func TestOptions_MergeKeepOriginal(t *testing.T) { @@ -103,8 +103,8 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { }) assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, 10, opts.Attempts) - assert.Equal(t, 10, opts.Delay) - assert.Equal(t, 10, opts.Timeout) - assert.Equal(t, 10, opts.RetryDelay) + assert.Equal(t, uint64(10), opts.Attempts) + assert.Equal(t, uint64(10), opts.Delay) + assert.Equal(t, uint64(10), opts.Timeout) + assert.Equal(t, uint64(10), opts.RetryDelay) } diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go index 92f78081..0aa5b177 100644 --- a/plugins/jobs/structs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -3,6 +3,7 @@ package structs import ( "testing" + "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -13,7 +14,7 @@ func TestJob_Body(t *testing.T) { } func TestJob_Context(t *testing.T) { - j := &Job{Job: "job", Options: &Options{ID: "id"}} + j := &Job{Job: "job", Options: &Options{ID: utils.AsStringPtr("id")}} assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context()) } |