summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/beanstalk
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go26
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
3 files changed, 15 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 6323148b..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -19,7 +19,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+type consumer struct {
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -43,7 +43,7 @@ type JobConsumer struct {
requeueCh chan *Item
}
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return jc, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered
@@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+func (j *consumer) handleItem(ctx context.Context, item *Item) error {
const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
@@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("beanstalk_state")
stat, err := j.pool.Stats(ctx)
if err != nil {
@@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index f1d7ac76..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
+func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index f1385e70..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -4,7 +4,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen() {
+func (j *consumer) listen() {
for {
select {
case <-j.stopCh: