summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.golangci.yml1
-rw-r--r--go.mod2
-rw-r--r--go.sum16
-rw-r--r--plugins/jobs/plugin.go62
4 files changed, 38 insertions, 43 deletions
diff --git a/.golangci.yml b/.golangci.yml
index 55186659..f6ead63e 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -74,7 +74,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- structcheck # Finds unused struct fields
- stylecheck # Stylecheck is a replacement for golint
- tparallel # detects inappropriate usage of t.Parallel() method in your Go test codes
- - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code
- unconvert # Remove unnecessary type conversions
- unused # Checks Go code for unused constants, variables, functions and types
- varcheck # Finds unused global variables and constants
diff --git a/go.mod b/go.mod
index f75b7702..85421a96 100644
--- a/go.mod
+++ b/go.mod
@@ -27,7 +27,7 @@ require (
github.com/shirou/gopsutil v3.21.8+incompatible
github.com/spf13/viper v1.8.1
// SPIRAL ====
- github.com/spiral/endure v1.0.3
+ github.com/spiral/endure v1.0.4
github.com/spiral/errors v1.0.12
github.com/spiral/goridge/v3 v3.2.1
// ===========
diff --git a/go.sum b/go.sum
index a587942b..e144019b 100644
--- a/go.sum
+++ b/go.sum
@@ -49,10 +49,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
-github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
-github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
-github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k=
-github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM=
@@ -358,8 +354,6 @@ github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVz
github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o=
github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
-github.com/shirou/gopsutil v3.21.7+incompatible h1:g/wcPHcuCQvHSePVofjQljd2vX4ty0+J6VoMB+NPcdk=
-github.com/shirou/gopsutil v3.21.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU=
github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -381,8 +375,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
-github.com/spiral/endure v1.0.3 h1:07E4MkwHMOJyjotHlTq56SsEK0aCsVslkzR106aj9hk=
-github.com/spiral/endure v1.0.3/go.mod h1:b2hAQBpsyuDL3LDg2dLTs2htYhlY+hLwBgGE075B6yU=
+github.com/spiral/endure v1.0.4 h1:qpProWUVuu6fRceMnIHs9SkpkjlzAxPl7UxSH6zUPDo=
+github.com/spiral/endure v1.0.4/go.mod h1:I9IoSCMtqXVmXX0TQ3Gu72Z1uIDVNKlhKXmcCoqnR/w=
github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s=
github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA=
github.com/spiral/goridge/v3 v3.2.1 h1:5IJofcvWYjAy+X5XevOhwf/8F0i0Bu/baPsBGiSgqzU=
@@ -421,9 +415,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
-github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
-github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
-github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
@@ -569,7 +560,6 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -624,8 +614,6 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
-golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c=
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index f61092a9..eb273b93 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -41,7 +41,7 @@ type Plugin struct {
server server.Server
jobConstructors map[string]jobs.Constructor
- consumers map[string]jobs.Consumer
+ consumers sync.Map // map[string]jobs.Consumer
// events handler
events events.Handler
@@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events.AddListener(p.collectJobsEvents)
p.jobConstructors = make(map[string]jobs.Constructor)
- p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
@@ -142,7 +141,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
// add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[name] = initializedDriver
+ p.consumers.Store(name, initializedDriver)
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipe)
@@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
- for k, v := range p.consumers {
+ // range over all consumers and call stop
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := v.Stop(ctx)
+ err := consumer.Stop(ctx)
if err != nil {
cancel()
- p.log.Error("stop job driver", "driver", k)
- continue
+ p.log.Error("stop job driver", "driver", key)
+ return true
}
cancel()
- }
+ return true
+ })
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
@@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State {
func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
const op = errors.Op("jobs_plugin_drivers_state")
- jst := make([]*jobState.State, 0, len(p.consumers))
- for k := range p.consumers {
- d := p.consumers[k]
+ jst := make([]*jobState.State, 0, 2)
+ var err error
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
- state, err := d.State(newCtx)
+
+ var state *jobState.State
+ state, err = consumer.State(newCtx)
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return false
}
jst = append(jst, state)
cancel()
+ return true
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
}
return jst, nil
}
@@ -449,7 +459,7 @@ func (p *Plugin) Push(j *job.Job) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
@@ -462,7 +472,7 @@ func (p *Plugin) Push(j *job.Job) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
- err := d.Push(ctx, j)
+ err := d.(jobs.Consumer).Push(ctx, j)
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventPushError,
@@ -502,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
@@ -513,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.Push(ctx, j[i])
+ err := d.(jobs.Consumer).Push(ctx, j[i])
if err != nil {
cancel()
p.events.Push(events.JobEvent{
@@ -543,7 +553,7 @@ func (p *Plugin) Pause(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -551,7 +561,7 @@ func (p *Plugin) Pause(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Pause(ctx, ppl.Name())
+ d.(jobs.Consumer).Pause(ctx, ppl.Name())
}
func (p *Plugin) Resume(pp string) {
@@ -562,7 +572,7 @@ func (p *Plugin) Resume(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -571,7 +581,7 @@ func (p *Plugin) Resume(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Resume(ctx, ppl.Name())
+ d.(jobs.Consumer).Resume(ctx, ppl.Name())
}
// Declare a pipeline.
@@ -593,7 +603,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
}
// add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[pipeline.Name()] = initializedDriver
+ p.consumers.Store(pipeline.Name(), initializedDriver)
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipeline)
@@ -630,24 +640,22 @@ func (p *Plugin) Destroy(pp string) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ // delete consumer
+ d, ok := p.consumers.LoadAndDelete(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
- // delete consumer
- delete(p.consumers, ppl.Name())
// delete old pipeline
p.pipelines.LoadAndDelete(pp)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.Stop(ctx)
+ err := d.(jobs.Consumer).Stop(ctx)
if err != nil {
cancel()
return errors.E(op, err)
}
- d = nil
cancel()
return nil
}