diff options
Diffstat (limited to 'plugins/beanstalk/item.go')
-rw-r--r-- | plugins/beanstalk/item.go | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go deleted file mode 100644 index 03060994..00000000 --- a/plugins/beanstalk/item.go +++ /dev/null @@ -1,138 +0,0 @@ -package beanstalk - -import ( - "bytes" - "context" - "encoding/gob" - "time" - - "github.com/beanstalkd/go-beanstalk" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // Private ================ - id uint64 - conn *beanstalk.Conn - requeueFn func(context.Context, *Item) error -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the sqs, MessageAttributes used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Nack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - // delete old job - err = i.Options.conn.Delete(i.Options.id) - if err != nil { - return err - } - - return nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -func (j *consumer) unpack(id uint64, data []byte, out *Item) error { - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) - if err != nil { - return err - } - out.Options.conn = j.pool.conn - out.Options.id = id - out.Options.requeueFn = j.handleItem - - return nil -} |