summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go21
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go213
-rw-r--r--plugins/jobs/drivers/beanstalk/encode_test.go77
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go123
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go35
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go30
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-test.yaml1
7 files changed, 469 insertions, 31 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index caa683ab..f05ee122 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -17,6 +17,23 @@ func (c *GlobalCfg) InitDefault() {
}
}
-type Config struct{}
+type Config struct {
+ PipePriority int64 `mapstructure:"priority"`
+ TubePriority uint32 `mapstructure:"tube_priority"`
+ Tube string `mapstructure:"tube"`
+ ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
+}
+
+func (c *Config) InitDefault() {
+ if c.Tube == "" {
+ c.Tube = "default"
+ }
-func (c *Config) InitDefault() {}
+ if c.ReserveTimeout == 0 {
+ c.ReserveTimeout = time.Second * 5
+ }
+
+ if c.PipePriority == 0 {
+ c.PipePriority = 10
+ }
+}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index e9bfafdd..27d453f4 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -1,7 +1,10 @@
package beanstalk
import (
+ "bytes"
"strings"
+ "sync"
+ "sync/atomic"
"time"
"github.com/beanstalkd/go-beanstalk"
@@ -15,15 +18,29 @@ import (
)
type JobConsumer struct {
+ sync.Mutex
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
+ pipeline atomic.Value
+ listeners uint32
+
// beanstalk
- conn *beanstalk.Conn
- tout time.Duration
+ addr string
+ network string
+ conn *beanstalk.Conn
+ tube *beanstalk.Tube
+ tubeSet *beanstalk.TubeSet
+ reserveTimeout time.Duration
+ reconnectCh chan struct{}
+ tout time.Duration
// tube name
- tName string
+ tName string
+ tubePriority uint32
+ priority int64
+
+ stopCh chan struct{}
}
func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -47,14 +64,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
globalCfg.InitDefault()
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- tout: globalCfg.Timeout,
- }
-
// PARSE CONFIGURATION -------
dsn := strings.Split(globalCfg.Addr, "://")
@@ -62,11 +71,32 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ reserveTimeout: pipeCfg.ReserveTimeout,
+ tName: pipeCfg.Tube,
+ tubePriority: pipeCfg.TubePriority,
+ priority: pipeCfg.PipePriority,
+
+ // buffered with two because jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ reconnectCh: make(chan struct{}),
+ }
+
+ jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout)
if err != nil {
return nil, err
}
+ jc.tube = beanstalk.NewTube(jc.conn, jc.tName)
+ jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName)
+
// start redial listener
go jc.redial()
@@ -77,7 +107,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
const op = errors.Op("new_beanstalk_consumer")
const (
- tube string = "tube"
+ tube string = "tube"
+ reserveTimeout string = "reserve_timeout"
)
// PARSE CONFIGURATION -------
@@ -92,11 +123,14 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// initialize job consumer
jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, ""),
+ pq: pq,
+ log: log,
+ eh: e,
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, ""),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ stopCh: make(chan struct{}),
+ reconnectCh: make(chan struct{}),
}
// PARSE CONFIGURATION -------
@@ -116,26 +150,149 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(job *job.Job) error {
- panic("implement me")
+func (j *JobConsumer) Push(jb *job.Job) error {
+ const op = errors.Op("beanstalk_push")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // reconnect protection
+ j.Lock()
+ defer j.Unlock()
+
+ item := fromJob(jb)
+
+ bb := new(bytes.Buffer)
+ bb.Grow(64)
+ err := item.pack(bb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
+ // <pri> is an integer < 2**32. Jobs with smaller priority values will be
+ // scheduled before jobs with larger priorities. The most urgent priority is 0;
+ // the least urgent priority is 4,294,967,295.
+ //
+ // <delay> is an integer number of seconds to wait before putting the job in
+ // the ready queue. The job will be in the "delayed" state during this time.
+ // Maximum delay is 2**32-1.
+ //
+ // <ttr> -- time to run -- is an integer number of seconds to allow a worker
+ // to run this job. This time is counted from the moment a worker reserves
+ // this job. If the worker does not delete, release, or bury the job within
+ // <ttr> seconds, the job will time out and the server will release the job.
+ // The minimum ttr is 1. If the client sends 0, the server will silently
+ // increase the ttr to 1. Maximum ttr is 2**32-1.
+ id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ if err != nil {
+ errD := j.conn.Delete(id)
+ if errD != nil {
+ return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
+ }
+ return errors.E(op, err)
+ }
+
+ return nil
}
func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
- panic("implement me")
+ // register the pipeline
+ j.pipeline.Store(pipeline)
+ return nil
}
-func (j *JobConsumer) Run(pipeline *pipeline.Pipeline) error {
- panic("implement me")
+func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+ const op = errors.Op("beanstalk_run")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
+ }
+
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.Lock()
+ defer j.Unlock()
+
+ go j.listen()
+
+ return nil
}
func (j *JobConsumer) Stop() error {
- panic("implement me")
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ j.stopCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
}
-func (j *JobConsumer) Pause(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Pause(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ j.stopCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
-func (j *JobConsumer) Resume(pipeline string) {
- panic("implement me")
+func (j *JobConsumer) Resume(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+
+ // increase num of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go
new file mode 100644
index 00000000..34f2342b
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/encode_test.go
@@ -0,0 +1,77 @@
+package beanstalk
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/gob"
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+func BenchmarkEncodeGob(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ Timeout: 5,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb := new(bytes.Buffer)
+ err := gob.NewEncoder(bb).Encode(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb.Bytes()
+ bb.Reset()
+ }
+}
+
+func BenchmarkEncodeJsonIter(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ Timeout: 5,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb, err := json.Marshal(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb
+ }
+}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
new file mode 100644
index 00000000..329d4c8d
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -0,0 +1,123 @@
+package beanstalk
+
+import (
+ "bytes"
+ "encoding/gob"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+
+ // Private ================
+ id uint64
+ conn *beanstalk.Conn
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the sqs, MessageAttributes used instead
+func (i *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (i *Item) Ack() error {
+ return i.Options.conn.Delete(i.Options.id)
+}
+
+func (i *Item) Nack() error {
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}
+
+func (i *Item) pack(b *bytes.Buffer) error {
+ err := gob.NewEncoder(b).Encode(i)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error {
+ err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
+ if err != nil {
+ return err
+ }
+ out.Options.conn = conn
+ out.Options.id = id
+
+ return nil
+}
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
new file mode 100644
index 00000000..873930d5
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -0,0 +1,35 @@
+package beanstalk
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.stopCh:
+ j.log.Warn("beanstalk listener stopped")
+ return
+
+ default:
+ // lock used here to prevent consume from the broken connection
+ j.Lock()
+
+ id, body, err := j.tubeSet.Reserve(j.reserveTimeout)
+ if err != nil {
+ j.log.Error("beanstalk reserve", "error", err)
+ j.Unlock()
+ continue
+ }
+
+ item := &Item{}
+ err = unpack(id, body, j.conn, item)
+ if err != nil {
+ j.log.Error("beanstalk unpack item", "error", err)
+ j.Unlock()
+ continue
+ }
+
+ // insert job into the priority queue
+ j.pq.Insert(item)
+
+ j.Unlock()
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
index c9e72ad8..e1922517 100644
--- a/plugins/jobs/drivers/beanstalk/redial.go
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -1,3 +1,31 @@
package beanstalk
-func (j *JobConsumer) redial() {}
+import (
+ "sync/atomic"
+
+ "github.com/beanstalkd/go-beanstalk"
+)
+
+func (j *JobConsumer) redial() {
+ for range j.reconnectCh {
+ // backoff here
+
+ j.Lock()
+
+ var err error
+ j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout)
+ if err != nil {
+ panic(err)
+ }
+
+ j.tube = beanstalk.NewTube(j.conn, j.tName)
+ j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName)
+
+ // restart listener
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ go j.listen()
+ }
+
+ j.Unlock()
+ }
+}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
index 6e2733dd..e3c0b017 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -90,6 +90,7 @@ jobs:
priority: 11
tube: default
pipeline_size: 1000000
+ reserve_timeout: 10s
test-3:
driver: sqs