diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 11:52:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-26 11:52:03 +0300 |
commit | e2266b80db47444ba5858c736833a8a81b1361ad (patch) | |
tree | 37e06810352752f88032f7d0eadb554fa18b98da /plugins/temporal/workflow/message_queue.go | |
parent | fae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff) | |
parent | a392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff) |
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
Diffstat (limited to 'plugins/temporal/workflow/message_queue.go')
-rw-r--r-- | plugins/temporal/workflow/message_queue.go | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go new file mode 100644 index 00000000..8f4409d1 --- /dev/null +++ b/plugins/temporal/workflow/message_queue.go @@ -0,0 +1,47 @@ +package workflow + +import ( + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +type messageQueue struct { + seqID func() uint64 + queue []rrt.Message +} + +func newMessageQueue(sedID func() uint64) *messageQueue { + return &messageQueue{ + seqID: sedID, + queue: make([]rrt.Message, 0, 5), + } +} + +func (mq *messageQueue) flush() { + mq.queue = mq.queue[0:0] +} + +func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { + msg := rrt.Message{ + ID: mq.seqID(), + Command: cmd, + Payloads: payloads, + } + + return msg.ID, msg +} + +func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { + id, msg := mq.allocateMessage(cmd, payloads) + mq.queue = append(mq.queue, msg) + return id +} + +func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads}) +} + +func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) { + mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure}) +} |