summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/beanstalk/tube.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk/tube.go')
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube.go250
1 files changed, 0 insertions, 250 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go
deleted file mode 100644
index 9d7ad117..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube.go
+++ /dev/null
@@ -1,250 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type tube struct {
- active int32
- pipe *jobs.Pipeline
- mut sync.Mutex
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
- reserve time.Duration
-
- // tube events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id uint64
- data []byte
-}
-
-func (e *entry) String() string {
- return fmt.Sprintf("%v", e.id)
-}
-
-// create new tube consumer and producer
-func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
- if pipe.String("tube", "") == "" {
- return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
- }
-
- return &tube{
- pipe: pipe,
- tube: &beanstalk.Tube{Name: pipe.String("tube", "")},
- tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
- reserve: pipe.Duration("reserve", time.Second),
- lsn: lsn,
- }, nil
-}
-
-// run consumers
-func (t *tube) serve(connector connFactory) {
- // tube specific consume connection
- cn, err := connector.newConn()
- if err != nil {
- t.report(err)
- return
- }
- defer cn.Close()
-
- t.wait = make(chan interface{})
- atomic.StoreInt32(&t.active, 1)
-
- for {
- e, err := t.consume(cn)
- if err != nil {
- if isConnError(err) {
- t.report(err)
- }
- continue
- }
-
- if e == nil {
- return
- }
-
- h := <-t.execPool
- go func(h jobs.Handler, e *entry) {
- err := t.do(cn, h, e)
- t.execPool <- h
- t.wg.Done()
- t.report(err)
- }(h, e)
- }
-}
-
-// fetch consume
-func (t *tube) consume(cn *conn) (*entry, error) {
- t.muw.Lock()
- defer t.muw.Unlock()
-
- select {
- case <-t.wait:
- return nil, nil
- default:
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.tubeSet.Conn = conn
-
- id, data, err := t.tubeSet.Reserve(t.reserve)
- cn.release(err)
-
- if err != nil {
- return nil, err
- }
-
- t.wg.Add(1)
- return &entry{id: id, data: data}, nil
- }
-}
-
-// do data
-func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
- j, err := unpack(e.data)
- if err != nil {
- return err
- }
-
- err = h(e.String(), j)
-
- // mandatory acquisition
- conn, connErr := cn.acquire(true)
- if connErr != nil {
- // possible if server is dead
- return connErr
- }
-
- if err == nil {
- return cn.release(conn.Delete(e.id))
- }
-
- stat, statErr := conn.StatsJob(e.id)
- if statErr != nil {
- return cn.release(statErr)
- }
-
- t.errHandler(e.String(), j, err)
-
- reserves, ok := strconv.Atoi(stat["reserves"])
- if ok != nil || !j.Options.CanRetry(reserves-1) {
- return cn.release(conn.Bury(e.id, 0))
- }
-
- return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
-}
-
-// stop tube consuming
-func (t *tube) stop() {
- if atomic.LoadInt32(&t.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&t.active, 0)
-
- close(t.wait)
-
- t.muw.Lock()
- t.wg.Wait()
- t.muw.Unlock()
-}
-
-// put data into pool or return error (no wait), this method will try to reattempt operation if
-// dead conn found.
-func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- id, err = t.doPut(cn, attempt, data, delay, rrt)
- if err != nil && isConnError(err) {
- return t.doPut(cn, attempt, data, delay, rrt)
- }
-
- return id, err
-}
-
-// perform put operation
-func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return "", err
- }
-
- var bid uint64
-
- t.mut.Lock()
- t.tube.Conn = conn
- bid, err = t.tube.Put(data, 0, delay, rrt)
- t.mut.Unlock()
-
- return strconv.FormatUint(bid, 10), cn.release(err)
-}
-
-// return tube stats (retries)
-func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
- stat, err = t.doStat(cn)
- if err != nil && isConnError(err) {
- return t.doStat(cn)
- }
-
- return stat, err
-}
-
-// return tube stats
-func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.mut.Lock()
- t.tube.Conn = conn
- values, err := t.tube.Stats()
- t.mut.Unlock()
-
- if err != nil {
- return nil, cn.release(err)
- }
-
- stat = &jobs.Stat{InternalName: t.tube.Name}
-
- if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
- stat.Queue = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
- stat.Active = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
- stat.Delayed = int64(v)
- }
-
- return stat, cn.release(nil)
-}
-
-// report tube specific error
-func (t *tube) report(err error) {
- if err != nil {
- t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
- }
-}