diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/config.go | 62 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/jobs/doc/response_protocol.md | 54 | ||||
-rw-r--r-- | plugins/jobs/job/job.go | 51 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go | 18 | ||||
-rw-r--r-- | plugins/jobs/metrics.go | 92 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 98 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go | 21 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 719 | ||||
-rw-r--r-- | plugins/jobs/protocol.go | 78 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 160 |
11 files changed, 0 insertions, 1354 deletions
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go deleted file mode 100644 index 454256b9..00000000 --- a/plugins/jobs/config.go +++ /dev/null @@ -1,62 +0,0 @@ -package jobs - -import ( - "runtime" - - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" -) - -const ( - // name used to set pipeline name - pipelineName string = "name" -) - -// Config defines settings for job broker, workers and job-pipeline mapping. -type Config struct { - // NumPollers configures number of priority queue pollers - // Should be no more than 255 - // Default - num logical cores - NumPollers uint8 `mapstructure:"num_pollers"` - - // PipelineSize is the limit of a main jobs queue which consume Items from the drivers pipeline - // Driver pipeline might be much larger than a main jobs queue - PipelineSize uint64 `mapstructure:"pipeline_size"` - - // Timeout in seconds is the per-push limit to put the job into queue - Timeout int `mapstructure:"timeout"` - - // Pool configures roadrunner workers pool. - Pool *poolImpl.Config `mapstructure:"Pool"` - - // Pipelines defines mapping between PHP job pipeline and associated job broker. - Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"` - - // Consuming specifies names of pipelines to be consumed on service start. - Consume []string `mapstructure:"consume"` -} - -func (c *Config) InitDefaults() { - if c.Pool == nil { - c.Pool = &poolImpl.Config{} - } - - if c.PipelineSize == 0 { - c.PipelineSize = 1_000_000 - } - - if c.NumPollers == 0 { - c.NumPollers = uint8(runtime.NumCPU()) - } - - for k := range c.Pipelines { - // set the pipeline name - c.Pipelines[k].With(pipelineName, k) - } - - if c.Timeout == 0 { - c.Timeout = 60 - } - - c.Pool.InitDefaults() -} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio deleted file mode 100644 index 824e1a83..00000000 --- a/plugins/jobs/doc/jobs_arch.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-08-21T12:35:37.051Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.1 Safari/537.36" etag="L_bYn0v_jW4MOvWLd2St" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc5s4+9d4Jt2ZZJDE+TFXs23Txo2Tttkv72AjxzTYuICTuL/+lYTAIMkOjgE7R3a2iQXI4rkvPeqg4/HjWeROR19DDwcdqHmPHXTSgdBxoEF+0ZF5OmLYppaO3Ea+l46BxUDP/4v5YHbbzPdwXLoxCcMg8aflwUE4meBBUhpzoyh8KN82DIPyt07dWywN9AZuII/+9L1klK2O/Cyu/Iv921EiXRq72f18IB65XvhQGEKnHXQchWGS/jV+PMYBBWAGmvS5j0uu5muL8CSp8kCvG5s3x1/n7udQH4Hbb99O76J9nS/u3g1m/KUvLy6uyMjni6Me+dU9vz779I2/QTLPIBOFs4mH6cxaBx09jPwE96bugF59IMRAxkbJOCCfAPlzGE6Sj+7YDygd9D6Smb6Gk5BccAP/dkLGohSCR4Hbx0E3jP3ED+l4gId0+B5HiU+wci5cTsJp4eohn6wfJkk4pl/rB8FxGIQRWzIaDrE5GJDxOInCO1y44llOX6PvIYOUQ5l+B34sDHEQn+FwjJNoTm7hV5GG4AGne072wKCf6MDDgoigqadjoyL56A4nXk64t/n8C8SSPzhu1Xj+NZk8/gvIc/d/k4l19h3/pw32oYRB7BFS5x/DKBmFt+HEDU4Xo0dlHC/uOQ8p3Blmf+MkmXO+dWdJWMY7gWI0/0WfJxDhH2+K104e+eTppzn/tJReluInDmfRAK96fw6AxI1ucbLiRj4hhc5KdEc4cBP/viwoVGjjj3ZDn6w5JxPHhiUasTQ7o5psknSp/LkiX4tTWVZ5KihNlYJHmuowitx54bYpvSGWaC1/6+eTH9oK+T36SYH6yKebjNzI3wvaox/mRULcDZIFFSlWb4liBTLTnaoUWxcRZbK4oKq6ZCZtn6qpWTwik9JfR24yGG2osMqawzOw7ekqzWHDPjJNjv/CuMZ+nkUX1VWNLWDEMHKMFBVNZpAUFY1uN6RnwDunr8/pelXdZLbF6uaB7ZiGASyoA2Q5JTJDRoOMf/G3i76M0P/unX0TJWc9jZifGeNviaaKFJXTl5qmRIvTHmC1xdm3Dd1YaXHWRjSZ5dM40WiCGQJgRTJZboaov8kyBU1k6fyblplI4hOLtS0oNV1FrQpLlxTWGaarmvpTHPgTXKuSwoCoKUtFbI5pIXdbSsoBArKUSgpAhZJCqCElZUh46UAzoJ7mtIQP88+MOudHY0K3PnEtD8lVbfpI/mWw0dLxfeaG0mt64RoBT7LP/Vt6bUBAiKPyZQ8PwshNnVl6D0V+lBJG9tXkr1v+my2wnw18DvsELIe9JJoNklmEsxsIQPriQ2RsKo6NInHkWe+eeuj0ok0vissmq2RPnLC5I39ym79I/v1ddx6ErrfkPtXam1jpJw8zibF6rRdTiq04vy//rFqswNxinCKNXmQhEB7qCMldw4CFiSh/c+bkSgrAVcy6XDQMDfqfSjSY7EcWDelPPRIg5+15ZpAimf9NBf8DqDckAMxXIAAy0tt5GdDNtd1WGbwb+WHkJ/N8GTNiItAgnMjjJ8QOmi9Z7OKuwyTB42myEAXM4Gj3lS4pM5SXW1hFX8JuW+u68sc4nCXPWdS7zFTKTGRrFWUmakpmZnbcu2e/hpOWeexPu/b2Lrj20Gw9pier4tRHGhDdNhszhbnnReSFow/s7W/9mKhRTM3FYUivJiNccqm0/rz8eeKO34SnZTvllAKCSk/LUkgNszGh4bwLjfWFhl1RaGRKYrtCAzQZD1wJoILQ+IrJN5Kh38zhDHM37cEnhC/IjYJs4DceULTxGcJJQEFGjbu5PGGNYmQ4HMIl+WizbxpbEyNGOWADgTKroKvEiNaQGIFylQLN9OxRTHbgcYomqh+IJCBfT/4/oiCNasWX52J7qMSXObBxf7gjYh9osKrYN8ym8CWn6pYghLrBZahnxj33l+X6jrHveamewLH/1+2zqSiUedyYzGscdYwTOhdRDTH3AyTETcIJbhg3jiNUgtiaAjeGobDjraZQAyXUSEghvhuVj8d5VZUm0zb57yP92qPbyPV8vLiWgbXMPPntShx4bjzKuXJZtU9e17PaKczpRihAEv3BzAahX+nG0/RFh/4jXUdKSjg6vccpRTHqGblT+sD48ZaWux24D7F+MIvZdzVKRMguEZFpqxjcsmUismrI8v41h94Dsj/qd+PDm7PZY9dy/s3KObYSP1MExtwxJYBA/sSQOKTyPRsuP1KOrEmhiTyIdlxwRvpMkH3YUqxNCSopKrMKjuwuOTpG1Wlq6/wTs3hifPA51ak4iqijVXMIqakXueSW3l7Byvsn+/sgi0W+vPfiJPjaXuucIGuPLjvV2fIrvZg3aSHs2hjzu0SNFSkrDXp/eDEvcIlF3lj5BoK5kyn2iNkARbtBjiyrXIalyn+nQ8qGJYeUkcpLAHYNeXilGfEeHFoZHFLCzJBjQ8r7ao8nb4RpueKie937d0P3vEGbG5gCtzi6KgOjKq1ENQRBVmG0AMM/B79pFQixjUcucdWCXYYnEOCJHAU8lVVANcAzvjzpa7/vfv+07r733F/HP82r66wI4AVJH1CUPQVR1Ij0uXSuz2Pjvtezg/9ufsHY/emfq2rSlbBtpiZ93UJB0XMGUNeLZCM9YJho0wdsXSDMzeoKV2GhIAm+HV71NuT+Z+W41GbPkihoDXLEtkV4Z6mrohgBCjGiNyVG7G2KEVAUIk8UJy+MmDbFiBJmWxMjG2H65ZmrrSqMjTCNdkJhAFG8aDpcKf8BgBs/Aax6VYYaD9ruEC94wcS7Sh/vGvEaxmrjRSbF9Z+o295R42G7G4AOrNdCvcr74E5QLwQiLWpP0KK2+RPIaYN6JWu9UIr8/fr0+rRWy910NW04VFnuyNK042PZQB+yn6YjAJLkUFnuqgBAY5b7DhW0VhQqrYYfVwmLF2a5Q4kFv55+vbi8qZXxXorLjKxtu8zG7jDeakeqAQZC22KgeiJomrVax+qSP9SGdyPXoxx+/d59k9wNjG1zt7zruYGOUkJppq1pFlIh59DQNF1RvtaKzYP0KjaPahtPY8gBsir8NJ4ScUqRMRhhxVaJV16iKeSlFFlxkHU2qLtAU40hWZgRcAS0NoG8ahTe+x6O5Er0140lwxQKaXP4F/GEzGbwpOycJ7t1gG4jWODKZXNwoVdA3LIyIc+/F4fiqTvJxiCbPZ3008RPfILfv+mGBWonEFhSePmMmdNtDvwr6VImdCDG7Bevbe3wavl0/1T3e6FUpvi1hWHFAp9YM6Jr7rKdoH6+ZK8MjuIqxu60k+/ZuMPzOEUcu+zTFdO9XDGbLX9mEE6G/i35Y88d/5mmuwH62J3EiRvcpR9xMviQQ4EhjM6xz7CVF3YWa0XXAURlPhSaGbbNhYIC/Mh+avLtpcijaUrc6TgKKarXsPFMyZ2ymisbIb3Tyx+nb1XnEZND0Hq2QppaqE1puuUQ7+bBmEX4pYlgjBJoCl9SeR9HWvvBmFWrLrAmZ8aNHIKnWszmfFtjj651JKSwqw4ACGWWU9Y/NcVyO1T/9GI4TtF0bRWN7wjHyR75LmQg2rdSRB7UkFONB+twx1dJ5iJmLhRxq1E47s/itRFBQxwDTYWI4fCpnamS3KwFA0LHUuLGyVLQbhMBclEtefv15WJ1cz/fxUicsUPaop7OOcWTdITLTft5OHtu8vn58tCqWR5yQsmc6w37FkDNNMRe7FJXwZqaUkJN1+TvgsUpKz0DdCAQ9WbxciU6HInq51gOKdVJ9uke3SZpfk3joHmaz4LuLRM9cIApEhVCwjT1NehQv/q7bbk+XWX08jRhGTtlXWbrLkiTH4fnG9uUlQ2TFU3Zl7Rxb8SYRHrFepY6OrSpESEHvbohDagSWcu7TFKaSGYRa40zDuM0pZC7AumDbywWZtjSeS3AksNhCKjOa2ksHCYHSX66fvJGcQQ1A1TBkd0qimSv+keKG81PUxQeJjaOx5IuNAshZ378IV0/61GHWVrk8PhLNjKgmQfN9egwQ/wTiSPFZuzle9SLeY1C+iVf/ytPhdQhNhxR/kNLznlAoOjK0ljOw4DvVtf6VpdiQ7U6iFb74RubiR95S/XPi8svp5esxObi4rwt80sRc7KVvhwykYO8+swvUSPo2aE2TxpgRlM4kcN5p79Oj6+vaKZx78+BF7JWJ8Xk9SyKOixrH3x4Y0odOFDyU3VN0WmwqfZoahTKAcET1n2WqvQeju5Zc5G3mDSGNionjYEGsnNXSixnKHRec/jaoU2Zu1jEry6OqBppQM1UIUsxKwRMXTLwnWqB2rpiVooeoynnd/Jqke0o1FYS1AhoMgZUB3W0mqLOKfrFcvcWLFoIK3J3/e2jN0O2Inwlpal548ZJP56qXM6N0taaSw+rUjAaPLHM1VmIZdXlDXGmBhVVyq0mrqGcQnrDASoEbME2yqv6txWeQnIEsevGMQszsRMSaJfsvBT4bWELSruaEFBasqBNhO1Qzqxie5Ht6zpUuSBrt3JmSA4e006omEUGsh726bEn+7Q3tT/0B2TkzwwrZGtDdqjn04aYae/tBxwnssas0BG/KdPUURQsqxRgHUcSqMOBbzfYuv7p2BsweNXwbFbKtysMruh4efnp4vLT1c3CmNUabIL5lFtZ0dpt0ahFJqxo1NZwbIUaaXL89hIPMCEWj4tkbtyme40OTvC9n+67Uuf32PlUny9oH+O9MR6H0fxDeiu74E4oQAbhhIKNGUwNJ/kIr7Ep8tMi09f58J7lW06mlriD11Zk+VRdkxvL8iE5Qk2b93eylEJGonwXXaG8o5DpnT5V//3qPQDBX0OWnDxCdqv2v+xRs1pwbRiF405xn2aj+oIzraAt+sDzhspKcqBZyMHNaQXDggew4oHhunQQWX3m3vIjTyhIM+n4Oe3H/OCzGfsMYxfd7ulJp5T243sRp8GMdstfyNniVG+NJfMCqxzztsySMDvsqhWW1GUvrfrxB+kpBtri320cXPMpwVR27BXkPfMhGzu+Rn1eQ5qk0FaCJx2RjmU/YSnz5x4ZXfdysoA0W1T5AOntLeoo9OaFM1T68+QZp6bUvahjIivIlGxde8WFsQ359IyaZxwvUvciDwd3exucmVP3cr65T69HUAqNnV3yZFpyq4eZIFOw35SHHkJNqS0aUxfyWcYckaLF9mr38xlIaa21uqVPl7349z19neohN11xZvnF3y76MkL/u3f2TZSc9bTz67PKbQXq3tQntGwCRlb4WvuWPtFLzL5p+YY+8QmTxwsa3c6nyyGB9/18r4XegQOEbKUlOjw10TtwgCDOTfBER2VpbaZlNE/vWV622O8xZpaTWB4iOkjprgrml0/CfbY5asiCsXFICzp5yu2tdVcDpihSTVUhLlQU4oobS+vbzCD73pl5/I7UKkilm8sF7eUoYtftIlUR4xQiE/Rrg1vZXts+supgNMsUdw1BpKo8UTVsrSPM9Rg/XM1+3P9B5871zUVXsz7/HK84zXlZ0mm9E1GLSSXWIzE7o7fIm9rUn/rs9M64s14+qqmlpg0X0waB6frW3Q/X1MpYW8VewiTdIF/gnhcyCUj1s4cDnKyd2WtqvTprA8ngV1jvIia5E4s0skXyE2QLIdM9Dw/dWUABSxmTRrDYue8UzkNyt7crgDYPlplAded2X5DQhUAXy2eRrYoWGQqh69QQpVA6E1ASuqdprEhju4JpxEKLZ4MRlzwpUb5G9IgqUdcV+lBXoAY2hRpVA+a8TSztGsv2d7vMtUsVGasPkQ3TwnnUPBu/KOtbA5nt1FHUYd5oQjwQGaqwrLLjeR3HvCqxuUkOr+oR5qLE5uxXuuzhQci7cLB7aNI+4ifcLztKPk875YTUTNbuiXdeHM1uq9IWtGuzccRTdMbJIndpHXWsk0ppDHV4SWAAOaNB/ScebgJwlT+1Iqa+U4eyL8tjKFrxLlpi1M4ymaex+2eaLjtZYxFF7NQfNVfCTFGnqoZt7QezLwtFW2XSQqZxoCFdt6Fl2CbUqsUQnxGyWwWeghTmjbipamUViaEiQrNWaVMjGS5H6JSNUMVapDoKz5WQlJOMpW3p7uu0EoGJxF4BKknptGknynnGE7Vpty7h1lhgv0lHM9MC1Wi9Mdsta3Ky+4pomcKBVQ7+XOtsqCc1kV1RE21tR9SqVRf94SntshG5wbr8tAykjSgI8SgFqHZ51GzTnAWnv3TGaZ9vnBfJN3JGRXn03S6zDCKuTNUSb2g1xzNb3YBbB89Y7TONqmntC+AaRc/ao9PDb72rw/MvL4p3dKMq4zTGNrLm5qeU0c0QuXu3y1Clbl5ZIunaQeXdiPaBtT5syccopFG0hZNNQDD6GnqY3vF/</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/doc/response_protocol.md b/plugins/jobs/doc/response_protocol.md deleted file mode 100644 index e195c407..00000000 --- a/plugins/jobs/doc/response_protocol.md +++ /dev/null @@ -1,54 +0,0 @@ -Response protocol used to communicate between worker and RR. When a worker completes its job, it should send a typed -response. The response should contain: - -1. `type` field with the message type. Can be treated as enums. -2. `data` field with the dynamic response related to the type. - -Types are: - -``` -0 - NO_ERROR -1 - ERROR -2 - ... -``` - -- `NO_ERROR`: contains only `type` and empty `data`. -- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the - job, - `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap - with string key and array of strings as a value. - -For example: - -`NO_ERROR`: -For example: - -```json -{ - "type": 0, - "data": {} -} - -``` - -`ERROR`: - -```json -{ - "type": 1, - "data": { - "message": "internal worker error", - "requeue": true, - "headers": [ - { - "test": [ - "1", - "2", - "3" - ] - } - ], - "delay_seconds": 10 - } -} -``` diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go deleted file mode 100644 index adab2a0a..00000000 --- a/plugins/jobs/job/job.go +++ /dev/null @@ -1,51 +0,0 @@ -package job - -import ( - "time" -) - -// constant keys to pack/unpack messages from different drivers -const ( - RRID string = "rr_id" - RRJob string = "rr_job" - RRHeaders string = "rr_headers" - RRPipeline string = "rr_pipeline" - RRDelay string = "rr_delay" - RRPriority string = "rr_priority" -) - -// Job carries information about single job. -type Job struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-value pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go deleted file mode 100644 index 4a95e27d..00000000 --- a/plugins/jobs/job/job_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package job - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestOptions_DelayDuration(t *testing.T) { - opts := &Options{Delay: 0} - assert.Equal(t, time.Duration(0), opts.DelayDuration()) -} - -func TestOptions_DelayDuration2(t *testing.T) { - opts := &Options{Delay: 1} - assert.Equal(t, time.Second, opts.DelayDuration()) -} diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go deleted file mode 100644 index 38d0bcfb..00000000 --- a/plugins/jobs/metrics.go +++ /dev/null @@ -1,92 +0,0 @@ -package jobs - -import ( - "sync/atomic" - - "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/plugins/informer" -) - -func (p *Plugin) MetricsCollector() []prometheus.Collector { - // p - implements Exporter interface (workers) - // other - request duration and count - return []prometheus.Collector{p.statsExporter} -} - -const ( - namespace = "rr_jobs" -) - -type statsExporter struct { - workers informer.Informer - workersMemory uint64 - jobsOk uint64 - pushOk uint64 - jobsErr uint64 - pushErr uint64 -} - -var ( - worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil) - pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil) - pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil) - jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil) - jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil) -) - -func newStatsExporter(stats informer.Informer) *statsExporter { - return &statsExporter{ - workers: stats, - workersMemory: 0, - jobsOk: 0, - pushOk: 0, - jobsErr: 0, - pushErr: 0, - } -} - -func (se *statsExporter) metricsCallback(event interface{}) { - if jev, ok := event.(events.JobEvent); ok { - switch jev.Event { //nolint:exhaustive - case events.EventJobOK: - atomic.AddUint64(&se.jobsOk, 1) - case events.EventPushOK: - atomic.AddUint64(&se.pushOk, 1) - case events.EventPushError: - atomic.AddUint64(&se.pushErr, 1) - case events.EventJobError: - atomic.AddUint64(&se.jobsErr, 1) - } - } -} - -func (se *statsExporter) Describe(d chan<- *prometheus.Desc) { - // send description - d <- worker - d <- pushErr - d <- pushOk - d <- jobsErr - d <- jobsOk -} - -func (se *statsExporter) Collect(ch chan<- prometheus.Metric) { - // get the copy of the processes - workers := se.workers.Workers() - - // cumulative RSS memory in bytes - var cum uint64 - - // collect the memory - for i := 0; i < len(workers); i++ { - cum += workers[i].MemoryUsage - } - - // send the values to the prometheus - ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum)) - // send the values to the prometheus - ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk))) - ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr))) - ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk))) - ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr))) -} diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go deleted file mode 100644 index 8a8c1462..00000000 --- a/plugins/jobs/pipeline/pipeline.go +++ /dev/null @@ -1,98 +0,0 @@ -package pipeline - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/utils" -) - -// Pipeline defines pipeline options. -type Pipeline map[string]interface{} - -const ( - priority string = "priority" - driver string = "driver" - name string = "name" -) - -// With pipeline value -func (p *Pipeline) With(name string, value interface{}) { - (*p)[name] = value -} - -// Name returns pipeline name. -func (p Pipeline) Name() string { - return p.String(name, "") -} - -// Driver associated with the pipeline. -func (p Pipeline) Driver() string { - return p.String(driver, "") -} - -// Has checks if value presented in pipeline. -func (p Pipeline) Has(name string) bool { - if _, ok := p[name]; ok { - return true - } - - return false -} - -// String must return option value as string or return default value. -func (p Pipeline) String(name string, d string) string { - if value, ok := p[name]; ok { - if str, ok := value.(string); ok { - return str - } - } - - return d -} - -// Int must return option value as string or return default value. -func (p Pipeline) Int(name string, d int) int { - if value, ok := p[name]; ok { - if i, ok := value.(int); ok { - return i - } - } - - return d -} - -// Bool must return option value as bool or return default value. -func (p Pipeline) Bool(name string, d bool) bool { - if value, ok := p[name]; ok { - if i, ok := value.(bool); ok { - return i - } - } - - return d -} - -// Map must return nested map value or empty config. -// Here might be sqs attributes or tags for example -func (p Pipeline) Map(name string, out map[string]string) error { - if value, ok := p[name]; ok { - if m, ok := value.(string); ok { - err := json.Unmarshal(utils.AsBytes(m), &out) - if err != nil { - return err - } - } - } - - return nil -} - -// Priority returns default pipeline priority -func (p Pipeline) Priority() int64 { - if value, ok := p[priority]; ok { - if v, ok := value.(int64); ok { - return v - } - } - - return 10 -} diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go deleted file mode 100644 index 4482c70d..00000000 --- a/plugins/jobs/pipeline/pipeline_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package pipeline - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestPipeline_String(t *testing.T) { - pipe := Pipeline{"value": "value"} - - assert.Equal(t, "value", pipe.String("value", "")) - assert.Equal(t, "value", pipe.String("other", "value")) -} - -func TestPipeline_Has(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} - - assert.Equal(t, true, pipe.Has("options")) - assert.Equal(t, false, pipe.Has("other")) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go deleted file mode 100644 index 3aec6acc..00000000 --- a/plugins/jobs/plugin.go +++ /dev/null @@ -1,719 +0,0 @@ -package jobs - -import ( - "context" - "fmt" - "sync" - "time" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" -) - -const ( - // RrMode env variable - RrMode string = "RR_MODE" - RrModeJobs string = "jobs" - - PluginName string = "jobs" - pipelines string = "pipelines" -) - -type Plugin struct { - sync.RWMutex - - // Jobs plugin configuration - cfg *Config `structure:"jobs"` - log logger.Logger - workersPool pool.Pool - server server.Server - - jobConstructors map[string]jobs.Constructor - consumers sync.Map // map[string]jobs.Consumer - - // events handler - events events.Handler - - // priority queue implementation - queue priorityqueue.Queue - - // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline - pipelines sync.Map - - // initial set of the pipelines to consume - consume map[string]struct{} - - // signal channel to stop the pollers - stopCh chan struct{} - - // internal payloads pool - pldPool sync.Pool - statsExporter *statsExporter -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { - const op = errors.Op("jobs_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - - p.cfg.InitDefaults() - - p.server = server - - p.events = events.NewEventsHandler() - p.events.AddListener(p.collectJobsEvents) - - p.jobConstructors = make(map[string]jobs.Constructor) - p.consume = make(map[string]struct{}) - p.stopCh = make(chan struct{}, 1) - - p.pldPool = sync.Pool{New: func() interface{} { - // with nil fields - return &payload.Payload{} - }} - - // initial set of pipelines - for i := range p.cfg.Pipelines { - p.pipelines.Store(i, p.cfg.Pipelines[i]) - } - - if len(p.cfg.Consume) > 0 { - for i := 0; i < len(p.cfg.Consume); i++ { - p.consume[p.cfg.Consume[i]] = struct{}{} - } - } - - // initialize priority queue - p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) - p.log = log - - // metrics - p.statsExporter = newStatsExporter(p) - p.events.AddListener(p.statsExporter.metricsCallback) - - return nil -} - -func (p *Plugin) Serve() chan error { //nolint:gocognit - errCh := make(chan error, 1) - const op = errors.Op("jobs_plugin_serve") - - // register initial pipelines - p.pipelines.Range(func(key, value interface{}) bool { - t := time.Now() - // pipeline name (ie test-local, sqs-aws, etc) - name := key.(string) - - // pipeline associated with the name - pipe := value.(*pipeline.Pipeline) - // driver for the pipeline (ie amqp, ephemeral, etc) - dr := pipe.Driver() - - // jobConstructors contains constructors for the drivers - // we need here to initialize these drivers for the pipelines - if _, ok := p.jobConstructors[dr]; ok { - // config key for the particular sub-driver jobs.pipelines.test-local - configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) - - // init the driver - initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) - if err != nil { - errCh <- errors.E(op, err) - return false - } - - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(name, initializedDriver) - - // register pipeline for the initialized driver - err = initializedDriver.Register(context.Background(), pipe) - if err != nil { - errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) - return false - } - - // if pipeline initialized to be consumed, call Run on it - if _, ok := p.consume[name]; ok { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - err = initializedDriver.Run(ctx, pipe) - if err != nil { - errCh <- errors.E(op, err) - return false - } - return true - } - - return true - } - - p.events.Push(events.JobEvent{ - Event: events.EventDriverReady, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: t, - Elapsed: t.Sub(t), - }) - - return true - }) - - // do not continue processing, immediately stop if channel contains an error - if len(errCh) > 0 { - return errCh - } - - var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) - if err != nil { - errCh <- err - return errCh - } - - // start listening - go func() { - for i := uint8(0); i < p.cfg.NumPollers; i++ { - go func() { - for { - select { - case <-p.stopCh: - p.log.Info("------> job poller stopped <------") - return - default: - // get prioritized JOB from the queue - jb := p.queue.ExtractMin() - - // parse the context - // for each job, context contains: - /* - 1. Job class - 2. Job ID provided from the outside - 3. Job Headers map[string][]string - 4. Timeout in seconds - 5. Pipeline name - */ - - start := time.Now() - p.events.Push(events.JobEvent{ - Event: events.EventJobStart, - ID: jb.ID(), - Start: start, - Elapsed: 0, - }) - - ctx, err := jb.Context() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - Error: err, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) - } - p.log.Error("job marshal context", "error", err) - continue - } - - // get payload from the sync.Pool - exec := p.getPayload(jb.Body(), ctx) - - // protect from the pool reset - p.RLock() - resp, err := p.workersPool.Exec(exec) - p.RUnlock() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: jb.ID(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - // RR protocol level error, Nack the job - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) - } - - p.log.Error("job execute failed", "error", err) - - p.putPayload(exec) - continue - } - - // if response is nil or body is nil, just acknowledge the job - if resp == nil || resp.Body == nil { - p.putPayload(exec) - err = jb.Ack() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: jb.ID(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - p.log.Error("acknowledge error, job might be missed", "error", err) - continue - } - - p.events.Push(events.JobEvent{ - Event: events.EventJobOK, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - continue - } - - // handle the response protocol - err = handleResponse(resp.Body, jb, p.log) - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: jb.ID(), - Start: start, - Error: err, - Elapsed: time.Since(start), - }) - p.putPayload(exec) - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed, job might be lost", "root error", err, "error nack", errNack) - continue - } - - p.log.Error("job negatively acknowledged", "error", err) - continue - } - - p.events.Push(events.JobEvent{ - Event: events.EventJobOK, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - // return payload - p.putPayload(exec) - } - } - }() - } - }() - - return errCh -} - -func (p *Plugin) Stop() error { - // range over all consumers and call stop - p.consumers.Range(func(key, value interface{}) bool { - consumer := value.(jobs.Consumer) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := consumer.Stop(ctx) - if err != nil { - cancel() - p.log.Error("stop job driver", "driver", key) - return true - } - cancel() - return true - }) - - // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, - // but if not, this is not a problem at all. - // The main target is to stop the drivers - go func() { - for i := uint8(0); i < p.cfg.NumPollers; i++ { - // stop jobs plugin pollers - p.stopCh <- struct{}{} - } - }() - - // just wait pollers for 5 seconds before exit - time.Sleep(time.Second * 5) - - p.Lock() - p.workersPool.Destroy(context.Background()) - p.Unlock() - - return nil -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectMQBrokers, - } -} - -func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { - p.jobConstructors[name.Name()] = c -} - -func (p *Plugin) Workers() []*process.State { - p.RLock() - wrk := p.workersPool.Workers() - p.RUnlock() - - ps := make([]*process.State, len(wrk)) - - for i := 0; i < len(wrk); i++ { - st, err := process.WorkerProcessState(wrk[i]) - if err != nil { - p.log.Error("jobs workers state", "error", err) - return nil - } - - ps[i] = st - } - - return ps -} - -func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { - const op = errors.Op("jobs_plugin_drivers_state") - jst := make([]*jobState.State, 0, 2) - var err error - p.consumers.Range(func(key, value interface{}) bool { - consumer := value.(jobs.Consumer) - newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) - - var state *jobState.State - state, err = consumer.State(newCtx) - if err != nil { - cancel() - return false - } - - jst = append(jst, state) - cancel() - return true - }) - - if err != nil { - return nil, errors.E(op, err) - } - return jst, nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Reset() error { - p.Lock() - defer p.Unlock() - - const op = errors.Op("jobs_plugin_reset") - p.log.Info("JOBS plugin received restart request. Restarting...") - p.workersPool.Destroy(context.Background()) - p.workersPool = nil - - var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) - if err != nil { - return errors.E(op, err) - } - - p.log.Info("JOBS workers pool successfully restarted") - - return nil -} - -func (p *Plugin) Push(j *job.Job) error { - const op = errors.Op("jobs_plugin_push") - - start := time.Now() - // get the pipeline for the job - pipe, ok := p.pipelines.Load(j.Options.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline)) - } - - // type conversion - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // if job has no priority, inherit it from the pipeline - if j.Options.Priority == 0 { - j.Options.Priority = ppl.Priority() - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - - err := d.(jobs.Consumer).Push(ctx, j) - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventPushError, - ID: j.Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - return errors.E(op, err) - } - - p.events.Push(events.JobEvent{ - Event: events.EventPushOK, - ID: j.Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (p *Plugin) PushBatch(j []*job.Job) error { - const op = errors.Op("jobs_plugin_push") - start := time.Now() - - for i := 0; i < len(j); i++ { - // get the pipeline for the job - pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline)) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // if job has no priority, inherit it from the pipeline - if j[i].Options.Priority == 0 { - j[i].Options.Priority = ppl.Priority() - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.(jobs.Consumer).Push(ctx, j[i]) - if err != nil { - cancel() - p.events.Push(events.JobEvent{ - Event: events.EventPushError, - ID: j[i].Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Start: start, - Elapsed: time.Since(start), - Error: err, - }) - return errors.E(op, err) - } - - cancel() - } - - return nil -} - -func (p *Plugin) Pause(pp string) { - pipe, ok := p.pipelines.Load(pp) - - if !ok { - p.log.Error("no such pipeline", "requested", pp) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pp) - return - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - // redirect call to the underlying driver - d.(jobs.Consumer).Pause(ctx, ppl.Name()) -} - -func (p *Plugin) Resume(pp string) { - pipe, ok := p.pipelines.Load(pp) - if !ok { - p.log.Error("no such pipeline", "requested", pp) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pp) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - // redirect call to the underlying driver - d.(jobs.Consumer).Resume(ctx, ppl.Name()) -} - -// Declare a pipeline. -func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { - const op = errors.Op("jobs_plugin_declare") - // driver for the pipeline (ie amqp, ephemeral, etc) - dr := pipeline.Driver() - if dr == "" { - return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name())) - } - - // jobConstructors contains constructors for the drivers - // we need here to initialize these drivers for the pipelines - if _, ok := p.jobConstructors[dr]; ok { - // init the driver from pipeline - initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) - if err != nil { - return errors.E(op, err) - } - - // register pipeline for the initialized driver - err = initializedDriver.Register(context.Background(), pipeline) - if err != nil { - return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name())) - } - - // if pipeline initialized to be consumed, call Run on it - // but likely for the dynamic pipelines it should be started manually - if _, ok := p.consume[pipeline.Name()]; ok { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - err = initializedDriver.Run(ctx, pipeline) - if err != nil { - return errors.E(op, err) - } - } - - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(pipeline.Name(), initializedDriver) - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) - } - - return nil -} - -// Destroy pipeline and release all associated resources. -func (p *Plugin) Destroy(pp string) error { - const op = errors.Op("jobs_plugin_destroy") - pipe, ok := p.pipelines.Load(pp) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp)) - } - - // type conversion - ppl := pipe.(*pipeline.Pipeline) - - // delete consumer - d, ok := p.consumers.LoadAndDelete(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // delete old pipeline - p.pipelines.LoadAndDelete(pp) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.(jobs.Consumer).Stop(ctx) - if err != nil { - cancel() - return errors.E(op, err) - } - - cancel() - return nil -} - -func (p *Plugin) List() []string { - out := make([]string, 0, 10) - - p.pipelines.Range(func(key, _ interface{}) bool { - // we can safely convert value here as we know that we store keys as strings - out = append(out, key.(string)) - return true - }) - - return out -} - -func (p *Plugin) RPC() interface{} { - return &rpc{ - log: p.log, - p: p, - } -} - -func (p *Plugin) collectJobsEvents(event interface{}) { - if jev, ok := event.(events.JobEvent); ok { - switch jev.Event { - case events.EventPipePaused: - p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventJobStart: - p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventJobOK: - p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPushOK: - p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPushError: - p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventJobError: - p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeActive: - p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeStopped: - p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeError: - p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventDriverReady: - p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - } - } -} - -func (p *Plugin) getPayload(body, context []byte) *payload.Payload { - pld := p.pldPool.Get().(*payload.Payload) - pld.Body = body - pld.Context = context - return pld -} - -func (p *Plugin) putPayload(pld *payload.Payload) { - pld.Body = nil - pld.Context = nil - p.pldPool.Put(pld) -} diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go deleted file mode 100644 index 9d769fdf..00000000 --- a/plugins/jobs/protocol.go +++ /dev/null @@ -1,78 +0,0 @@ -package jobs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/errors" - pq "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type Type uint32 - -const ( - NoError Type = iota - Error -) - -// internal worker protocol (jobs mode) -type protocol struct { - // message type, see Type - T Type `json:"type"` - // Payload - Data json.RawMessage `json:"data"` -} - -type errorResp struct { - Msg string `json:"message"` - Requeue bool `json:"requeue"` - Delay int64 `json:"delay_seconds"` - Headers map[string][]string `json:"headers"` -} - -func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { - const op = errors.Op("jobs_handle_response") - // TODO(rustatian) to sync.Pool - p := &protocol{} - - err := json.Unmarshal(resp, p) - if err != nil { - return errors.E(op, err) - } - - switch p.T { - // likely case - case NoError: - err = jb.Ack() - if err != nil { - return errors.E(op, err) - } - case Error: - // TODO(rustatian) to sync.Pool - er := &errorResp{} - - err = json.Unmarshal(p.Data, er) - if err != nil { - return errors.E(op, err) - } - - log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) - - if er.Requeue { - err = jb.Requeue(er.Headers, er.Delay) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) - - default: - err = jb.Ack() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go deleted file mode 100644 index d7b93bd1..00000000 --- a/plugins/jobs/rpc.go +++ /dev/null @@ -1,160 +0,0 @@ -package jobs - -import ( - "context" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" -) - -type rpc struct { - log logger.Logger - p *Plugin -} - -func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rpc_push") - - // convert transport entity into domain - // how we can do this quickly - - if j.GetJob().GetId() == "" { - return errors.E(op, errors.Str("empty ID field not allowed")) - } - - err := r.p.Push(from(j.GetJob())) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rpc_push_batch") - - l := len(j.GetJobs()) - - batch := make([]*job.Job, l) - - for i := 0; i < l; i++ { - // convert transport entity into domain - // how we can do this quickly - batch[i] = from(j.GetJobs()[i]) - } - - err := r.p.PushBatch(batch) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { - for i := 0; i < len(req.GetPipelines()); i++ { - r.p.Pause(req.GetPipelines()[i]) - } - - return nil -} - -func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { - for i := 0; i < len(req.GetPipelines()); i++ { - r.p.Resume(req.GetPipelines()[i]) - } - - return nil -} - -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { - resp.Pipelines = r.p.List() - return nil -} - -// Declare pipeline used to dynamically declare any type of the pipeline -// Mandatory fields: -// 1. Driver -// 2. Pipeline name -// 3. Options related to the particular pipeline -func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rpc_declare_pipeline") - pipe := &pipeline.Pipeline{} - - for i := range req.GetPipeline() { - (*pipe)[i] = req.GetPipeline()[i] - } - - err := r.p.Declare(pipe) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { - const op = errors.Op("rpc_declare_pipeline") - - var destroyed []string //nolint:prealloc - for i := 0; i < len(req.GetPipelines()); i++ { - err := r.p.Destroy(req.GetPipelines()[i]) - if err != nil { - return errors.E(op, err) - } - destroyed = append(destroyed, req.GetPipelines()[i]) - } - - // return destroyed pipelines - resp.Pipelines = destroyed - - return nil -} - -func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { - const op = errors.Op("rpc_stats") - state, err := r.p.JobsState(context.Background()) - if err != nil { - return errors.E(op, err) - } - - for i := 0; i < len(state); i++ { - resp.Stats = append(resp.Stats, &jobsv1beta.Stat{ - Pipeline: state[i].Pipeline, - Driver: state[i].Driver, - Queue: state[i].Queue, - Active: state[i].Active, - Delayed: state[i].Delayed, - Reserved: state[i].Reserved, - Ready: state[i].Ready, - }) - } - - return nil -} - -// from converts from transport entity to domain -func from(j *jobsv1beta.Job) *job.Job { - headers := make(map[string][]string, len(j.GetHeaders())) - - for k, v := range j.GetHeaders() { - headers[k] = v.GetValue() - } - - jb := &job.Job{ - Job: j.GetJob(), - Headers: headers, - Ident: j.GetId(), - Payload: j.GetPayload(), - Options: &job.Options{ - Priority: j.GetOptions().GetPriority(), - Pipeline: j.GetOptions().GetPipeline(), - Delay: j.GetOptions().GetDelay(), - }, - } - - return jb -} |