summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/message_queue.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 11:52:03 +0300
committerGitHub <[email protected]>2021-01-26 11:52:03 +0300
commite2266b80db47444ba5858c736833a8a81b1361ad (patch)
tree37e06810352752f88032f7d0eadb554fa18b98da /plugins/temporal/workflow/message_queue.go
parentfae4711e3548bfd2e34f13aabfaab6a5b4e317c6 (diff)
parenta392d962508e1bc9e497c8c4ef021425bc2c67c2 (diff)
Merge pull request #502 from spiral/plugin/temporalv2.0.0-beta12
plugin(temporal): Add temporal plugins set to the RR2
Diffstat (limited to 'plugins/temporal/workflow/message_queue.go')
-rw-r--r--plugins/temporal/workflow/message_queue.go47
1 files changed, 47 insertions, 0 deletions
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go
new file mode 100644
index 00000000..8f4409d1
--- /dev/null
+++ b/plugins/temporal/workflow/message_queue.go
@@ -0,0 +1,47 @@
+package workflow
+
+import (
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+type messageQueue struct {
+ seqID func() uint64
+ queue []rrt.Message
+}
+
+func newMessageQueue(sedID func() uint64) *messageQueue {
+ return &messageQueue{
+ seqID: sedID,
+ queue: make([]rrt.Message, 0, 5),
+ }
+}
+
+func (mq *messageQueue) flush() {
+ mq.queue = mq.queue[0:0]
+}
+
+func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) {
+ msg := rrt.Message{
+ ID: mq.seqID(),
+ Command: cmd,
+ Payloads: payloads,
+ }
+
+ return msg.ID, msg
+}
+
+func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 {
+ id, msg := mq.allocateMessage(cmd, payloads)
+ mq.queue = append(mq.queue, msg)
+ return id
+}
+
+func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Payloads: payloads})
+}
+
+func (mq *messageQueue) pushError(id uint64, failure *failure.Failure) {
+ mq.queue = append(mq.queue, rrt.Message{ID: id, Failure: failure})
+}