summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/interface.go2
-rw-r--r--plugins/broadcast/plugin.go2
-rw-r--r--plugins/broadcast/rpc.go2
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/plugin.go9
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go9
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go9
-rw-r--r--plugins/kv/interface.go36
-rw-r--r--plugins/kv/plugin.go12
-rw-r--r--plugins/kv/rpc.go3
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go5
-rw-r--r--plugins/memory/pubsub.go2
-rw-r--r--plugins/redis/channel.go2
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go4
-rw-r--r--plugins/redis/pubsub.go2
-rw-r--r--plugins/server/plugin.go27
-rw-r--r--plugins/websockets/executor/executor.go2
-rw-r--r--plugins/websockets/plugin.go2
-rw-r--r--plugins/websockets/pool/workers_pool.go2
23 files changed, 60 insertions, 82 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 46709d71..eda3572f 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,6 +1,6 @@
package broadcast
-import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+import "github.com/spiral/roadrunner/v2/common/pubsub"
type Broadcaster interface {
GetDriver(key string) (pubsub.SubReader, error)
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 6ddef806..889dc2fa 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 2ee211f8..475076a0 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,7 +2,7 @@ package broadcast
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index a1c1532c..0639f448 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-22T07:34:31.801Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="W3ry_giS-Ii3LUJqzQzg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfFr7mIdvTKRLwd1Es266nhqh+DFhIY7mED+geInwBYmtnd9oBa4y0WX3pMi8rVV9f1a/+Bw==</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-22T08:56:50.739Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="Kdvb2D1nWMjMRedS8I3V" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfaoPfkO3pFAn4uyiWbddTQ1Q/BiyksVzCB3QPEb4AsbWfZTrkASdtVl+6zMtK1ddX9av/AQ==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index cda2a711..bd5ff5bf 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,6 +2,7 @@ package jobs
import (
"context"
+ "fmt"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
@@ -12,6 +13,8 @@ import (
)
const (
+ // RrJobs env variable
+ RrJobs string = "rr_jobs"
PluginName string = "jobs"
)
@@ -24,6 +27,10 @@ type Plugin struct {
consumers map[string]Consumer
}
+func testListener(data interface{}) {
+ fmt.Println(data)
+}
+
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
@@ -35,7 +42,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, nil, nil)
+ p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 4b675271..e5aac290 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,8 +9,8 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 6ae1a1f6..c839130f 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -2,12 +2,15 @@ package boltdb
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "boltdb"
+const (
+ PluginName string = "boltdb"
+ RootPluginName string = "kv"
+)
// Plugin BoltDB K/V storage.
type Plugin struct {
@@ -21,7 +24,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index a2787d72..520ec7d5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,8 +6,8 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 22ea5cca..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -2,12 +2,15 @@ package memcached
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "memcached"
+const (
+ PluginName string = "memcached"
+ RootPluginName string = "kv"
+)
type Plugin struct {
// config plugin
@@ -17,7 +20,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
deleted file mode 100644
index ffdbbe62..00000000
--- a/plugins/kv/interface.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package kv
-
-import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
-
-// Storage represents single abstract storage.
-type Storage interface {
- // Has checks if value exists.
- Has(keys ...string) (map[string]bool, error)
-
- // Get loads value content into a byte slice.
- Get(key string) ([]byte, error)
-
- // MGet loads content of multiple values
- // Returns the map with existing keys and associated values
- MGet(keys ...string) (map[string][]byte, error)
-
- // Set used to upload item to KV with TTL
- // 0 value in TTL means no TTL
- Set(items ...*kvv1.Item) error
-
- // MExpire sets the TTL for multiply keys
- MExpire(items ...*kvv1.Item) error
-
- // TTL return the rest time to live for provided keys
- // Not supported for the memcached and boltdb
- TTL(keys ...string) (map[string]string, error)
-
- // Delete one or multiple keys.
- Delete(keys ...string) error
-}
-
-// Constructor provides storage based on the config
-type Constructor interface {
- // KVConstruct provides Storage based on the config key
- KVConstruct(key string) (Storage, error)
-}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 03dbaed6..e9ea25df 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -5,10 +5,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
const PluginName string = "kv"
const (
@@ -25,9 +27,9 @@ const (
type Plugin struct {
log logger.Logger
// constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
- constructors map[string]Constructor
+ constructors map[string]kv.Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
- storages map[string]Storage
+ storages map[string]kv.Storage
// KV configuration
cfg Config
cfgPlugin config.Configurer
@@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.constructors = make(map[string]Constructor, 5)
- p.storages = make(map[string]Storage, 5)
+ p.constructors = make(map[string]kv.Constructor, 5)
+ p.storages = make(map[string]kv.Storage, 5)
p.log = log
p.cfgPlugin = cfg
return nil
@@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) {
// save the storage constructor
p.constructors[name.Name()] = constructor
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index af763600..b9b302fe 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,6 +2,7 @@ package kv
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -9,7 +10,7 @@ import (
// Wrapper for the plugin
type rpc struct {
// all available storages
- storages map[string]Storage
+ storages map[string]kv.Storage
// svc is a plugin implementing Storage interface
srv *Plugin
// Logger
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 1cf031d1..1906e4fd 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,8 +6,8 @@ import (
"time"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 70badf15..7d418a70 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,9 +2,9 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,6 @@ type Plugin struct {
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.log = log
p.cfgPlugin = cfg
p.stop = make(chan struct{}, 1)
return nil
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index d027a8a5..3c909900 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -3,8 +3,8 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
index 5817853c..0cd62d19 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/channel.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 320b7443..378d7630 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,8 +7,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 9d98790b..3c62a63f 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,9 +5,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 4e41acb5..8bd78514 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 00639f43..e2fa0086 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
const op = errors.Op("server_plugin_new_worker")
list := make([]events.Listener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
+ list = append(list, server.collectWorkerEvents)
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -147,8 +147,8 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En
return nil, errors.E(op, err)
}
- list := make([]events.Listener, 0, 1)
- list = append(list, server.collectEvents)
+ list := make([]events.Listener, 0, 22)
+ list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
if len(listeners) != 0 {
list = append(list, listeners...)
}
@@ -209,7 +209,7 @@ func (server *Plugin) setEnv(e Env) []string {
return env
}
-func (server *Plugin) collectEvents(event interface{}) {
+func (server *Plugin) collectPoolEvents(event interface{}) {
if we, ok := event.(events.PoolEvent); ok {
switch we.Event {
case events.EventMaxMemory:
@@ -238,7 +238,9 @@ func (server *Plugin) collectEvents(event interface{}) {
server.log.Warn("requested pool restart")
}
}
+}
+func (server *Plugin) collectWorkerEvents(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
@@ -264,16 +266,13 @@ func (server *Plugin) collectEvents(event interface{}) {
}
}
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
- case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- // stderr event is INFO level
- case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
+func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventJobStart:
+ server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed)
}
}
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 664b4dfd..c1f79a78 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,7 +7,7 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ca5f2f59..c9a31613 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 752ba3ce..758620f6 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,7 @@ import (
"sync"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"