summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-19 17:21:34 +0300
committerValery Piashchynski <[email protected]>2021-07-19 17:21:34 +0300
commit9750751b268b3fe706b911c0322d2e6ae27e652f (patch)
tree5b734653a26a41adda14d3162b4f627d8e0f75b4 /plugins/jobs
parent02fc3664f4ad97e03c8f3a641e7322362f78721c (diff)
Beanstalk initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go21
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go140
-rw-r--r--plugins/jobs/drivers/beanstalk/plugin.go22
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go3
4 files changed, 184 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index d034d65c..caa683ab 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -1 +1,22 @@
package beanstalk
+
+import "time"
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+ Timeout time.Duration `mapstructure:"timeout"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "tcp://localhost:11300"
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = time.Second * 30
+ }
+}
+
+type Config struct{}
+
+func (c *Config) InitDefault() {}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index d034d65c..e9bfafdd 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -1 +1,141 @@
package beanstalk
+
+import (
+ "strings"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+
+ // beanstalk
+ conn *beanstalk.Conn
+ tout time.Duration
+ // tube name
+ tName string
+}
+
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ tout: globalCfg.Timeout,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ if err != nil {
+ return nil, err
+ }
+
+ // start redial listener
+ go jc.redial()
+
+ return jc, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ const (
+ tube string = "tube"
+ )
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, ""),
+ }
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ if err != nil {
+ return nil, err
+ }
+
+ // start redial listener
+ go jc.redial()
+
+ return jc, nil
+}
+func (j *JobConsumer) Push(job *job.Job) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Run(pipeline *pipeline.Pipeline) error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Stop() error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go
index 2fea1c31..529d1474 100644
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ b/plugins/jobs/drivers/beanstalk/plugin.go
@@ -9,6 +9,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+const (
+ pluginName string = "beanstalk"
+)
+
type Plugin struct {
log logger.Logger
cfg config.Configurer
@@ -20,10 +24,24 @@ func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
return nil
}
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return nil, nil
+ return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return nil, nil
+ return FromPipeline(pipe, p.log, p.cfg, eh, pq)
}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
new file mode 100644
index 00000000..c9e72ad8
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -0,0 +1,3 @@
+package beanstalk
+
+func (j *JobConsumer) redial() {}