summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go50
-rw-r--r--plugins/jobs/brokers/ephemeral/config.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go26
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go2
-rw-r--r--plugins/jobs/brokers/ephemeral/queue.go7
-rw-r--r--plugins/jobs/config.go12
-rw-r--r--plugins/jobs/interface.go8
-rw-r--r--plugins/jobs/pipeline/pipeline.go10
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go16
-rw-r--r--plugins/jobs/plugin.go68
-rw-r--r--plugins/jobs/structs/job.go20
-rw-r--r--plugins/jobs/structs/job_options.go7
-rw-r--r--plugins/jobs/structs/job_test.go4
13 files changed, 119 insertions, 112 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 4d357c34..3eb20c27 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -3,60 +3,68 @@ package ephemeral
import (
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
- jobs chan *entry
- queues map[*pipeline.Pipeline]*queue
+ queues map[string]bool
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- jobs: make(chan *entry, 10),
- pq: q,
+ queues: make(map[string]bool),
+ pq: q,
}
- go jb.serve()
-
return jb, nil
}
-func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
- id := uuid.NewString()
+func (j *JobBroker) Push(job *structs.Job) (string, error) {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ if b, ok := j.queues[job.Options.Pipeline]; ok {
+ if !b {
+ return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ }
+ if job.Options.Priority == nil {
+ job.Options.Priority = intPtr(10)
+ }
+ job.Options.ID = uuid.NewString()
+
+ j.pq.Insert(job)
- j.jobs <- &entry{
- id: id,
+ return job.Options.ID, nil
}
- return id, nil
+ return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Stat() {
panic("implement me")
}
-func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; !ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ if _, ok := j.queues[pipeline]; ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = newQueue()
+ j.queues[pipeline] = true
return nil
}
-func (j *JobBroker) serve() {
- for item := range j.jobs {
- // item should satisfy
- j.pq.Insert(item)
+func intPtr(val uint64) *uint64 {
+ if val == 0 {
+ val = 10
}
+ return &val
}
diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go
deleted file mode 100644
index 847b63ea..00000000
--- a/plugins/jobs/brokers/ephemeral/config.go
+++ /dev/null
@@ -1 +0,0 @@
-package ephemeral
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
deleted file mode 100644
index 3cedec3e..00000000
--- a/plugins/jobs/brokers/ephemeral/entry.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package ephemeral
-
-type entry struct {
- id string
- priority uint64
-}
-
-func (e *entry) ID() string {
- return e.id
-}
-
-func (e *entry) Priority() uint64 {
- return e.priority
-}
-
-func (e *entry) Ask() {
- // no-op
-}
-
-func (e *entry) Nack() {
- // no-op
-}
-
-func (e *entry) Payload() []byte {
- panic("implement me")
-}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
index 84cc871b..146d1fdc 100644
--- a/plugins/jobs/brokers/ephemeral/plugin.go
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -1,7 +1,7 @@
package ephemeral
import (
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go
deleted file mode 100644
index 1c6d865b..00000000
--- a/plugins/jobs/brokers/ephemeral/queue.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package ephemeral
-
-type queue struct{}
-
-func newQueue() *queue {
- return &queue{}
-}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 87e36ecb..1cb2c2a2 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -9,19 +9,19 @@ import (
type Config struct {
// Workers configures roadrunner server and worker busy.
// Workers *roadrunner.ServerConfig
- poolCfg *poolImpl.Config
+ Pool *poolImpl.Config `mapstructure:"Pool"`
// Pipelines defines mapping between PHP job pipeline and associated job broker.
- Pipelines map[string]*pipeline.Pipeline
+ Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"`
// Consuming specifies names of pipelines to be consumed on service start.
- Consume []string
+ Consume []string `mapstructure:"consume"`
}
func (c *Config) InitDefaults() {
- if c.poolCfg == nil {
- c.poolCfg = &poolImpl.Config{}
+ if c.Pool == nil {
+ c.Pool = &poolImpl.Config{}
}
- c.poolCfg.InitDefaults()
+ c.Pool.InitDefaults()
}
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index a0aed50b..a2cf6ed9 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -1,17 +1,17 @@
package jobs
import (
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
// Consumer todo naming
type Consumer interface {
- Push(*pipeline.Pipeline, *structs.Job) (string, error)
+ Push(*structs.Job) (string, error)
Stat()
Consume(*pipeline.Pipeline)
- Register(*pipeline.Pipeline) error
+ Register(pipe string) error
}
type Broker interface {
@@ -20,7 +20,5 @@ type Broker interface {
type Item interface {
ID() string
- Ask()
- Nack()
Payload() []byte
}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index f27f6ede..987f6826 100644
--- a/plugins/jobs/pipeline/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -15,7 +15,7 @@ func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
out := make(Pipelines, 0)
for name, pipe := range pipes {
- if pipe.Broker() == "" {
+ if pipe.Driver() == "" {
return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
}
@@ -42,7 +42,7 @@ func (ps Pipelines) Broker(broker string) Pipelines {
out := make(Pipelines, 0)
for _, p := range ps {
- if p.Broker() != broker {
+ if p.Driver() != broker {
continue
}
@@ -98,9 +98,9 @@ func (p Pipeline) Name() string {
return p.String("name", "")
}
-// Broker associated with the pipeline.
-func (p Pipeline) Broker() string {
- return p.String("broker", "")
+// Driver associated with the pipeline.
+func (p Pipeline) Driver() string {
+ return p.String("driver", "")
}
// Has checks if value presented in pipeline.
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
index f03dcbb8..77acf96e 100644
--- a/plugins/jobs/pipeline/pipeline_test.go
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -58,23 +58,23 @@ func TestPipeline_Has(t *testing.T) {
func TestPipeline_FilterBroker(t *testing.T) {
pipes := Pipelines{
- &Pipeline{"name": "first", "broker": "a"},
- &Pipeline{"name": "second", "broker": "a"},
- &Pipeline{"name": "third", "broker": "b"},
- &Pipeline{"name": "forth", "broker": "b"},
+ &Pipeline{"name": "first", "driver": "a"},
+ &Pipeline{"name": "second", "driver": "a"},
+ &Pipeline{"name": "third", "driver": "b"},
+ &Pipeline{"name": "forth", "driver": "b"},
}
filtered := pipes.Names("first", "third")
assert.True(t, len(filtered) == 2)
- assert.Equal(t, "a", filtered[0].Broker())
- assert.Equal(t, "b", filtered[1].Broker())
+ assert.Equal(t, "a", filtered[0].Driver())
+ assert.Equal(t, "b", filtered[1].Driver())
filtered = pipes.Names("first", "third").Reverse()
assert.True(t, len(filtered) == 2)
- assert.Equal(t, "a", filtered[1].Broker())
- assert.Equal(t, "b", filtered[0].Broker())
+ assert.Equal(t, "a", filtered[1].Driver())
+ assert.Equal(t, "b", filtered[0].Driver())
filtered = pipes.Broker("a")
assert.True(t, len(filtered) == 2)
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6bf43a11..67077920 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,13 +2,14 @@ package jobs
import (
"context"
- "fmt"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -23,7 +24,7 @@ const (
)
type Plugin struct {
- cfg *Config
+ cfg *Config `mapstructure:"jobs"`
log logger.Logger
workersPool pool.Pool
@@ -41,10 +42,6 @@ type Plugin struct {
pipelines pipeline.Pipelines
}
-func testListener(data interface{}) {
- fmt.Println(data)
-}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
@@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.events.AddListener(testListener)
p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
@@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue2.NewPriorityQueue()
p.log = log
return nil
@@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ const op = errors.Op("jobs_plugin_serve")
for name := range p.brokers {
jb, err := p.brokers[name].InitJobBroker(p.queue)
@@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error {
p.consumers[name] = jb
}
+ // register initial pipelines
+ for i := 0; i < len(p.pipelines); i++ {
+ pipe := p.pipelines[i]
+
+ if jb, ok := p.consumers[pipe.Driver()]; ok {
+ err := jb.Register(pipe.Name())
+ if err != nil {
+ errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
+ return errCh
+ }
+ }
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
if err != nil {
errCh <- err
return errCh
}
- // initialize sub-plugins
- // provide a queue to them
- // start consume loop
- // start resp loop
+ // start listening
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
- /*
- go func() {
- for {
- // get data JOB from the queue
- job := p.queue.Pop()
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
- // request
- _ = job
- p.workersPool.Exec(nil)
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
}
- }()
+ }
+ }()
- */
return errCh
}
@@ -141,18 +155,16 @@ func (p *Plugin) Name() string {
func (p *Plugin) Push(j *structs.Job) (string, error) {
pipe := p.pipelines.Get(j.Options.Pipeline)
- broker, ok := p.consumers[pipe.Broker()]
+ broker, ok := p.consumers[pipe.Driver()]
if !ok {
panic("broker not found")
}
- id, err := broker.Push(pipe, j)
+ id, err := broker.Push(j)
if err != nil {
panic(err)
}
- // p.events.Push()
-
return id, nil
}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
index 2e394543..268444db 100644
--- a/plugins/jobs/structs/job.go
+++ b/plugins/jobs/structs/job.go
@@ -17,19 +17,35 @@ type Job struct {
Options *Options `json:"options,omitempty"`
}
+func (j *Job) ID() string {
+ return j.Options.ID
+}
+
+func (j *Job) Priority() uint64 {
+ return *j.Options.Priority
+}
+
// Body packs job payload into binary payload.
func (j *Job) Body() []byte {
return utils.AsBytes(j.Payload)
}
// Context packs job context (job, id) into binary payload.
-func (j *Job) Context(id string) []byte {
+func (j *Job) Context() []byte {
ctx, _ := json.Marshal(
struct {
ID string `json:"id"`
Job string `json:"job"`
- }{ID: id, Job: j.Job},
+ }{ID: j.Options.ID, Job: j.Job},
)
return ctx
}
+
+func (j *Job) Ack() {
+
+}
+
+func (j *Job) Nack() {
+
+}
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
index 1507d053..029a797d 100644
--- a/plugins/jobs/structs/job_options.go
+++ b/plugins/jobs/structs/job_options.go
@@ -4,6 +4,13 @@ import "time"
// Options carry information about how to handle given job.
type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority *uint64 `json:"priority"`
+
+ // ID - generated ID for the job
+ ID string `json:"id"`
+
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
index e7240c6b..92f78081 100644
--- a/plugins/jobs/structs/job_test.go
+++ b/plugins/jobs/structs/job_test.go
@@ -13,7 +13,7 @@ func TestJob_Body(t *testing.T) {
}
func TestJob_Context(t *testing.T) {
- j := &Job{Job: "job"}
+ j := &Job{Job: "job", Options: &Options{ID: "id"}}
- assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context())
}