diff options
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 21 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 213 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/encode_test.go | 77 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 123 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 35 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 30 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-test.yaml | 1 |
7 files changed, 469 insertions, 31 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index caa683ab..f05ee122 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -17,6 +17,23 @@ func (c *GlobalCfg) InitDefault() { } } -type Config struct{} +type Config struct { + PipePriority int64 `mapstructure:"priority"` + TubePriority uint32 `mapstructure:"tube_priority"` + Tube string `mapstructure:"tube"` + ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` +} + +func (c *Config) InitDefault() { + if c.Tube == "" { + c.Tube = "default" + } -func (c *Config) InitDefault() {} + if c.ReserveTimeout == 0 { + c.ReserveTimeout = time.Second * 5 + } + + if c.PipePriority == 0 { + c.PipePriority = 10 + } +} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index e9bfafdd..27d453f4 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -1,7 +1,10 @@ package beanstalk import ( + "bytes" "strings" + "sync" + "sync/atomic" "time" "github.com/beanstalkd/go-beanstalk" @@ -15,15 +18,29 @@ import ( ) type JobConsumer struct { + sync.Mutex log logger.Logger eh events.Handler pq priorityqueue.Queue + pipeline atomic.Value + listeners uint32 + // beanstalk - conn *beanstalk.Conn - tout time.Duration + addr string + network string + conn *beanstalk.Conn + tube *beanstalk.Tube + tubeSet *beanstalk.TubeSet + reserveTimeout time.Duration + reconnectCh chan struct{} + tout time.Duration // tube name - tName string + tName string + tubePriority uint32 + priority int64 + + stopCh chan struct{} } func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,14 +64,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config globalCfg.InitDefault() - // initialize job consumer - jc := &JobConsumer{ - pq: pq, - log: log, - eh: e, - tout: globalCfg.Timeout, - } - // PARSE CONFIGURATION ------- dsn := strings.Split(globalCfg.Addr, "://") @@ -62,11 +71,32 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + reserveTimeout: pipeCfg.ReserveTimeout, + tName: pipeCfg.Tube, + tubePriority: pipeCfg.TubePriority, + priority: pipeCfg.PipePriority, + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + reconnectCh: make(chan struct{}), + } + + jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout) if err != nil { return nil, err } + jc.tube = beanstalk.NewTube(jc.conn, jc.tName) + jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName) + // start redial listener go jc.redial() @@ -77,7 +107,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu const op = errors.Op("new_beanstalk_consumer") const ( - tube string = "tube" + tube string = "tube" + reserveTimeout string = "reserve_timeout" ) // PARSE CONFIGURATION ------- @@ -92,11 +123,14 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // initialize job consumer jc := &JobConsumer{ - pq: pq, - log: log, - eh: e, - tout: globalCfg.Timeout, - tName: pipe.String(tube, ""), + pq: pq, + log: log, + eh: e, + tout: globalCfg.Timeout, + tName: pipe.String(tube, ""), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + stopCh: make(chan struct{}), + reconnectCh: make(chan struct{}), } // PARSE CONFIGURATION ------- @@ -116,26 +150,149 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(job *job.Job) error { - panic("implement me") +func (j *JobConsumer) Push(jb *job.Job) error { + const op = errors.Op("beanstalk_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != jb.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) + } + + // reconnect protection + j.Lock() + defer j.Unlock() + + item := fromJob(jb) + + bb := new(bytes.Buffer) + bb.Grow(64) + err := item.pack(bb) + if err != nil { + return errors.E(op, err) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 + // <pri> is an integer < 2**32. Jobs with smaller priority values will be + // scheduled before jobs with larger priorities. The most urgent priority is 0; + // the least urgent priority is 4,294,967,295. + // + // <delay> is an integer number of seconds to wait before putting the job in + // the ready queue. The job will be in the "delayed" state during this time. + // Maximum delay is 2**32-1. + // + // <ttr> -- time to run -- is an integer number of seconds to allow a worker + // to run this job. This time is counted from the moment a worker reserves + // this job. If the worker does not delete, release, or bury the job within + // <ttr> seconds, the job will time out and the server will release the job. + // The minimum ttr is 1. If the client sends 0, the server will silently + // increase the ttr to 1. Maximum ttr is 2**32-1. + id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + if err != nil { + errD := j.conn.Delete(id) + if errD != nil { + return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) + } + return errors.E(op, err) + } + + return nil } func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { - panic("implement me") + // register the pipeline + j.pipeline.Store(pipeline) + return nil } -func (j *JobConsumer) Run(pipeline *pipeline.Pipeline) error { - panic("implement me") +func (j *JobConsumer) Run(p *pipeline.Pipeline) error { + const op = errors.Op("beanstalk_run") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) + } + + atomic.AddUint32(&j.listeners, 1) + + j.Lock() + defer j.Unlock() + + go j.listen() + + return nil } func (j *JobConsumer) Stop() error { - panic("implement me") + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + j.stopCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil } -func (j *JobConsumer) Pause(pipeline string) { - panic("implement me") +func (j *JobConsumer) Pause(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + j.stopCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } -func (j *JobConsumer) Resume(pipeline string) { - panic("implement me") +func (j *JobConsumer) Resume(p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + + // start listener + go j.listen() + + // increase num of listeners + atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go new file mode 100644 index 00000000..34f2342b --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/encode_test.go @@ -0,0 +1,77 @@ +package beanstalk + +import ( + "bytes" + "crypto/rand" + "encoding/gob" + "testing" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +func BenchmarkEncodeGob(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + Timeout: 5, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb := new(bytes.Buffer) + err := gob.NewEncoder(bb).Encode(item) + if err != nil { + b.Fatal(err) + } + _ = bb.Bytes() + bb.Reset() + } +} + +func BenchmarkEncodeJsonIter(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + Timeout: 5, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb, err := json.Marshal(item) + if err != nil { + b.Fatal(err) + } + _ = bb + } +} diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go new file mode 100644 index 00000000..329d4c8d --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -0,0 +1,123 @@ +package beanstalk + +import ( + "bytes" + "encoding/gob" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int64 `json:"timeout,omitempty"` + + // Private ================ + id uint64 + conn *beanstalk.Conn +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. +// Not used in the sqs, MessageAttributes used instead +func (i *Item) Context() ([]byte, error) { + return nil, nil +} + +func (i *Item) Ack() error { + return i.Options.conn.Delete(i.Options.id) +} + +func (i *Item) Nack() error { + return nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} + +func (i *Item) pack(b *bytes.Buffer) error { + err := gob.NewEncoder(b).Encode(i) + if err != nil { + return err + } + + return nil +} + +func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error { + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) + if err != nil { + return err + } + out.Options.conn = conn + out.Options.id = id + + return nil +} diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go new file mode 100644 index 00000000..873930d5 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -0,0 +1,35 @@ +package beanstalk + +func (j *JobConsumer) listen() { + for { + select { + case <-j.stopCh: + j.log.Warn("beanstalk listener stopped") + return + + default: + // lock used here to prevent consume from the broken connection + j.Lock() + + id, body, err := j.tubeSet.Reserve(j.reserveTimeout) + if err != nil { + j.log.Error("beanstalk reserve", "error", err) + j.Unlock() + continue + } + + item := &Item{} + err = unpack(id, body, j.conn, item) + if err != nil { + j.log.Error("beanstalk unpack item", "error", err) + j.Unlock() + continue + } + + // insert job into the priority queue + j.pq.Insert(item) + + j.Unlock() + } + } +} diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go index c9e72ad8..e1922517 100644 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -1,3 +1,31 @@ package beanstalk -func (j *JobConsumer) redial() {} +import ( + "sync/atomic" + + "github.com/beanstalkd/go-beanstalk" +) + +func (j *JobConsumer) redial() { + for range j.reconnectCh { + // backoff here + + j.Lock() + + var err error + j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout) + if err != nil { + panic(err) + } + + j.tube = beanstalk.NewTube(j.conn, j.tName) + j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName) + + // restart listener + if atomic.LoadUint32(&j.listeners) == 1 { + go j.listen() + } + + j.Unlock() + } +} diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml index 6e2733dd..e3c0b017 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml @@ -90,6 +90,7 @@ jobs: priority: 11 tube: default pipeline_size: 1000000 + reserve_timeout: 10s test-3: driver: sqs |