summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod2
-rw-r--r--go.sum3
-rw-r--r--pkg/worker_watcher/container/channel/vec.go52
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go8
-rw-r--r--plugins/jobs/drivers/amqp/item.go2
-rw-r--r--plugins/jobs/drivers/amqp/listener.go2
-rw-r--r--plugins/jobs/drivers/amqp/redial.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go24
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go8
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go6
-rw-r--r--tests/env/Dockerfile-beanstalkd.yaml8
-rw-r--r--tests/env/docker-compose.yaml9
13 files changed, 91 insertions, 43 deletions
diff --git a/go.mod b/go.mod
index dd590539..aea4ce64 100644
--- a/go.mod
+++ b/go.mod
@@ -26,13 +26,13 @@ require (
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.13.0
github.com/prometheus/client_golang v1.10.0
+ github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/spf13/viper v1.7.1
// SPIRAL ====
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.4
- github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index 25783fcc..912000bf 100644
--- a/go.sum
+++ b/go.sum
@@ -373,6 +373,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 h1:D5EMs8zL77qXFJ60vl7x5xRxtezkXsmr8mwypRk5Pe4=
+github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -416,7 +418,6 @@ github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdU
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
-github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index eafbfb07..c06d05b0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -2,6 +2,7 @@ package channel
import (
"context"
+ "sync"
"sync/atomic"
"github.com/spiral/errors"
@@ -9,21 +10,62 @@ import (
)
type Vec struct {
+ sync.RWMutex
+ // destroy signal
destroy uint64
+ // channel with the workers
workers chan worker.BaseProcess
+
+ len uint64
}
-func NewVector(initialNumOfWorkers uint64) *Vec {
+func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ len: len,
+ workers: make(chan worker.BaseProcess, len),
}
return vec
}
+// Push is O(1) operation
+// In case of TTL and full channel O(n) worst case
func (v *Vec) Push(w worker.BaseProcess) {
- v.workers <- w
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // default select branch is only possible when dealing with TTL
+ // because in that case, workers in the v.workers channel can be TTL-ed and killed
+ // but presenting in the channel
+ default:
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can be in the default branch by the following reasons:
+ 1. TTL is set with no requests during the TTL
+ 2. Violated Get <-> Release operation (how ??)
+ */
+ for i := uint64(0); i < v.len; i++ {
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, while send and receive operations are concurrent (from the channel), channel behave
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
+ v.workers <- wrk
+ continue
+ default:
+ // kill the current worker (just to be sure it's dead)
+ _ = wrk.Kill()
+ // replace with the new one
+ v.workers <- w
+ return
+ }
+ }
+ }
}
func (v *Vec) Remove(_ int64) {}
@@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
return nil, errors.E(errors.WatcherStopped)
}
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
select {
case w := <-v.workers:
return w, nil
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index ca026383..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -29,7 +29,7 @@ type Vector interface {
type workerWatcher struct {
sync.RWMutex
container Vector
- // used to control the Destroy stage (that all workers are in the container)
+ // used to control Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
+ // set state as stopped
w.State().Set(worker.StateStopped)
- ww.Remove(w)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 714a714a..3ca5c742 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -15,7 +16,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/streadway/amqp"
)
type JobConsumer struct {
@@ -325,7 +325,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -375,7 +375,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -413,7 +413,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 295ccfd3..6b544620 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -4,10 +4,10 @@ import (
"time"
json "github.com/json-iterator/go"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
- "github.com/streadway/amqp"
)
type Item struct {
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
index 8011aa3b..0b1cd2dc 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -1,6 +1,6 @@
package amqp
-import "github.com/streadway/amqp"
+import amqp "github.com/rabbitmq/amqp091-go"
func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
go func() {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index fd19f1ce..ef2a130a 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -4,10 +4,10 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/streadway/amqp"
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 797b4821..ae223f39 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -55,14 +55,16 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger.
}, nil
}
-func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+// Put the payload
+// TODO use the context ??
+func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
} else {
@@ -81,14 +83,14 @@ func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) {
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
} else {
@@ -107,7 +109,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return errN
} else {
@@ -118,12 +120,14 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
-func (cp *ConnPool) redial(ctx context.Context) error {
+func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
cp.Lock()
// backoff here
- expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
+ expb := backoff.NewExponentialBackOff()
+ // TODO set via config
+ expb.MaxElapsedTime = time.Minute
operation := func() error {
connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
@@ -164,7 +168,7 @@ func (cp *ConnPool) redial(ctx context.Context) error {
var connErrors = map[string]struct{}{"EOF": {}}
-func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
+func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
switch et := err.(type) { //nolint:gocritic
// check if the error
@@ -172,7 +176,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
switch bErr := et.Err.(type) {
case *net.OpError:
cp.RUnlock()
- errR := cp.redial(ctx)
+ errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
@@ -185,7 +189,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
if _, ok := connErrors[et.Err.Error()]; ok {
// if error is related to the broken connection - redial
cp.RUnlock()
- errR := cp.redial(ctx)
+ errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 90da3801..54c8318b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- go j.listen(ctx)
+ go j.listen()
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
@@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p string) {
}
// start listener
- go j.listen(ctx)
+ go j.listen()
// increase num of listeners
atomic.AddUint32(&j.listeners, 1)
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index b872cbd4..aaf635b1 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,19 +1,17 @@
package beanstalk
import (
- "context"
-
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen(ctx context.Context) {
+func (j *JobConsumer) listen() {
for {
select {
case <-j.stopCh:
j.log.Warn("beanstalk listener stopped")
return
default:
- id, body, err := j.pool.Reserve(ctx, j.reserveTimeout)
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
if errB, ok := err.(beanstalk.ConnError); ok {
switch errB.Err { //nolint:gocritic
diff --git a/tests/env/Dockerfile-beanstalkd.yaml b/tests/env/Dockerfile-beanstalkd.yaml
index 7b36f8d3..852385a1 100644
--- a/tests/env/Dockerfile-beanstalkd.yaml
+++ b/tests/env/Dockerfile-beanstalkd.yaml
@@ -1,13 +1,15 @@
-FROM ubuntu:latest
+FROM archlinux:latest
ARG DEBIAN_FRONTEND=noninteractive
+RUN pacman-key --init
-RUN apt-get update && apt-get install -y curl build-essential pkg-config
+RUN Y | pacman -Syu --noconfirm
+RUN Y | pacman -S --noconfirm curl base-devel pkgconf
RUN curl -sL https://github.com/kr/beanstalkd/archive/v1.12.tar.gz | tar xvz -C /tmp
WORKDIR /tmp/beanstalkd-1.12
-RUN make
+RUN make -j12
RUN cp beanstalkd /usr/bin
EXPOSE 11300
diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml
index 2573fe7a..dc91e7b6 100644
--- a/tests/env/docker-compose.yaml
+++ b/tests/env/docker-compose.yaml
@@ -1,4 +1,4 @@
-version: '3'
+version: '3.8'
services:
memcached:
@@ -16,9 +16,6 @@ services:
toxicproxy:
image: shopify/toxiproxy
-# ports:
-# - "8474:8474"
-# - "5673:5673"
network_mode: "host"
beanstalk:
@@ -37,10 +34,6 @@ services:
rabbitmq:
image: rabbitmq:3-management
- environment:
- RABBITMQ_DEFAULT_USER: guest
- RABBITMQ_DEFAULT_PASS: guest
- RABBITMQ_DEFAULT_VHOST: /
ports:
- "15672:15672"
- "5672:5672"