diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/item.go')
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 229 |
1 files changed, 0 insertions, 229 deletions
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go deleted file mode 100644 index 837f8c63..00000000 --- a/plugins/boltdb/boltjobs/item.go +++ /dev/null @@ -1,229 +0,0 @@ -package boltjobs - -import ( - "bytes" - "encoding/gob" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" - "go.etcd.io/bbolt" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - db *bbolt.DB - active *uint64 - delayed *uint64 -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - const op = errors.Op("boltdb_item_ack") - tx, err := i.Options.db.Begin(true) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - if i.Options.Delay > 0 { - atomic.AddUint64(i.Options.delayed, ^uint64(0)) - } else { - atomic.AddUint64(i.Options.active, ^uint64(0)) - } - - return tx.Commit() -} - -func (i *Item) Nack() error { - const op = errors.Op("boltdb_item_ack") - /* - steps: - 1. begin tx - 2. get item by ID from the InQueueBucket (previously put in the listener) - 3. put it back to the PushBucket - 4. Delete it from the InQueueBucket - */ - tx, err := i.Options.db.Begin(true) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - v := inQb.Get(utils.AsBytes(i.ID())) - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - err = pushB.Put(utils.AsBytes(i.ID()), v) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - _ = tx.Rollback() - return errors.E(op, err) - } - - return tx.Commit() -} - -/* -Requeue algorithm: -1. Rewrite item headers and delay. -2. Begin writable transaction on attached to the item db. -3. Delete item from the InQueueBucket -4. Handle items with the delay: - 4.1. Get DelayBucket - 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format - 4.3. Put this key with value to the DelayBucket -5. W/o delay, put the key with value to the PushBucket (requeue) -*/ -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - const op = errors.Op("boltdb_item_requeue") - i.Headers = headers - i.Options.Delay = delay - - tx, err := i.Options.db.Begin(true) - if err != nil { - return errors.E(op, err) - } - - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - err = inQb.Delete(utils.AsBytes(i.ID())) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - // encode the item - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) - val := make([]byte, buf.Len()) - copy(val, buf.Bytes()) - buf.Reset() - - if delay > 0 { - delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = delayB.Put(utils.AsBytes(tKey), val) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - return tx.Commit() - } - - pushB := tx.Bucket(utils.AsBytes(PushBucket)) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = pushB.Put(utils.AsBytes(i.ID()), val) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - return tx.Commit() -} - -func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { - i.Options.db = db - i.Options.active = active - i.Options.delayed = delayed -} - -func (i *Item) rollback(err error, tx *bbolt.Tx) error { - errR := tx.Rollback() - if errR != nil { - return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) - } - return errors.Errorf("transaction commit error: %v", err) -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} |