diff options
author | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-25 18:03:30 +0300 |
commit | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch) | |
tree | 8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/beanstalk | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) |
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 26 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 2 |
3 files changed, 15 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 6323148b..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -19,7 +19,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +type consumer struct { log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -43,7 +43,7 @@ type JobConsumer struct { requeueCh chan *Item } -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return jc, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { +func (j *consumer) handleItem(ctx context.Context, item *Item) error { const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) @@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { // register the pipeline j.pipeline.Store(p) return nil } // State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("beanstalk_state") stat, err := j.pool.Stats(ctx) if err != nil { @@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index f1d7ac76..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index f1385e70..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -4,7 +4,7 @@ import ( "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen() { +func (j *consumer) listen() { for { select { case <-j.stopCh: |