diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/broadcast/interface.go | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 2 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go | 2 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 9 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 9 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 9 | ||||
-rw-r--r-- | plugins/kv/interface.go | 36 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 12 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 3 | ||||
-rw-r--r-- | plugins/memory/kv.go | 2 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 5 | ||||
-rw-r--r-- | plugins/memory/pubsub.go | 2 | ||||
-rw-r--r-- | plugins/redis/channel.go | 2 | ||||
-rw-r--r-- | plugins/redis/kv.go | 2 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 4 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 2 | ||||
-rw-r--r-- | plugins/server/plugin.go | 27 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 2 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 2 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 2 |
23 files changed, 60 insertions, 82 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 46709d71..eda3572f 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,6 +1,6 @@ package broadcast -import "github.com/spiral/roadrunner/v2/pkg/pubsub" +import "github.com/spiral/roadrunner/v2/common/pubsub" type Broadcaster interface { GetDriver(key string) (pubsub.SubReader, error) diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 6ddef806..889dc2fa 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 2ee211f8..475076a0 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,7 +2,7 @@ package broadcast import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index a1c1532c..0639f448 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-22T07:34:31.801Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="W3ry_giS-Ii3LUJqzQzg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfFr7mIdvTKRLwd1Es266nhqh+DFhIY7mED+geInwBYmtnd9oBa4y0WX3pMi8rVV9f1a/+Bw==</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-22T08:56:50.739Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="Kdvb2D1nWMjMRedS8I3V" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfaoPfkO3pFAn4uyiWbddTQ1Q/BiyksVzCB3QPEb4AsbWfZTrkASdtVl+6zMtK1ddX9av/AQ==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index cda2a711..bd5ff5bf 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" @@ -12,6 +13,8 @@ import ( ) const ( + // RrJobs env variable + RrJobs string = "rr_jobs" PluginName string = "jobs" ) @@ -24,6 +27,10 @@ type Plugin struct { consumers map[string]Consumer } +func testListener(data interface{}) { + fmt.Println(data) +} + func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { @@ -35,7 +42,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, nil, nil) + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 4b675271..e5aac290 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,8 +9,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 6ae1a1f6..c839130f 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -2,12 +2,15 @@ package boltdb import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "boltdb" +const ( + PluginName string = "boltdb" + RootPluginName string = "kv" +) // Plugin BoltDB K/V storage. type Plugin struct { @@ -21,7 +24,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index a2787d72..520ec7d5 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,8 +6,8 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 22ea5cca..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -2,12 +2,15 @@ package memcached import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "memcached" +const ( + PluginName string = "memcached" + RootPluginName string = "kv" +) type Plugin struct { // config plugin @@ -17,7 +20,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go deleted file mode 100644 index ffdbbe62..00000000 --- a/plugins/kv/interface.go +++ /dev/null @@ -1,36 +0,0 @@ -package kv - -import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" - -// Storage represents single abstract storage. -type Storage interface { - // Has checks if value exists. - Has(keys ...string) (map[string]bool, error) - - // Get loads value content into a byte slice. - Get(key string) ([]byte, error) - - // MGet loads content of multiple values - // Returns the map with existing keys and associated values - MGet(keys ...string) (map[string][]byte, error) - - // Set used to upload item to KV with TTL - // 0 value in TTL means no TTL - Set(items ...*kvv1.Item) error - - // MExpire sets the TTL for multiply keys - MExpire(items ...*kvv1.Item) error - - // TTL return the rest time to live for provided keys - // Not supported for the memcached and boltdb - TTL(keys ...string) (map[string]string, error) - - // Delete one or multiple keys. - Delete(keys ...string) error -} - -// Constructor provides storage based on the config -type Constructor interface { - // KVConstruct provides Storage based on the config key - KVConstruct(key string) (Storage, error) -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 03dbaed6..e9ea25df 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -5,10 +5,12 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) +// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. const PluginName string = "kv" const ( @@ -25,9 +27,9 @@ const ( type Plugin struct { log logger.Logger // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. - constructors map[string]Constructor + constructors map[string]kv.Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. - storages map[string]Storage + storages map[string]kv.Storage // KV configuration cfg Config cfgPlugin config.Configurer @@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.constructors = make(map[string]Constructor, 5) - p.storages = make(map[string]Storage, 5) + p.constructors = make(map[string]kv.Constructor, 5) + p.storages = make(map[string]kv.Storage, 5) p.log = log p.cfgPlugin = cfg return nil @@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) { // save the storage constructor p.constructors[name.Name()] = constructor } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index af763600..b9b302fe 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,6 +2,7 @@ package kv import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) @@ -9,7 +10,7 @@ import ( // Wrapper for the plugin type rpc struct { // all available storages - storages map[string]Storage + storages map[string]kv.Storage // svc is a plugin implementing Storage interface srv *Plugin // Logger diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index 1cf031d1..1906e4fd 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,8 +6,8 @@ import ( "time" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 70badf15..7d418a70 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,9 +2,9 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -21,7 +21,6 @@ type Plugin struct { func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.log = log p.cfgPlugin = cfg p.stop = make(chan struct{}, 1) return nil diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index d027a8a5..3c909900 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -3,8 +3,8 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go index 5817853c..0cd62d19 100644 --- a/plugins/redis/channel.go +++ b/plugins/redis/channel.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 320b7443..378d7630 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,8 +7,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 9d98790b..3c62a63f 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,9 +5,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 4e41acb5..8bd78514 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 00639f43..e2fa0086 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) + list = append(list, server.collectWorkerEvents) spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -147,8 +147,8 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En return nil, errors.E(op, err) } - list := make([]events.Listener, 0, 1) - list = append(list, server.collectEvents) + list := make([]events.Listener, 0, 22) + list = append(list, server.collectPoolEvents, server.collectWorkerEvents) if len(listeners) != 0 { list = append(list, listeners...) } @@ -209,7 +209,7 @@ func (server *Plugin) setEnv(e Env) []string { return env } -func (server *Plugin) collectEvents(event interface{}) { +func (server *Plugin) collectPoolEvents(event interface{}) { if we, ok := event.(events.PoolEvent); ok { switch we.Event { case events.EventMaxMemory: @@ -238,7 +238,9 @@ func (server *Plugin) collectEvents(event interface{}) { server.log.Warn("requested pool restart") } } +} +func (server *Plugin) collectWorkerEvents(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: @@ -264,16 +266,13 @@ func (server *Plugin) collectEvents(event interface{}) { } } -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) - case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - // stderr event is INFO level - case events.EventWorkerStderr: - server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) +func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed) + case events.EventJobOK: + server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed) } } } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 664b4dfd..c1f79a78 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,7 +7,7 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ca5f2f59..c9a31613 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,10 +10,10 @@ import ( "github.com/google/uuid" json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 752ba3ce..758620f6 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,7 @@ import ( "sync" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" |