summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go21
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go140
-rw-r--r--plugins/jobs/drivers/beanstalk/plugin.go22
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go3
6 files changed, 187 insertions, 2 deletions
diff --git a/go.mod b/go.mod
index 1cedb379..4c0c1c92 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.4.1
github.com/aws/aws-sdk-go-v2/credentials v1.3.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0
+ github.com/beanstalkd/go-beanstalk v0.1.0 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
diff --git a/go.sum b/go.sum
index fbaf6411..0eca78cc 100644
--- a/go.sum
+++ b/go.sum
@@ -63,6 +63,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43
github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg=
github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM=
github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
+github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w=
+github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index d034d65c..caa683ab 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -1 +1,22 @@
package beanstalk
+
+import "time"
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+ Timeout time.Duration `mapstructure:"timeout"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "tcp://localhost:11300"
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = time.Second * 30
+ }
+}
+
+type Config struct{}
+
+func (c *Config) InitDefault() {}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index d034d65c..e9bfafdd 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -1 +1,141 @@
package beanstalk
+
+import (
+ "strings"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+
+ // beanstalk
+ conn *beanstalk.Conn
+ tout time.Duration
+ // tube name
+ tName string
+}
+
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ tout: globalCfg.Timeout,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ if err != nil {
+ return nil, err
+ }
+
+ // start redial listener
+ go jc.redial()
+
+ return jc, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ const (
+ tube string = "tube"
+ )
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, ""),
+ }
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ if err != nil {
+ return nil, err
+ }
+
+ // start redial listener
+ go jc.redial()
+
+ return jc, nil
+}
+func (j *JobConsumer) Push(job *job.Job) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Run(pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Stop() error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go
index 2fea1c31..529d1474 100644
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ b/plugins/jobs/drivers/beanstalk/plugin.go
@@ -9,6 +9,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+const (
+ pluginName string = "beanstalk"
+)
+
type Plugin struct {
log logger.Logger
cfg config.Configurer
@@ -20,10 +24,24 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
return nil
}
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return nil, nil
+ return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return nil, nil
+ return FromPipeline(pipe, p.log, p.cfg, eh, pq)
}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
new file mode 100644
index 00000000..c9e72ad8
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -0,0 +1,3 @@
+package beanstalk
+
+func (j *JobConsumer) redial() {}