summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs/consumer.go
blob: a8db2f30026e2ac7827fd51b1cb3bae4149e6b94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package boltjobs

import (
	"context"
	"os"
	"sync/atomic"
	"time"

	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/pkg/events"
	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
	"github.com/spiral/roadrunner/v2/plugins/config"
	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
	"github.com/spiral/roadrunner/v2/plugins/logger"
	"github.com/spiral/roadrunner/v2/utils"
	bolt "go.etcd.io/bbolt"
)

const (
	// PluginName is the key of the global boltdb configuration section;
	// NewBoltDBJobs refuses to start when the configuration does not have it.
	PluginName = "boltdb"
)

// consumer is the boltdb-backed jobs driver. It is created by NewBoltDBJobs
// (or FromPipeline) and keeps the open database handle together with the
// collaborators handed to the constructor.
type consumer struct {
	// bbolt configuration
	// file is the path passed to bolt.Open.
	file string
	// permissions is converted to os.FileMode when the database file is opened.
	permissions int
	// bucket is the name of the bucket created (if missing) during construction.
	bucket string
	// db is the opened bbolt database handle.
	db *bolt.DB

	// log, eh and pq are stored as received by the constructor.
	log logger.Logger
	eh  events.Handler
	pq  priorityqueue.Queue
	// pipe holds the *pipeline.Pipeline stored by Register.
	pipe atomic.Value
}

// NewBoltDBJobs builds a boltdb-backed jobs consumer from the configuration
// section stored under configKey.
//
// Both the configKey section and the global PluginName section must be
// present. The configKey section is unmarshalled into Config, defaults are
// applied, the database file is opened (bbolt waits up to 20 seconds for the
// file lock) and the configured bucket is created when it does not exist yet.
//
// On any failure a nil consumer is returned together with an error carrying
// the "init_boltdb_jobs" operation. If the database has already been opened at
// that point it is closed before returning so the handle is not leaked.
func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
	const op = errors.Op("init_boltdb_jobs")

	if !cfg.Has(configKey) {
		return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
	}

	// if no global section
	if !cfg.Has(PluginName) {
		return nil, errors.E(op, errors.Str("no global boltdb configuration"))
	}

	conf := &Config{}
	if err := cfg.UnmarshalKey(configKey, conf); err != nil {
		return nil, errors.E(op, err)
	}

	// add default values
	conf.InitDefaults()

	db, err := bolt.Open(conf.File, os.FileMode(conf.Permissions), &bolt.Options{
		Timeout:        time.Second * 20,
		NoGrowSync:     false,
		NoFreelistSync: false,
		ReadOnly:       false,
		NoSync:         false,
	})
	if err != nil {
		return nil, errors.E(op, err)
	}

	// create bucket if it does not exist
	// tx.Commit invokes via the db.Update
	err = db.Update(func(tx *bolt.Tx) error {
		const upOp = errors.Op("boltdb_plugin_update")
		// Use a closure-local error so the outer err is only written by the
		// db.Update assignment, and keep the cause in the returned error.
		if _, cErr := tx.CreateBucketIfNotExists(utils.AsBytes(conf.bucket)); cErr != nil {
			return errors.E(upOp, cErr)
		}
		return nil
	})
	if err != nil {
		// Best-effort cleanup: the update failure is the error worth
		// reporting, so a secondary Close error is intentionally dropped.
		_ = db.Close()
		return nil, errors.E(op, err)
	}

	return &consumer{
		file:        conf.File,
		permissions: conf.Permissions,
		bucket:      conf.bucket,
		db:          db,

		log: log,
		eh:  e,
		pq:  pq,
	}, nil
}

// FromPipeline is the pipeline-based constructor. It is currently a stub: all
// arguments are ignored and a zero-value consumer (no database opened, no
// logger, no queue) is returned with a nil error.
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
	return new(consumer), nil
}

// Push is not implemented yet: calling it panics with "implement me".
func (c *consumer) Push(ctx context.Context, job *job.Job) error {
	panic("implement me")
}

// Register stores the given pipeline in the consumer's atomic pipe value.
// The context is unused and the returned error is always nil.
func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
	c.pipe.Store(pipeline)
	return nil
}

// Run is not implemented yet: calling it panics with "implement me".
func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error {
	panic("implement me")
}

// Stop is not implemented yet: calling it panics with "implement me".
// NOTE(review): nothing in this file closes c.db; presumably Stop should do
// that once implemented — confirm.
func (c *consumer) Stop(ctx context.Context) error {
	panic("implement me")
}

// Pause is not implemented yet: calling it panics with "implement me".
func (c *consumer) Pause(ctx context.Context, pipeline string) {
	panic("implement me")
}

// Resume is not implemented yet: calling it panics with "implement me".
func (c *consumer) Resume(ctx context.Context, pipeline string) {
	panic("implement me")
}

// State is not implemented yet: calling it panics with "implement me".
func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
	panic("implement me")
}