summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-09 17:10:07 +0300
committerValery Piashchynski <[email protected]>2021-08-09 17:10:07 +0300
commitfba3d927b62f8963f0c291da2739061e726df32e (patch)
tree37fd54198f1b4939c0b78435c22817ca0c5facd9
parent606e2170ccac5a13a11198aaf54e4219a83291ab (diff)
Update goridge to v3.2.0, update all frames operations.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod6
-rw-r--r--go.sum8
-rwxr-xr-xinternal/protocol.go14
-rwxr-xr-xpkg/pool/static_pool.go1
-rwxr-xr-xpkg/worker/sync_worker.go14
-rw-r--r--plugins/jobs/plugin.go2
-rw-r--r--plugins/jobs/response_protocol.md47
-rw-r--r--tests/env/Dockerfile-elastic-mq.yaml4
8 files changed, 72 insertions, 24 deletions
diff --git a/go.mod b/go.mod
index aea4ce64..5a36dec3 100644
--- a/go.mod
+++ b/go.mod
@@ -26,13 +26,13 @@ require (
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.13.0
github.com/prometheus/client_golang v1.10.0
- github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 // indirect
+ github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/spf13/viper v1.7.1
// SPIRAL ====
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
- github.com/spiral/goridge/v3 v3.1.4
+ github.com/spiral/goridge/v3 v3.2.0
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
@@ -44,6 +44,6 @@ require (
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
- google.golang.org/protobuf v1.26.0
+ google.golang.org/protobuf v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
diff --git a/go.sum b/go.sum
index 912000bf..0b5649c9 100644
--- a/go.sum
+++ b/go.sum
@@ -415,8 +415,8 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h
github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go=
github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
-github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
+github.com/spiral/goridge/v3 v3.2.0 h1:JS0zcOgp1hxMzu2Uc6feKalt78hBLTufGEbtEXdWW2E=
+github.com/spiral/goridge/v3 v3.2.0/go.mod h1:ekGaQYwbWOPVDwIrVxIY9Mwq2/+X/xt5sPsuC+t85Oo=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
@@ -680,8 +680,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
-google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
-google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/internal/protocol.go b/internal/protocol.go
index 7487b4f3..78174118 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -39,8 +39,8 @@ func SendControl(rl relay.Relay, payload interface{}) error {
fr := getFrame()
defer putFrame(fr)
- fr.WriteVersion(frame.VERSION_1)
- fr.WriteFlags(frame.CONTROL)
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
+ fr.WriteFlags(fr.Header(), frame.CONTROL)
if data, ok := payload.([]byte); ok {
// check if payload no more that 4Gb
@@ -48,9 +48,9 @@ func SendControl(rl relay.Relay, payload interface{}) error {
return errors.E(op, errors.Str("payload is more that 4gb"))
}
- fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayloadLen(fr.Header(), uint32(len(data)))
fr.WritePayload(data)
- fr.WriteCRC()
+ fr.WriteCRC(fr.Header())
err := rl.Send(fr)
if err != nil {
@@ -64,9 +64,9 @@ func SendControl(rl relay.Relay, payload interface{}) error {
return errors.E(op, errors.Errorf("invalid payload: %s", err))
}
- fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayloadLen(fr.Header(), uint32(len(data)))
fr.WritePayload(data)
- fr.WriteCRC()
+ fr.WriteCRC(fr.Header())
// hold a pointer to a frame
// Do we need a copy here????
@@ -89,7 +89,7 @@ func FetchPID(rl relay.Relay) (int64, error) {
defer putFrame(fr)
err = rl.Receive(fr)
- if !fr.VerifyCRC() {
+ if !fr.VerifyCRC(fr.Header()) {
return 0, errors.E(op, errors.Str("CRC mismatch"))
}
if err != nil {
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 1cd0a8fa..051e7a8a 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // TODO suspicious logic, redesign
err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index d20b7ae0..74e29b71 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -159,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
defer tw.putFrame(fr)
// can be 0 here
- fr.WriteVersion(frame.VERSION_1)
+ fr.WriteVersion(fr.Header(), frame.VERSION_1)
// obtain a buffer
buf := tw.get()
@@ -168,11 +168,11 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
buf.Write(p.Body)
// Context offset
- fr.WriteOptions(uint32(len(p.Context)))
- fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
+ fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WritePayload(buf.Bytes())
- fr.WriteCRC()
+ fr.WriteCRC(fr.Header())
// return buffer
tw.put(buf)
@@ -193,7 +193,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
- if !frameR.VerifyCRC() {
+ if !frameR.VerifyCRC(frameR.Header()) {
return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
@@ -203,7 +203,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
- options := frameR.ReadOptions()
+ options := frameR.ReadOptions(frameR.Header())
if len(options) != 1 {
return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
@@ -214,7 +214,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err
}
// by copying we free frame's payload slice
- // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool)
+ // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
// https://blog.golang.org/slices-intro#TOC_6.
copy(pld.Body, frameR.Payload()[options[0]:])
copy(pld.Context, frameR.Payload()[:options[0]])
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d2d2ed9f..6c848a9d 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -31,7 +31,7 @@ const (
)
type Plugin struct {
- cfg *Config `mapstructure:"jobs"`
+ cfg *Config `structure:"jobs"`
log logger.Logger
sync.RWMutex
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
new file mode 100644
index 00000000..c1d32e7a
--- /dev/null
+++ b/plugins/jobs/response_protocol.md
@@ -0,0 +1,47 @@
+Response protocol used to communicate between worker and RR. When a worker completes its job, it should send a typed
+response. The response should contain:
+
+1. `type` field with the message type. Can be treated as enums.
+2. `data` field with the dynamic response related to the type.
+
+Types are:
+
+```
+0 - NO_ERROR
+1 - ERROR
+2 - ...
+```
+
+----
+`NO_ERROR`: contains only `type` and empty `data`.
+
+----
+`ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job,
+`dalay_seconds`: to delay a queue for a provided amount of seconds.
+----
+
+For example:
+
+`NO_ERROR`:
+For example:
+
+```json
+{
+ "type": 0,
+ "data": {}
+}
+
+```
+
+`ERROR`:
+
+```json
+{
+ "type": 1,
+ "data": {
+ "message": "internal worker error",
+ "requeue": true,
+ "delay_seconds": 10
+ }
+}
+```
diff --git a/tests/env/Dockerfile-elastic-mq.yaml b/tests/env/Dockerfile-elastic-mq.yaml
index c9f909d0..e1513450 100644
--- a/tests/env/Dockerfile-elastic-mq.yaml
+++ b/tests/env/Dockerfile-elastic-mq.yaml
@@ -1,8 +1,8 @@
FROM openjdk:16
-ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.1.1.jar /
+ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.2.0.jar /
COPY custom.conf /
-ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.1.1.jar"]
+ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.2.0.jar"]
EXPOSE 9324