From 521aeb823bc8fa1f0a91b540cbbac96328185f51 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 23 Jun 2021 17:41:51 +0300 Subject: - Add PQ (priority_queue) mock - Add binary heap mock - Connect first sub-plugin (ephemeral) with root jobs plugin Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 pkg/priority_queue/interface.go (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go new file mode 100644 index 00000000..d1c3229f --- /dev/null +++ b/pkg/priority_queue/interface.go @@ -0,0 +1,7 @@ +package priorityqueue + +type Queue interface { + Push() + Pop() + BLPop() +} -- cgit v1.2.3 From ad1ca84b26bb6a4ba410a8a684fe3d2e2f86eaea Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 25 Jun 2021 14:21:53 +0300 Subject: - Update jobs interface Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index d1c3229f..5945a013 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -2,6 +2,6 @@ package priorityqueue type Queue interface { Push() - Pop() + Pop() interface{} BLPop() } -- cgit v1.2.3 From 2ac3b240b118961c1a30cc18dd22d08b7fac6516 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 30 Jun 2021 11:08:40 +0300 Subject: - Update arch diagrams - Update ephemeral plugin Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 5945a013..00998d78 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,7 +1,6 @@ package priorityqueue type Queue interface { - Push() + Push(item interface{}) Pop() interface{} - BLPop() } -- cgit v1.2.3 From 677db79f76fcc566bee2b1b51d0f40a0c9f2ac19 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 3 Jul 2021 15:19:48 +0300 Subject: - Initial binary_heap Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 00998d78..45430486 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,6 +1,11 @@ package priorityqueue type Queue interface { - Push(item interface{}) - Pop() interface{} + Push(item PQItem) + Pop() PQItem +} + +type PQItem interface { + ID() string + Priority() uint64 } -- cgit v1.2.3 From 1dd43e378c55b6984bda6c2e8b048d2ed821aa43 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 5 Jul 2021 11:52:03 +0300 Subject: - Finish binary_heap Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 45430486..3cc1d575 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,8 +1,8 @@ package priorityqueue type Queue interface { - Push(item PQItem) - Pop() PQItem + Insert(item PQItem) + GetMax() PQItem } type PQItem interface { -- cgit v1.2.3 From 207739f7346c98e16087547bc510e1f909671260 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 5 Jul 2021 18:44:29 +0300 Subject: - Update PQ - Update ephemeral plugin, complete Push - Add Jobs full configuration Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 11 ----------- 1 file changed, 11 deletions(-) delete mode 100644 pkg/priority_queue/interface.go (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go deleted file mode 100644 index 3cc1d575..00000000 --- a/pkg/priority_queue/interface.go +++ /dev/null @@ -1,11 +0,0 @@ -package priorityqueue - -type Queue interface { - Insert(item PQItem) - GetMax() PQItem -} - -type PQItem interface { - ID() string - Priority() uint64 -} -- cgit v1.2.3 From 589f759cc2411319adbca2ece0dbe212407d1eba Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 11 Jul 2021 10:11:22 +0300 Subject: Update informer interface to return slice of pointers (do not over-copy the Stat structure). Make amqp Push concurrent safe. Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 pkg/priority_queue/interface.go (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go new file mode 100644 index 00000000..8278dc8d --- /dev/null +++ b/pkg/priority_queue/interface.go @@ -0,0 +1,28 @@ +package priorityqueue + +type Queue interface { + Insert(item Item) + GetMax() Item + Len() uint64 +} + +// Item represents binary heap item +type Item interface { + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() uint64 + + // Body is the Item payload + Body() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing + Ack() error + + // Nack - discard the Item + Nack() error +} -- cgit v1.2.3 From 0f70f1e2311640236d74a0a237536779d8d44223 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 11 Jul 2021 19:54:35 +0300 Subject: Update JOBS interface, Renamed Consume -> Run. Add DYNAMIC declaration of the pipelines. Update Jobs constructor interface, add FromPipeline method to construct jobs driver (unique) via the `Declare` RPC call. Add `Stop` method to gracefully stop all consumers. Binary heaps `GetMax` to canonical `ExtractMin`. Other small improvements. Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 8278dc8d..1efebf1c 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -2,7 +2,7 @@ package priorityqueue type Queue interface { Insert(item Item) - GetMax() Item + ExtractMin() Item Len() uint64 } -- cgit v1.2.3 From d099e47ab28dd044d34e18347a4c714b8af3d612 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 11:35:12 +0300 Subject: SQS driver. Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 1efebf1c..d64aaf3d 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -12,7 +12,7 @@ type Item interface { ID() string // Priority returns the Item's priority to sort - Priority() uint64 + Priority() int64 // Body is the Item payload Body() []byte -- cgit v1.2.3 From d379c28a1e9babead0266bc4fa10d6c5e7aa14cb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 9 Aug 2021 23:26:24 +0300 Subject: Add initial support for the php worker protocol. Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index d64aaf3d..eee2a090 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -25,4 +25,7 @@ type Item interface { // Nack - discard the Item Nack() error + + // Requeue - put the message back to the queue with the optional delay + Requeue(delay uint32) error } -- cgit v1.2.3 From a8a7f4194156440ef3157d8e5d75c43ed0327bcf Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 10 Aug 2021 19:54:03 +0300 Subject: Add jobs protocol support for the AMQP driver Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index eee2a090..3d192e8a 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -27,5 +27,5 @@ type Item interface { Nack() error // Requeue - put the message back to the queue with the optional delay - Requeue(delay uint32) error + Requeue(delay int64) error } -- cgit v1.2.3 From de37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 13:42:33 +0300 Subject: Add headers support to the jobs protocol Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 3d192e8a..9efa4652 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -27,5 +27,5 @@ type Item interface { Nack() error // Requeue - put the message back to the queue with the optional delay - Requeue(delay int64) error + Requeue(headers map[string][]string, delay int64) error } -- cgit v1.2.3 From 2d460062c97f9ad1e793831c54ae4d177dea83e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 22:03:34 +0300 Subject: Durable requeue algo. Update AMQP and Beanstalk tests to use mock logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 9efa4652..0034cbd3 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -28,4 +28,7 @@ type Item interface { // Requeue - put the message back to the queue with the optional delay Requeue(headers map[string][]string, delay int64) error + + // Recycle frees resources allocated by the Item + Recycle() } -- cgit v1.2.3 From ecbfc5c5265a9895f4e371ce4388f64df8714e63 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 12 Aug 2021 13:25:36 +0300 Subject: Remove unneeded options, complete tests for the ephemeral, update proto Signed-off-by: Valery Piashchynski --- pkg/priority_queue/interface.go | 3 --- 1 file changed, 3 deletions(-) (limited to 'pkg/priority_queue/interface.go') diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 0034cbd3..9efa4652 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -28,7 +28,4 @@ type Item interface { // Requeue - put the message back to the queue with the optional delay Requeue(headers map[string][]string, delay int64) error - - // Recycle frees resources allocated by the Item - Recycle() } -- cgit v1.2.3