summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/config.go62
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio1
-rw-r--r--plugins/jobs/doc/response_protocol.md54
-rw-r--r--plugins/jobs/job/job.go51
-rw-r--r--plugins/jobs/job/job_test.go18
-rw-r--r--plugins/jobs/metrics.go92
-rw-r--r--plugins/jobs/pipeline/pipeline.go98
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go21
-rw-r--r--plugins/jobs/plugin.go719
-rw-r--r--plugins/jobs/protocol.go78
-rw-r--r--plugins/jobs/rpc.go160
11 files changed, 0 insertions, 1354 deletions
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
deleted file mode 100644
index 454256b9..00000000
--- a/plugins/jobs/config.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package jobs
-
-import (
- "runtime"
-
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
-)
-
-const (
- // name used to set pipeline name
- pipelineName string = "name"
-)
-
-// Config defines settings for job broker, workers and job-pipeline mapping.
-type Config struct {
- // NumPollers configures number of priority queue pollers
- // Should be no more than 255
- // Default - num logical cores
- NumPollers uint8 `mapstructure:"num_pollers"`
-
- // PipelineSize is the limit of a main jobs queue which consume Items from the drivers pipeline
- // Driver pipeline might be much larger than a main jobs queue
- PipelineSize uint64 `mapstructure:"pipeline_size"`
-
- // Timeout in seconds is the per-push limit to put the job into queue
- Timeout int `mapstructure:"timeout"`
-
- // Pool configures roadrunner workers pool.
- Pool *poolImpl.Config `mapstructure:"Pool"`
-
- // Pipelines defines mapping between PHP job pipeline and associated job broker.
- Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"`
-
- // Consuming specifies names of pipelines to be consumed on service start.
- Consume []string `mapstructure:"consume"`
-}
-
-func (c *Config) InitDefaults() {
- if c.Pool == nil {
- c.Pool = &poolImpl.Config{}
- }
-
- if c.PipelineSize == 0 {
- c.PipelineSize = 1_000_000
- }
-
- if c.NumPollers == 0 {
- c.NumPollers = uint8(runtime.NumCPU())
- }
-
- for k := range c.Pipelines {
- // set the pipeline name
- c.Pipelines[k].With(pipelineName, k)
- }
-
- if c.Timeout == 0 {
- c.Timeout = 60
- }
-
- c.Pool.InitDefaults()
-}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
deleted file mode 100644
index 824e1a83..00000000
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-08-21T12:35:37.051Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.1 Safari/537.36" etag="L_bYn0v_jW4MOvWLd2St" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc5s4+9d4Jt2ZZJDE+TFXs23Txo2Tttkv72AjxzTYuICTuL/+lYTAIMkOjgE7R3a2iQXI4rkvPeqg4/HjWeROR19DDwcdqHmPHXTSgdBxoEF+0ZF5OmLYppaO3Ea+l46BxUDP/4v5YHbbzPdwXLoxCcMg8aflwUE4meBBUhpzoyh8KN82DIPyt07dWywN9AZuII/+9L1klK2O/Cyu/Iv921EiXRq72f18IB65XvhQGEKnHXQchWGS/jV+PMYBBWAGmvS5j0uu5muL8CSp8kCvG5s3x1/n7udQH4Hbb99O76J9nS/u3g1m/KUvLy6uyMjni6Me+dU9vz779I2/QTLPIBOFs4mH6cxaBx09jPwE96bugF59IMRAxkbJOCCfAPlzGE6Sj+7YDygd9D6Smb6Gk5BccAP/dkLGohSCR4Hbx0E3jP3ED+l4gId0+B5HiU+wci5cTsJp4eohn6wfJkk4pl/rB8FxGIQRWzIaDrE5GJDxOInCO1y44llOX6PvIYOUQ5l+B34sDHEQn+FwjJNoTm7hV5GG4AGne072wKCf6MDDgoigqadjoyL56A4nXk64t/n8C8SSPzhu1Xj+NZk8/gvIc/d/k4l19h3/pw32oYRB7BFS5x/DKBmFt+HEDU4Xo0dlHC/uOQ8p3Blmf+MkmXO+dWdJWMY7gWI0/0WfJxDhH2+K104e+eTppzn/tJReluInDmfRAK96fw6AxI1ucbLiRj4hhc5KdEc4cBP/viwoVGjjj3ZDn6w5JxPHhiUasTQ7o5psknSp/LkiX4tTWVZ5KihNlYJHmuowitx54bYpvSGWaC1/6+eTH9oK+T36SYH6yKebjNzI3wvaox/mRULcDZIFFSlWb4liBTLTnaoUWxcRZbK4oKq6ZCZtn6qpWTwik9JfR24yGG2osMqawzOw7ekqzWHDPjJNjv/CuMZ+nkUX1VWNLWDEMHKMFBVNZpAUFY1uN6RnwDunr8/pelXdZLbF6uaB7ZiGASyoA2Q5JTJDRoOMf/G3i76M0P/unX0TJWc9jZifGeNviaaKFJXTl5qmRIvTHmC1xdm3Dd1YaXHWRjSZ5dM40WiCGQJgRTJZboaov8kyBU1k6fyblplI4hOLtS0oNV1FrQpLlxTWGaarmvpTHPgTXKuSwoCoKUtFbI5pIXdbSsoBArKUSgpAhZJCqCElZUh46UAzoJ7mtIQP88+MOudHY0K3PnEtD8lVbfpI/mWw0dLxfeaG0mt64RoBT7LP/Vt6bUBAiKPyZQ8PwshNnVl6D0V+lBJG9tXkr1v+my2wnw18DvsELIe9JJoNklmEsxsIQPriQ2RsKo6NInHkWe+eeuj0ok0vissmq2RPnLC5I39ym79I/v1ddx6ErrfkPtXam1jpJw8zibF6rRdTiq04vy//rFqswNxinCKNXmQhEB7qCMldw4CFiSh/c+bkSgrAVcy6XDQMDfqfSjSY7EcWDelPPRIg5+15ZpAimf9NBf8DqDckAMxXIAAy0tt5GdDNtd1WGbwb+WHkJ/N8GTNiItAgnMjjJ8QOmi9Z7OKuwyTB42myEAXM4Gj3lS4pM5SXW1hFX8JuW+u68sc4nCXPWdS7zFTKTGRrFWUmakpmZnbcu2e/hpOWeexPu/b2Lrj20Gw9pier4tRHGhDdNhszhbnnReSFow/s7W/9mKhRTM3FYUivJiNc
cqm0/rz8eeKO34SnZTvllAKCSk/LUkgNszGh4bwLjfWFhl1RaGRKYrtCAzQZD1wJoILQ+IrJN5Kh38zhDHM37cEnhC/IjYJs4DceULTxGcJJQEFGjbu5PGGNYmQ4HMIl+WizbxpbEyNGOWADgTKroKvEiNaQGIFylQLN9OxRTHbgcYomqh+IJCBfT/4/oiCNasWX52J7qMSXObBxf7gjYh9osKrYN8ym8CWn6pYghLrBZahnxj33l+X6jrHveamewLH/1+2zqSiUedyYzGscdYwTOhdRDTH3AyTETcIJbhg3jiNUgtiaAjeGobDjraZQAyXUSEghvhuVj8d5VZUm0zb57yP92qPbyPV8vLiWgbXMPPntShx4bjzKuXJZtU9e17PaKczpRihAEv3BzAahX+nG0/RFh/4jXUdKSjg6vccpRTHqGblT+sD48ZaWux24D7F+MIvZdzVKRMguEZFpqxjcsmUismrI8v41h94Dsj/qd+PDm7PZY9dy/s3KObYSP1MExtwxJYBA/sSQOKTyPRsuP1KOrEmhiTyIdlxwRvpMkH3YUqxNCSopKrMKjuwuOTpG1Wlq6/wTs3hifPA51ak4iqijVXMIqakXueSW3l7Byvsn+/sgi0W+vPfiJPjaXuucIGuPLjvV2fIrvZg3aSHs2hjzu0SNFSkrDXp/eDEvcIlF3lj5BoK5kyn2iNkARbtBjiyrXIalyn+nQ8qGJYeUkcpLAHYNeXilGfEeHFoZHFLCzJBjQ8r7ao8nb4RpueKie937d0P3vEGbG5gCtzi6KgOjKq1ENQRBVmG0AMM/B79pFQixjUcucdWCXYYnEOCJHAU8lVVANcAzvjzpa7/vfv+07r733F/HP82r66wI4AVJH1CUPQVR1Ij0uXSuz2Pjvtezg/9ufsHY/emfq2rSlbBtpiZ93UJB0XMGUNeLZCM9YJho0wdsXSDMzeoKV2GhIAm+HV71NuT+Z+W41GbPkihoDXLEtkV4Z6mrohgBCjGiNyVG7G2KEVAUIk8UJy+MmDbFiBJmWxMjG2H65ZmrrSqMjTCNdkJhAFG8aDpcKf8BgBs/Aax6VYYaD9ruEC94wcS7Sh/vGvEaxmrjRSbF9Z+o295R42G7G4AOrNdCvcr74E5QLwQiLWpP0KK2+RPIaYN6JWu9UIr8/fr0+rRWy910NW04VFnuyNK042PZQB+yn6YjAJLkUFnuqgBAY5b7DhW0VhQqrYYfVwmLF2a5Q4kFv55+vbi8qZXxXorLjKxtu8zG7jDeakeqAQZC22KgeiJomrVax+qSP9SGdyPXoxx+/d59k9wNjG1zt7zruYGOUkJppq1pFlIh59DQNF1RvtaKzYP0KjaPahtPY8gBsir8NJ4ScUqRMRhhxVaJV16iKeSlFFlxkHU2qLtAU40hWZgRcAS0NoG8ahTe+x6O5Er0140lwxQKaXP4F/GEzGbwpOycJ7t1gG4jWODKZXNwoVdA3LIyIc+/F4fiqTvJxiCbPZ3008RPfILfv+mGBWonEFhSePmMmdNtDvwr6VImdCDG7Bevbe3wavl0/1T3e6FUpvi1hWHFAp9YM6Jr7rKdoH6+ZK8MjuIqxu60k+/ZuMPzOEUcu+zTFdO9XDGbLX9mEE6G/i35Y88d/5mmuwH62J3EiRvcpR9xMviQQ4EhjM6xz7CVF3YWa0XXAURlPhSaGbbNhYIC/Mh+avLtpcijaUrc6TgKKarXsPFMyZ2ymisbIb3Tyx+nb1XnEZND0Hq2QppaqE1puuUQ7+bBmEX4pYlgjBJoCl9SeR9HWvvBmFWrLrAmZ8aNHIKnWszmfFtjj651JKSwqw4ACGWWU9Y/NcVyO1T/9GI4TtF0bRWN7wjHyR75LmQg2rdSRB7UkFONB+twx1dJ5iJmLhRxq1E47s/itRFBQxwDTYWI4fCpnamS3KwFA0LHUuLGyVLQbhMBclEtefv15WJ1cz/fxUicsUPaop7OOcWTdITLTft5OHtu8vn58tCq
WR5yQsmc6w37FkDNNMRe7FJXwZqaUkJN1+TvgsUpKz0DdCAQ9WbxciU6HInq51gOKdVJ9uke3SZpfk3joHmaz4LuLRM9cIApEhVCwjT1NehQv/q7bbk+XWX08jRhGTtlXWbrLkiTH4fnG9uUlQ2TFU3Zl7Rxb8SYRHrFepY6OrSpESEHvbohDagSWcu7TFKaSGYRa40zDuM0pZC7AumDbywWZtjSeS3AksNhCKjOa2ksHCYHSX66fvJGcQQ1A1TBkd0qimSv+keKG81PUxQeJjaOx5IuNAshZ378IV0/61GHWVrk8PhLNjKgmQfN9egwQ/wTiSPFZuzle9SLeY1C+iVf/ytPhdQhNhxR/kNLznlAoOjK0ljOw4DvVtf6VpdiQ7U6iFb74RubiR95S/XPi8svp5esxObi4rwt80sRc7KVvhwykYO8+swvUSPo2aE2TxpgRlM4kcN5p79Oj6+vaKZx78+BF7JWJ8Xk9SyKOixrH3x4Y0odOFDyU3VN0WmwqfZoahTKAcET1n2WqvQeju5Zc5G3mDSGNionjYEGsnNXSixnKHRec/jaoU2Zu1jEry6OqBppQM1UIUsxKwRMXTLwnWqB2rpiVooeoynnd/Jqke0o1FYS1AhoMgZUB3W0mqLOKfrFcvcWLFoIK3J3/e2jN0O2Inwlpal548ZJP56qXM6N0taaSw+rUjAaPLHM1VmIZdXlDXGmBhVVyq0mrqGcQnrDASoEbME2yqv6txWeQnIEsevGMQszsRMSaJfsvBT4bWELSruaEFBasqBNhO1Qzqxie5Ht6zpUuSBrt3JmSA4e006omEUGsh726bEn+7Q3tT/0B2TkzwwrZGtDdqjn04aYae/tBxwnssas0BG/KdPUURQsqxRgHUcSqMOBbzfYuv7p2BsweNXwbFbKtysMruh4efnp4vLT1c3CmNUabIL5lFtZ0dpt0ahFJqxo1NZwbIUaaXL89hIPMCEWj4tkbtyme40OTvC9n+67Uuf32PlUny9oH+O9MR6H0fxDeiu74E4oQAbhhIKNGUwNJ/kIr7Ep8tMi09f58J7lW06mlriD11Zk+VRdkxvL8iE5Qk2b93eylEJGonwXXaG8o5DpnT5V//3qPQDBX0OWnDxCdqv2v+xRs1pwbRiF405xn2aj+oIzraAt+sDzhspKcqBZyMHNaQXDggew4oHhunQQWX3m3vIjTyhIM+n4Oe3H/OCzGfsMYxfd7ulJp5T243sRp8GMdstfyNniVG+NJfMCqxzztsySMDvsqhWW1GUvrfrxB+kpBtri320cXPMpwVR27BXkPfMhGzu+Rn1eQ5qk0FaCJx2RjmU/YSnz5x4ZXfdysoA0W1T5AOntLeoo9OaFM1T68+QZp6bUvahjIivIlGxde8WFsQ359IyaZxwvUvciDwd3exucmVP3cr65T69HUAqNnV3yZFpyq4eZIFOw35SHHkJNqS0aUxfyWcYckaLF9mr38xlIaa21uqVPl7349z19neohN11xZvnF3y76MkL/u3f2TZSc9bTz67PKbQXq3tQntGwCRlb4WvuWPtFLzL5p+YY+8QmTxwsa3c6nyyGB9/18r4XegQOEbKUlOjw10TtwgCDOTfBER2VpbaZlNE/vWV622O8xZpaTWB4iOkjprgrml0/CfbY5asiCsXFICzp5yu2tdVcDpihSTVUhLlQU4oobS+vbzCD73pl5/I7UKkilm8sF7eUoYtftIlUR4xQiE/Rrg1vZXts+supgNMsUdw1BpKo8UTVsrSPM9Rg/XM1+3P9B5871zUVXsz7/HK84zXlZ0mm9E1GLSSXWIzE7o7fIm9rUn/rs9M64s14+qqmlpg0X0waB6frW3Q/X1MpYW8VewiTdIF/gnhcyCUj1s4cDnKyd2WtqvTprA8ngV1jvIia5E4s0skXyE2QLIdM9Dw/dWUABSxmTRrDYue8UzkNyt7crgDYPlplAded2X5DQhUAXy2eRrYoWGQqh69QQpVA6E1AS
uqdprEhju4JpxEKLZ4MRlzwpUb5G9IgqUdcV+lBXoAY2hRpVA+a8TSztGsv2d7vMtUsVGasPkQ3TwnnUPBu/KOtbA5nt1FHUYd5oQjwQGaqwrLLjeR3HvCqxuUkOr+oR5qLE5uxXuuzhQci7cLB7aNI+4ifcLztKPk875YTUTNbuiXdeHM1uq9IWtGuzccRTdMbJIndpHXWsk0ppDHV4SWAAOaNB/ScebgJwlT+1Iqa+U4eyL8tjKFrxLlpi1M4ymaex+2eaLjtZYxFF7NQfNVfCTFGnqoZt7QezLwtFW2XSQqZxoCFdt6Fl2CbUqsUQnxGyWwWeghTmjbipamUViaEiQrNWaVMjGS5H6JSNUMVapDoKz5WQlJOMpW3p7uu0EoGJxF4BKknptGknynnGE7Vpty7h1lhgv0lHM9MC1Wi9Mdsta3Ky+4pomcKBVQ7+XOtsqCc1kV1RE21tR9SqVRf94SntshG5wbr8tAykjSgI8SgFqHZ51GzTnAWnv3TGaZ9vnBfJN3JGRXn03S6zDCKuTNUSb2g1xzNb3YBbB89Y7TONqmntC+AaRc/ao9PDb72rw/MvL4p3dKMq4zTGNrLm5qeU0c0QuXu3y1Clbl5ZIunaQeXdiPaBtT5syccopFG0hZNNQDD6GnqY3vF/</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/doc/response_protocol.md b/plugins/jobs/doc/response_protocol.md
deleted file mode 100644
index e195c407..00000000
--- a/plugins/jobs/doc/response_protocol.md
+++ /dev/null
@@ -1,54 +0,0 @@
-Response protocol used to communicate between worker and RR. When a worker completes its job, it should send a typed
-response. The response should contain:
-
-1. `type` field with the message type. Can be treated as enums.
-2. `data` field with the dynamic response related to the type.
-
-Types are:
-
-```
-0 - NO_ERROR
-1 - ERROR
-2 - ...
-```
-
-- `NO_ERROR`: contains only `type` and empty `data`.
-- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
- job,
- `delay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
- with string key and array of strings as a value.
-
-For example:
-
-`NO_ERROR`:
-For example:
-
-```json
-{
- "type": 0,
- "data": {}
-}
-
-```
-
-`ERROR`:
-
-```json
-{
- "type": 1,
- "data": {
- "message": "internal worker error",
- "requeue": true,
- "headers": [
- {
- "test": [
- "1",
- "2",
- "3"
- ]
- }
- ],
- "delay_seconds": 10
- }
-}
-```
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
deleted file mode 100644
index adab2a0a..00000000
--- a/plugins/jobs/job/job.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package job
-
-import (
- "time"
-)
-
-// constant keys to pack/unpack messages from different drivers
-const (
- RRID string = "rr_id"
- RRJob string = "rr_job"
- RRHeaders string = "rr_headers"
- RRPipeline string = "rr_pipeline"
- RRDelay string = "rr_delay"
- RRPriority string = "rr_priority"
-)
-
-// Job carries information about single job.
-type Job struct {
- // Job contains name of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-value pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
deleted file mode 100644
index 4a95e27d..00000000
--- a/plugins/jobs/job/job_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package job
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestOptions_DelayDuration(t *testing.T) {
- opts := &Options{Delay: 0}
- assert.Equal(t, time.Duration(0), opts.DelayDuration())
-}
-
-func TestOptions_DelayDuration2(t *testing.T) {
- opts := &Options{Delay: 1}
- assert.Equal(t, time.Second, opts.DelayDuration())
-}
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go
deleted file mode 100644
index 38d0bcfb..00000000
--- a/plugins/jobs/metrics.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package jobs
-
-import (
- "sync/atomic"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/plugins/informer"
-)
-
-func (p *Plugin) MetricsCollector() []prometheus.Collector {
- // p - implements Exporter interface (workers)
- // other - request duration and count
- return []prometheus.Collector{p.statsExporter}
-}
-
-const (
- namespace = "rr_jobs"
-)
-
-type statsExporter struct {
- workers informer.Informer
- workersMemory uint64
- jobsOk uint64
- pushOk uint64
- jobsErr uint64
- pushErr uint64
-}
-
-var (
- worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
- pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil)
- pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil)
- jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil)
- jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
-)
-
-func newStatsExporter(stats informer.Informer) *statsExporter {
- return &statsExporter{
- workers: stats,
- workersMemory: 0,
- jobsOk: 0,
- pushOk: 0,
- jobsErr: 0,
- pushErr: 0,
- }
-}
-
-func (se *statsExporter) metricsCallback(event interface{}) {
- if jev, ok := event.(events.JobEvent); ok {
- switch jev.Event { //nolint:exhaustive
- case events.EventJobOK:
- atomic.AddUint64(&se.jobsOk, 1)
- case events.EventPushOK:
- atomic.AddUint64(&se.pushOk, 1)
- case events.EventPushError:
- atomic.AddUint64(&se.pushErr, 1)
- case events.EventJobError:
- atomic.AddUint64(&se.jobsErr, 1)
- }
- }
-}
-
-func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
- // send description
- d <- worker
- d <- pushErr
- d <- pushOk
- d <- jobsErr
- d <- jobsOk
-}
-
-func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
- // get the copy of the processes
- workers := se.workers.Workers()
-
- // cumulative RSS memory in bytes
- var cum uint64
-
- // collect the memory
- for i := 0; i < len(workers); i++ {
- cum += workers[i].MemoryUsage
- }
-
- // send the values to the prometheus
- ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
- // send the values to the prometheus
- ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
- ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
- ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
- ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
-}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
deleted file mode 100644
index 8a8c1462..00000000
--- a/plugins/jobs/pipeline/pipeline.go
+++ /dev/null
@@ -1,98 +0,0 @@
-package pipeline
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// Pipeline defines pipeline options.
-type Pipeline map[string]interface{}
-
-const (
- priority string = "priority"
- driver string = "driver"
- name string = "name"
-)
-
-// With pipeline value
-func (p *Pipeline) With(name string, value interface{}) {
- (*p)[name] = value
-}
-
-// Name returns pipeline name.
-func (p Pipeline) Name() string {
- return p.String(name, "")
-}
-
-// Driver associated with the pipeline.
-func (p Pipeline) Driver() string {
- return p.String(driver, "")
-}
-
-// Has checks if value presented in pipeline.
-func (p Pipeline) Has(name string) bool {
- if _, ok := p[name]; ok {
- return true
- }
-
- return false
-}
-
-// String must return option value as string or return default value.
-func (p Pipeline) String(name string, d string) string {
- if value, ok := p[name]; ok {
- if str, ok := value.(string); ok {
- return str
- }
- }
-
- return d
-}
-
-// Int must return option value as string or return default value.
-func (p Pipeline) Int(name string, d int) int {
- if value, ok := p[name]; ok {
- if i, ok := value.(int); ok {
- return i
- }
- }
-
- return d
-}
-
-// Bool must return option value as bool or return default value.
-func (p Pipeline) Bool(name string, d bool) bool {
- if value, ok := p[name]; ok {
- if i, ok := value.(bool); ok {
- return i
- }
- }
-
- return d
-}
-
-// Map must return nested map value or empty config.
-// Here might be sqs attributes or tags for example
-func (p Pipeline) Map(name string, out map[string]string) error {
- if value, ok := p[name]; ok {
- if m, ok := value.(string); ok {
- err := json.Unmarshal(utils.AsBytes(m), &out)
- if err != nil {
- return err
- }
- }
- }
-
- return nil
-}
-
-// Priority returns default pipeline priority
-func (p Pipeline) Priority() int64 {
- if value, ok := p[priority]; ok {
- if v, ok := value.(int64); ok {
- return v
- }
- }
-
- return 10
-}
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
deleted file mode 100644
index 4482c70d..00000000
--- a/plugins/jobs/pipeline/pipeline_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package pipeline
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestPipeline_String(t *testing.T) {
- pipe := Pipeline{"value": "value"}
-
- assert.Equal(t, "value", pipe.String("value", ""))
- assert.Equal(t, "value", pipe.String("other", "value"))
-}
-
-func TestPipeline_Has(t *testing.T) {
- pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
-
- assert.Equal(t, true, pipe.Has("options"))
- assert.Equal(t, false, pipe.Has("other"))
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
deleted file mode 100644
index 3aec6acc..00000000
--- a/plugins/jobs/plugin.go
+++ /dev/null
@@ -1,719 +0,0 @@
-package jobs
-
-import (
- "context"
- "fmt"
- "sync"
- "time"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/pkg/state/process"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
-)
-
-const (
- // RrMode env variable
- RrMode string = "RR_MODE"
- RrModeJobs string = "jobs"
-
- PluginName string = "jobs"
- pipelines string = "pipelines"
-)
-
-type Plugin struct {
- sync.RWMutex
-
- // Jobs plugin configuration
- cfg *Config `structure:"jobs"`
- log logger.Logger
- workersPool pool.Pool
- server server.Server
-
- jobConstructors map[string]jobs.Constructor
- consumers sync.Map // map[string]jobs.Consumer
-
- // events handler
- events events.Handler
-
- // priority queue implementation
- queue priorityqueue.Queue
-
- // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline
- pipelines sync.Map
-
- // initial set of the pipelines to consume
- consume map[string]struct{}
-
- // signal channel to stop the pollers
- stopCh chan struct{}
-
- // internal payloads pool
- pldPool sync.Pool
- statsExporter *statsExporter
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
- const op = errors.Op("jobs_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, err)
- }
-
- p.cfg.InitDefaults()
-
- p.server = server
-
- p.events = events.NewEventsHandler()
- p.events.AddListener(p.collectJobsEvents)
-
- p.jobConstructors = make(map[string]jobs.Constructor)
- p.consume = make(map[string]struct{})
- p.stopCh = make(chan struct{}, 1)
-
- p.pldPool = sync.Pool{New: func() interface{} {
- // with nil fields
- return &payload.Payload{}
- }}
-
- // initial set of pipelines
- for i := range p.cfg.Pipelines {
- p.pipelines.Store(i, p.cfg.Pipelines[i])
- }
-
- if len(p.cfg.Consume) > 0 {
- for i := 0; i < len(p.cfg.Consume); i++ {
- p.consume[p.cfg.Consume[i]] = struct{}{}
- }
- }
-
- // initialize priority queue
- p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
- p.log = log
-
- // metrics
- p.statsExporter = newStatsExporter(p)
- p.events.AddListener(p.statsExporter.metricsCallback)
-
- return nil
-}
-
-func (p *Plugin) Serve() chan error { //nolint:gocognit
- errCh := make(chan error, 1)
- const op = errors.Op("jobs_plugin_serve")
-
- // register initial pipelines
- p.pipelines.Range(func(key, value interface{}) bool {
- t := time.Now()
- // pipeline name (ie test-local, sqs-aws, etc)
- name := key.(string)
-
- // pipeline associated with the name
- pipe := value.(*pipeline.Pipeline)
- // driver for the pipeline (ie amqp, ephemeral, etc)
- dr := pipe.Driver()
-
- // jobConstructors contains constructors for the drivers
- // we need here to initialize these drivers for the pipelines
- if _, ok := p.jobConstructors[dr]; ok {
- // config key for the particular sub-driver jobs.pipelines.test-local
- configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
-
- // init the driver
- initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue)
- if err != nil {
- errCh <- errors.E(op, err)
- return false
- }
-
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers.Store(name, initializedDriver)
-
- // register pipeline for the initialized driver
- err = initializedDriver.Register(context.Background(), pipe)
- if err != nil {
- errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
- return false
- }
-
- // if pipeline initialized to be consumed, call Run on it
- if _, ok := p.consume[name]; ok {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
- err = initializedDriver.Run(ctx, pipe)
- if err != nil {
- errCh <- errors.E(op, err)
- return false
- }
- return true
- }
-
- return true
- }
-
- p.events.Push(events.JobEvent{
- Event: events.EventDriverReady,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Start: t,
- Elapsed: t.Sub(t),
- })
-
- return true
- })
-
- // do not continue processing, immediately stop if channel contains an error
- if len(errCh) > 0 {
- return errCh
- }
-
- var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
- if err != nil {
- errCh <- err
- return errCh
- }
-
- // start listening
- go func() {
- for i := uint8(0); i < p.cfg.NumPollers; i++ {
- go func() {
- for {
- select {
- case <-p.stopCh:
- p.log.Info("------> job poller stopped <------")
- return
- default:
- // get prioritized JOB from the queue
- jb := p.queue.ExtractMin()
-
- // parse the context
- // for each job, context contains:
- /*
- 1. Job class
- 2. Job ID provided from the outside
- 3. Job Headers map[string][]string
- 4. Timeout in seconds
- 5. Pipeline name
- */
-
- start := time.Now()
- p.events.Push(events.JobEvent{
- Event: events.EventJobStart,
- ID: jb.ID(),
- Start: start,
- Elapsed: 0,
- })
-
- ctx, err := jb.Context()
- if err != nil {
- p.events.Push(events.JobEvent{
- Event: events.EventJobError,
- Error: err,
- ID: jb.ID(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- errNack := jb.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
- }
- p.log.Error("job marshal context", "error", err)
- continue
- }
-
- // get payload from the sync.Pool
- exec := p.getPayload(jb.Body(), ctx)
-
- // protect from the pool reset
- p.RLock()
- resp, err := p.workersPool.Exec(exec)
- p.RUnlock()
- if err != nil {
- p.events.Push(events.JobEvent{
- Event: events.EventJobError,
- ID: jb.ID(),
- Error: err,
- Start: start,
- Elapsed: time.Since(start),
- })
- // RR protocol level error, Nack the job
- errNack := jb.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed", "error", errNack)
- }
-
- p.log.Error("job execute failed", "error", err)
-
- p.putPayload(exec)
- continue
- }
-
- // if response is nil or body is nil, just acknowledge the job
- if resp == nil || resp.Body == nil {
- p.putPayload(exec)
- err = jb.Ack()
- if err != nil {
- p.events.Push(events.JobEvent{
- Event: events.EventJobError,
- ID: jb.ID(),
- Error: err,
- Start: start,
- Elapsed: time.Since(start),
- })
- p.log.Error("acknowledge error, job might be missed", "error", err)
- continue
- }
-
- p.events.Push(events.JobEvent{
- Event: events.EventJobOK,
- ID: jb.ID(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- continue
- }
-
- // handle the response protocol
- err = handleResponse(resp.Body, jb, p.log)
- if err != nil {
- p.events.Push(events.JobEvent{
- Event: events.EventJobError,
- ID: jb.ID(),
- Start: start,
- Error: err,
- Elapsed: time.Since(start),
- })
- p.putPayload(exec)
- errNack := jb.Nack()
- if errNack != nil {
- p.log.Error("negatively acknowledge failed, job might be lost", "root error", err, "error nack", errNack)
- continue
- }
-
- p.log.Error("job negatively acknowledged", "error", err)
- continue
- }
-
- p.events.Push(events.JobEvent{
- Event: events.EventJobOK,
- ID: jb.ID(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- // return payload
- p.putPayload(exec)
- }
- }
- }()
- }
- }()
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- // range over all consumers and call stop
- p.consumers.Range(func(key, value interface{}) bool {
- consumer := value.(jobs.Consumer)
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := consumer.Stop(ctx)
- if err != nil {
- cancel()
- p.log.Error("stop job driver", "driver", key)
- return true
- }
- cancel()
- return true
- })
-
- // this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
- // but if not, this is not a problem at all.
- // The main target is to stop the drivers
- go func() {
- for i := uint8(0); i < p.cfg.NumPollers; i++ {
- // stop jobs plugin pollers
- p.stopCh <- struct{}{}
- }
- }()
-
- // just wait pollers for 5 seconds before exit
- time.Sleep(time.Second * 5)
-
- p.Lock()
- p.workersPool.Destroy(context.Background())
- p.Unlock()
-
- return nil
-}
-
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.CollectMQBrokers,
- }
-}
-
-func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
- p.jobConstructors[name.Name()] = c
-}
-
-func (p *Plugin) Workers() []*process.State {
- p.RLock()
- wrk := p.workersPool.Workers()
- p.RUnlock()
-
- ps := make([]*process.State, len(wrk))
-
- for i := 0; i < len(wrk); i++ {
- st, err := process.WorkerProcessState(wrk[i])
- if err != nil {
- p.log.Error("jobs workers state", "error", err)
- return nil
- }
-
- ps[i] = st
- }
-
- return ps
-}
-
-func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
- const op = errors.Op("jobs_plugin_drivers_state")
- jst := make([]*jobState.State, 0, 2)
- var err error
- p.consumers.Range(func(key, value interface{}) bool {
- consumer := value.(jobs.Consumer)
- newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
-
- var state *jobState.State
- state, err = consumer.State(newCtx)
- if err != nil {
- cancel()
- return false
- }
-
- jst = append(jst, state)
- cancel()
- return true
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
- return jst, nil
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Reset() error {
- p.Lock()
- defer p.Unlock()
-
- const op = errors.Op("jobs_plugin_reset")
- p.log.Info("JOBS plugin received restart request. Restarting...")
- p.workersPool.Destroy(context.Background())
- p.workersPool = nil
-
- var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
- if err != nil {
- return errors.E(op, err)
- }
-
- p.log.Info("JOBS workers pool successfully restarted")
-
- return nil
-}
-
-func (p *Plugin) Push(j *job.Job) error {
- const op = errors.Op("jobs_plugin_push")
-
- start := time.Now()
- // get the pipeline for the job
- pipe, ok := p.pipelines.Load(j.Options.Pipeline)
- if !ok {
- return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline))
- }
-
- // type conversion
- ppl := pipe.(*pipeline.Pipeline)
-
- d, ok := p.consumers.Load(ppl.Name())
- if !ok {
- return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
- }
-
- // if job has no priority, inherit it from the pipeline
- if j.Options.Priority == 0 {
- j.Options.Priority = ppl.Priority()
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
-
- err := d.(jobs.Consumer).Push(ctx, j)
- if err != nil {
- p.events.Push(events.JobEvent{
- Event: events.EventPushError,
- ID: j.Ident,
- Pipeline: ppl.Name(),
- Driver: ppl.Driver(),
- Error: err,
- Start: start,
- Elapsed: time.Since(start),
- })
- return errors.E(op, err)
- }
-
- p.events.Push(events.JobEvent{
- Event: events.EventPushOK,
- ID: j.Ident,
- Pipeline: ppl.Name(),
- Driver: ppl.Driver(),
- Error: err,
- Start: start,
- Elapsed: time.Since(start),
- })
-
- return nil
-}
-
-func (p *Plugin) PushBatch(j []*job.Job) error {
- const op = errors.Op("jobs_plugin_push")
- start := time.Now()
-
- for i := 0; i < len(j); i++ {
- // get the pipeline for the job
- pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
- if !ok {
- return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline))
- }
-
- ppl := pipe.(*pipeline.Pipeline)
-
- d, ok := p.consumers.Load(ppl.Name())
- if !ok {
- return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
- }
-
- // if job has no priority, inherit it from the pipeline
- if j[i].Options.Priority == 0 {
- j[i].Options.Priority = ppl.Priority()
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.(jobs.Consumer).Push(ctx, j[i])
- if err != nil {
- cancel()
- p.events.Push(events.JobEvent{
- Event: events.EventPushError,
- ID: j[i].Ident,
- Pipeline: ppl.Name(),
- Driver: ppl.Driver(),
- Start: start,
- Elapsed: time.Since(start),
- Error: err,
- })
- return errors.E(op, err)
- }
-
- cancel()
- }
-
- return nil
-}
-
-func (p *Plugin) Pause(pp string) {
- pipe, ok := p.pipelines.Load(pp)
-
- if !ok {
- p.log.Error("no such pipeline", "requested", pp)
- }
-
- ppl := pipe.(*pipeline.Pipeline)
-
- d, ok := p.consumers.Load(ppl.Name())
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pp)
- return
- }
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
- // redirect call to the underlying driver
- d.(jobs.Consumer).Pause(ctx, ppl.Name())
-}
-
-func (p *Plugin) Resume(pp string) {
- pipe, ok := p.pipelines.Load(pp)
- if !ok {
- p.log.Error("no such pipeline", "requested", pp)
- }
-
- ppl := pipe.(*pipeline.Pipeline)
-
- d, ok := p.consumers.Load(ppl.Name())
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pp)
- return
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
- // redirect call to the underlying driver
- d.(jobs.Consumer).Resume(ctx, ppl.Name())
-}
-
-// Declare a pipeline.
-func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
- const op = errors.Op("jobs_plugin_declare")
- // driver for the pipeline (ie amqp, ephemeral, etc)
- dr := pipeline.Driver()
- if dr == "" {
- return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name()))
- }
-
- // jobConstructors contains constructors for the drivers
- // we need here to initialize these drivers for the pipelines
- if _, ok := p.jobConstructors[dr]; ok {
- // init the driver from pipeline
- initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
- if err != nil {
- return errors.E(op, err)
- }
-
- // register pipeline for the initialized driver
- err = initializedDriver.Register(context.Background(), pipeline)
- if err != nil {
- return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
- }
-
- // if pipeline initialized to be consumed, call Run on it
- // but likely for the dynamic pipelines it should be started manually
- if _, ok := p.consume[pipeline.Name()]; ok {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
- err = initializedDriver.Run(ctx, pipeline)
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers.Store(pipeline.Name(), initializedDriver)
- // save the pipeline
- p.pipelines.Store(pipeline.Name(), pipeline)
- }
-
- return nil
-}
-
-// Destroy pipeline and release all associated resources.
-func (p *Plugin) Destroy(pp string) error {
- const op = errors.Op("jobs_plugin_destroy")
- pipe, ok := p.pipelines.Load(pp)
- if !ok {
- return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
- }
-
- // type conversion
- ppl := pipe.(*pipeline.Pipeline)
-
- // delete consumer
- d, ok := p.consumers.LoadAndDelete(ppl.Name())
- if !ok {
- return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
- }
-
- // delete old pipeline
- p.pipelines.LoadAndDelete(pp)
-
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.(jobs.Consumer).Stop(ctx)
- if err != nil {
- cancel()
- return errors.E(op, err)
- }
-
- cancel()
- return nil
-}
-
-func (p *Plugin) List() []string {
- out := make([]string, 0, 10)
-
- p.pipelines.Range(func(key, _ interface{}) bool {
- // we can safely convert value here as we know that we store keys as strings
- out = append(out, key.(string))
- return true
- })
-
- return out
-}
-
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- log: p.log,
- p: p,
- }
-}
-
// collectJobsEvents is the jobs event listener: it maps every JobEvent
// emitted by the drivers and pollers onto a log line at the appropriate
// level (info for normal lifecycle, warn for stops, error for failures).
// Non-JobEvent values are ignored.
func (p *Plugin) collectJobsEvents(event interface{}) {
	if jev, ok := event.(events.JobEvent); ok {
		switch jev.Event {
		case events.EventPipePaused:
			p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobStart:
			p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobOK:
			p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPushOK:
			p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPushError:
			p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobError:
			p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeActive:
			p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeStopped:
			p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeError:
			p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventDriverReady:
			p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		}
	}
}
-
-func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
- pld := p.pldPool.Get().(*payload.Payload)
- pld.Body = body
- pld.Context = context
- return pld
-}
-
-func (p *Plugin) putPayload(pld *payload.Payload) {
- pld.Body = nil
- pld.Context = nil
- p.pldPool.Put(pld)
-}
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
deleted file mode 100644
index 9d769fdf..00000000
--- a/plugins/jobs/protocol.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package jobs
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- pq "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
// Type is the message type tag of the worker jobs protocol.
type Type uint32

const (
	// NoError indicates the worker processed the job successfully.
	NoError Type = iota
	// Error indicates the worker reported a processing error; the
	// message payload is an errorResp.
	Error
)

// internal worker protocol (jobs mode)
type protocol struct {
	// message type, see Type
	T Type `json:"type"`
	// Payload, decoded lazily depending on T
	Data json.RawMessage `json:"data"`
}

// errorResp is the payload of an Error-typed protocol message: the error
// text plus optional requeue instructions (delay and headers).
type errorResp struct {
	Msg     string              `json:"message"`
	Requeue bool                `json:"requeue"`
	Delay   int64               `json:"delay_seconds"`
	Headers map[string][]string `json:"headers"`
}
-
-func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
- const op = errors.Op("jobs_handle_response")
- // TODO(rustatian) to sync.Pool
- p := &protocol{}
-
- err := json.Unmarshal(resp, p)
- if err != nil {
- return errors.E(op, err)
- }
-
- switch p.T {
- // likely case
- case NoError:
- err = jb.Ack()
- if err != nil {
- return errors.E(op, err)
- }
- case Error:
- // TODO(rustatian) to sync.Pool
- er := &errorResp{}
-
- err = json.Unmarshal(p.Data, er)
- if err != nil {
- return errors.E(op, err)
- }
-
- log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
-
- if er.Requeue {
- err = jb.Requeue(er.Headers, er.Delay)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
-
- default:
- err = jb.Ack()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
deleted file mode 100644
index d7b93bd1..00000000
--- a/plugins/jobs/rpc.go
+++ /dev/null
@@ -1,160 +0,0 @@
-package jobs
-
-import (
- "context"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
-)
-
// rpc is the RPC facade over the jobs plugin: it translates transport
// (protobuf) entities into domain entities and delegates to the plugin.
type rpc struct {
	log logger.Logger
	p   *Plugin
}
-
-func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_push")
-
- // convert transport entity into domain
- // how we can do this quickly
-
- if j.GetJob().GetId() == "" {
- return errors.E(op, errors.Str("empty ID field not allowed"))
- }
-
- err := r.p.Push(from(j.GetJob()))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_push_batch")
-
- l := len(j.GetJobs())
-
- batch := make([]*job.Job, l)
-
- for i := 0; i < l; i++ {
- // convert transport entity into domain
- // how we can do this quickly
- batch[i] = from(j.GetJobs()[i])
- }
-
- err := r.p.PushBatch(batch)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
- for i := 0; i < len(req.GetPipelines()); i++ {
- r.p.Pause(req.GetPipelines()[i])
- }
-
- return nil
-}
-
-func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
- for i := 0; i < len(req.GetPipelines()); i++ {
- r.p.Resume(req.GetPipelines()[i])
- }
-
- return nil
-}
-
// List reports the names of all registered pipelines back to the caller.
func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
	resp.Pipelines = r.p.List()
	return nil
}
-
-// Declare pipeline used to dynamically declare any type of the pipeline
-// Mandatory fields:
-// 1. Driver
-// 2. Pipeline name
-// 3. Options related to the particular pipeline
-func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_declare_pipeline")
- pipe := &pipeline.Pipeline{}
-
- for i := range req.GetPipeline() {
- (*pipe)[i] = req.GetPipeline()[i]
- }
-
- err := r.p.Declare(pipe)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rpc_declare_pipeline")
-
- var destroyed []string //nolint:prealloc
- for i := 0; i < len(req.GetPipelines()); i++ {
- err := r.p.Destroy(req.GetPipelines()[i])
- if err != nil {
- return errors.E(op, err)
- }
- destroyed = append(destroyed, req.GetPipelines()[i])
- }
-
- // return destroyed pipelines
- resp.Pipelines = destroyed
-
- return nil
-}
-
-func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
- const op = errors.Op("rpc_stats")
- state, err := r.p.JobsState(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(state); i++ {
- resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
- Pipeline: state[i].Pipeline,
- Driver: state[i].Driver,
- Queue: state[i].Queue,
- Active: state[i].Active,
- Delayed: state[i].Delayed,
- Reserved: state[i].Reserved,
- Ready: state[i].Ready,
- })
- }
-
- return nil
-}
-
-// from converts from transport entity to domain
-func from(j *jobsv1beta.Job) *job.Job {
- headers := make(map[string][]string, len(j.GetHeaders()))
-
- for k, v := range j.GetHeaders() {
- headers[k] = v.GetValue()
- }
-
- jb := &job.Job{
- Job: j.GetJob(),
- Headers: headers,
- Ident: j.GetId(),
- Payload: j.GetPayload(),
- Options: &job.Options{
- Priority: j.GetOptions().GetPriority(),
- Pipeline: j.GetOptions().GetPipeline(),
- Delay: j.GetOptions().GetDelay(),
- },
- }
-
- return jb
-}