diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/item.go')
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go new file mode 100644 index 00000000..837f8c63 --- /dev/null +++ b/plugins/boltdb/boltjobs/item.go @@ -0,0 +1,229 @@ +package boltjobs + +import ( + "bytes" + "encoding/gob" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" + "go.etcd.io/bbolt" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + db *bbolt.DB + active *uint64 + delayed *uint64 +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + const op = errors.Op("boltdb_item_ack") + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + if i.Options.Delay > 0 { + atomic.AddUint64(i.Options.delayed, ^uint64(0)) + } else { + atomic.AddUint64(i.Options.active, ^uint64(0)) + } + + return tx.Commit() +} + +func (i *Item) Nack() error { + const op = errors.Op("boltdb_item_ack") + /* + steps: + 1. begin tx + 2. get item by ID from the InQueueBucket (previously put in the listener) + 3. put it back to the PushBucket + 4. Delete it from the InQueueBucket + */ + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + v := inQb.Get(utils.AsBytes(i.ID())) + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + err = pushB.Put(utils.AsBytes(i.ID()), v) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + return tx.Commit() +} + +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + const op = errors.Op("boltdb_item_requeue") + i.Headers = headers + i.Options.Delay = delay + + tx, err := i.Options.db.Begin(true) + if err != nil { + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + + if delay > 0 { + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) + + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = delayB.Put(utils.AsBytes(tKey), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() + } + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = pushB.Put(utils.AsBytes(i.ID()), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() +} + +func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { + i.Options.db = db + i.Options.active = active + i.Options.delayed = delayed +} + +func (i *Item) rollback(err error, tx *bbolt.Tx) error { + errR := tx.Rollback() + if errR != nil { + return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) + } + return errors.Errorf("transaction commit error: %v", err) +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} |