diff options
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 21 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 140 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/plugin.go | 22 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 3 |
6 files changed, 187 insertions, 2 deletions
@@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.4.1 github.com/aws/aws-sdk-go-v2/credentials v1.3.0 github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0 + github.com/beanstalkd/go-beanstalk v0.1.0 // indirect github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cenkalti/backoff/v4 v4.1.1 github.com/fasthttp/websocket v1.4.3 @@ -63,6 +63,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43 github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg= github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM= github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w= +github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index d034d65c..caa683ab 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -1 +1,22 @@ package beanstalk + +import "time" + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` + Timeout time.Duration `mapstructure:"timeout"` +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "tcp://localhost:11300" + } + + if c.Timeout == 0 { + c.Timeout = time.Second * 30 + } +} + +type Config struct{} + +func (c *Config) InitDefault() {} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index d034d65c..e9bfafdd 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -1 +1,141 @@ package beanstalk + +import ( + "strings" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type JobConsumer struct { + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + + // beanstalk + conn *beanstalk.Conn + tout time.Duration + // tube name + tName string +} + +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + tout: globalCfg.Timeout, + } + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + if err != nil { + return nil, err + } + + // start redial listener + go jc.redial() + + return jc, nil +} + +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + const ( + tube string = "tube" + ) + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + tout: globalCfg.Timeout, + tName: pipe.String(tube, ""), + } + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + if err != nil { + return nil, err + } + + // start redial listener + go jc.redial() + + return jc, nil +} +func (j *JobConsumer) Push(job *job.Job) error { + panic("implement me") +} + +func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { + panic("implement me") +} + +func (j *JobConsumer) Run(pipeline *pipeline.Pipeline) error { + panic("implement me") +} + +func (j *JobConsumer) Stop() error { + panic("implement me") +} + +func (j *JobConsumer) Pause(pipeline string) { + panic("implement me") +} + +func (j *JobConsumer) Resume(pipeline string) { + panic("implement me") +} diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go index 2fea1c31..529d1474 100644 --- a/plugins/jobs/drivers/beanstalk/plugin.go +++ b/plugins/jobs/drivers/beanstalk/plugin.go @@ -9,6 +9,10 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) +const ( + pluginName string = "beanstalk" +) + type Plugin struct { log logger.Logger cfg config.Configurer @@ -20,10 +24,24 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { return nil } +func (p *Plugin) Serve() chan error { + return make(chan error) +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return nil, nil + return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return nil, nil + return FromPipeline(pipe, p.log, p.cfg, eh, pq) } diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go new file mode 100644 index 00000000..c9e72ad8 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -0,0 +1,3 @@ +package beanstalk + +func (j *JobConsumer) redial() {} |