diff options
author | Valery Piashchynski <[email protected]> | 2021-08-09 17:10:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-09 17:10:07 +0300 |
commit | fba3d927b62f8963f0c291da2739061e726df32e (patch) | |
tree | 37fd54198f1b4939c0b78435c22817ca0c5facd9 | |
parent | 606e2170ccac5a13a11198aaf54e4219a83291ab (diff) |
Update goridge to v3.2.0, update all frames operations.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | go.mod | 6 | ||||
-rw-r--r-- | go.sum | 8 | ||||
-rwxr-xr-x | internal/protocol.go | 14 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 1 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 14 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 | ||||
-rw-r--r-- | plugins/jobs/response_protocol.md | 47 | ||||
-rw-r--r-- | tests/env/Dockerfile-elastic-mq.yaml | 4 |
8 files changed, 72 insertions, 24 deletions
@@ -26,13 +26,13 @@ require ( github.com/json-iterator/go v1.1.11 github.com/klauspost/compress v1.13.0 github.com/prometheus/client_golang v1.10.0 - github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 // indirect + github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 github.com/shirou/gopsutil v3.21.3+incompatible github.com/spf13/viper v1.7.1 // SPIRAL ==== github.com/spiral/endure v1.0.2 github.com/spiral/errors v1.0.11 - github.com/spiral/goridge/v3 v3.1.4 + github.com/spiral/goridge/v3 v3.2.0 // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.6 // indirect @@ -44,6 +44,6 @@ require ( golang.org/x/net v0.0.0-20210226101413-39120d07d75e golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 - google.golang.org/protobuf v1.26.0 + google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -415,8 +415,8 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go= github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40= -github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk= +github.com/spiral/goridge/v3 v3.2.0 h1:JS0zcOgp1hxMzu2Uc6feKalt78hBLTufGEbtEXdWW2E= +github.com/spiral/goridge/v3 v3.2.0/go.mod h1:ekGaQYwbWOPVDwIrVxIY9Mwq2/+X/xt5sPsuC+t85Oo= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -680,8 +680,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/protocol.go b/internal/protocol.go index 7487b4f3..78174118 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -39,8 +39,8 @@ func SendControl(rl relay.Relay, payload interface{}) error { fr := getFrame() defer putFrame(fr) - fr.WriteVersion(frame.VERSION_1) - fr.WriteFlags(frame.CONTROL) + fr.WriteVersion(fr.Header(), frame.VERSION_1) + fr.WriteFlags(fr.Header(), frame.CONTROL) if data, ok := payload.([]byte); ok { // check if payload no more that 4Gb @@ -48,9 +48,9 @@ func SendControl(rl relay.Relay, payload interface{}) error { return errors.E(op, errors.Str("payload is more that 4gb")) } - fr.WritePayloadLen(uint32(len(data))) + fr.WritePayloadLen(fr.Header(), uint32(len(data))) fr.WritePayload(data) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) err := rl.Send(fr) if err != nil { @@ -64,9 +64,9 @@ func SendControl(rl relay.Relay, payload interface{}) error { return errors.E(op, errors.Errorf("invalid payload: %s", err)) } - fr.WritePayloadLen(uint32(len(data))) + fr.WritePayloadLen(fr.Header(), uint32(len(data))) fr.WritePayload(data) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) // hold a pointer to a frame // Do we need a copy here???? @@ -89,7 +89,7 @@ func FetchPID(rl relay.Relay) (int64, error) { defer putFrame(fr) err = rl.Receive(fr) - if !fr.VerifyCRC() { + if !fr.VerifyCRC(fr.Header()) { return 0, errors.E(op, errors.Str("CRC mismatch")) } if err != nil { diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 1cd0a8fa..051e7a8a 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // TODO suspicious logic, redesign err = sp.ww.Allocate() if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index d20b7ae0..74e29b71 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -159,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err defer tw.putFrame(fr) // can be 0 here - fr.WriteVersion(frame.VERSION_1) + fr.WriteVersion(fr.Header(), frame.VERSION_1) // obtain a buffer buf := tw.get() @@ -168,11 +168,11 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err buf.Write(p.Body) // Context offset - fr.WriteOptions(uint32(len(p.Context))) - fr.WritePayloadLen(uint32(buf.Len())) + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) fr.WritePayload(buf.Bytes()) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) // return buffer tw.put(buf) @@ -193,7 +193,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } - if !frameR.VerifyCRC() { + if !frameR.VerifyCRC(frameR.Header()) { return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } @@ -203,7 +203,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } - options := frameR.ReadOptions() + options := frameR.ReadOptions(frameR.Header()) if len(options) != 1 { return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } @@ -214,7 +214,7 @@ func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, err } // by copying we free frame's payload slice - // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) // https://blog.golang.org/slices-intro#TOC_6. copy(pld.Body, frameR.Payload()[options[0]:]) copy(pld.Context, frameR.Payload()[:options[0]]) diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d2d2ed9f..6c848a9d 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -31,7 +31,7 @@ const ( ) type Plugin struct { - cfg *Config `mapstructure:"jobs"` + cfg *Config `structure:"jobs"` log logger.Logger sync.RWMutex diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md new file mode 100644 index 00000000..c1d32e7a --- /dev/null +++ b/plugins/jobs/response_protocol.md @@ -0,0 +1,47 @@ +Response protocol used to communicate between worker and RR. When a worker completes its job, it should send a typed +response. The response should contain: + +1. `type` field with the message type. Can be treated as enums. +2. `data` field with the dynamic response related to the type. + +Types are: + +``` +0 - NO_ERROR +1 - ERROR +2 - ... +``` + +---- +`NO_ERROR`: contains only `type` and empty `data`. + +---- +`ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, +`dalay_seconds`: to delay a queue for a provided amount of seconds. +---- + +For example: + +`NO_ERROR`: +For example: + +```json +{ + "type": 0, + "data": {} +} + +``` + +`ERROR`: + +```json +{ + "type": 1, + "data": { + "message": "internal worker error", + "requeue": true, + "delay_seconds": 10 + } +} +``` diff --git a/tests/env/Dockerfile-elastic-mq.yaml b/tests/env/Dockerfile-elastic-mq.yaml index c9f909d0..e1513450 100644 --- a/tests/env/Dockerfile-elastic-mq.yaml +++ b/tests/env/Dockerfile-elastic-mq.yaml @@ -1,8 +1,8 @@ FROM openjdk:16 -ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.1.1.jar / +ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.2.0.jar / COPY custom.conf / -ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.1.1.jar"] +ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.2.0.jar"] EXPOSE 9324 |