diff options
author | Valery Piashchynski <[email protected]> | 2021-06-22 15:20:08 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-22 15:20:08 +0300 |
commit | 5627146e45afbb8f6566862c60a42a0b0aad2d0a (patch) | |
tree | 731e4157c3c09dabab60bd2c78910facf23fce75 | |
parent | 1a2a1f4735e40675abf6cd9767c99374359ec2bb (diff) |
- Move common interfaces and structures to the 'common' folder
- Update tests
Signed-off-by: Valery Piashchynski <[email protected]>
37 files changed, 165 insertions, 56 deletions
diff --git a/common/doc.go b/common/doc.go new file mode 100644 index 00000000..adc03351 --- /dev/null +++ b/common/doc.go @@ -0,0 +1,9 @@ +/* +Package common used to collect common interfaces/structures which might be implemented (or imported) by a different plugins. +For example, 'pubsub' interface might be implemented by memory, redis, websockets and many other plugins. + +Folders: +- kv - contains KV interfaces and structures +- pubsub - contains pub-sub interfaces and structures +*/ +package common diff --git a/plugins/kv/interface.go b/common/kv/interface.go index ffdbbe62..ffdbbe62 100644 --- a/plugins/kv/interface.go +++ b/common/kv/interface.go diff --git a/pkg/pubsub/interface.go b/common/pubsub/interface.go index 06252d70..06252d70 100644 --- a/pkg/pubsub/interface.go +++ b/common/pubsub/interface.go diff --git a/pkg/pubsub/psmessage.go b/common/pubsub/psmessage.go index e33d9284..e33d9284 100644 --- a/pkg/pubsub/psmessage.go +++ b/common/pubsub/psmessage.go diff --git a/pkg/events/general.go b/pkg/events/general.go index a09a8759..5cf13e10 100755 --- a/pkg/events/general.go +++ b/pkg/events/general.go @@ -4,6 +4,8 @@ import ( "sync" ) +const UnknownEventType string = "Unknown event type" + // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { listeners []Listener diff --git a/pkg/events/interface.go b/pkg/events/interface.go index ac6c15a4..7d57e4d0 100644 --- a/pkg/events/interface.go +++ b/pkg/events/interface.go @@ -2,7 +2,7 @@ package events // Handler interface type Handler interface { - // Return number of active listeners + // NumListeners return number of active listeners NumListeners() int // AddListener adds lister to the publisher AddListener(listener Listener) @@ -10,5 +10,5 @@ type Handler interface { Push(e interface{}) } -// Event listener listens for the events produced by worker, worker pool or other service. +// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. type Listener func(event interface{}) diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go new file mode 100644 index 00000000..ed07c7da --- /dev/null +++ b/pkg/events/jobs_events.go @@ -0,0 +1,84 @@ +package events + +import ( + "time" +) + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK = iota + 12000 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeConsume when pipeline pipelines has been requested. + EventPipeConsume + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStop when pipeline has begun stopping. + EventPipeStop + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventBrokerReady thrown when broken is ready to accept/serve tasks. + EventBrokerReady +) + +type J int64 + +func (ev J) String() string { + switch ev { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeConsume: + return "EventPipeConsume" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStop: + return "EventPipeStop" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventBrokerReady: + return "EventBrokerReady" + } + return UnknownEventType +} + +// JobEvent represent job event. +type JobEvent struct { + Event J + // String is job id. + ID string + + // Job is failed job. + Job interface{} // this is *jobs.Job, but interface used to avoid package import + + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index e7b451e0..4d4cae5d 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -57,7 +57,7 @@ func (ev P) String() string { case EventPoolRestart: return "EventPoolRestart" } - return "Unknown event type" + return UnknownEventType } // PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 11bd6ab7..39c38e57 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -20,7 +20,7 @@ func (ev W) String() string { case EventWorkerStderr: return "EventWorkerStderr" } - return "Unknown event type" + return UnknownEventType } // WorkerEvent wraps worker events. diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 46709d71..eda3572f 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,6 +1,6 @@ package broadcast -import "github.com/spiral/roadrunner/v2/pkg/pubsub" +import "github.com/spiral/roadrunner/v2/common/pubsub" type Broadcaster interface { GetDriver(key string) (pubsub.SubReader, error) diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 6ddef806..889dc2fa 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 2ee211f8..475076a0 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,7 +2,7 @@ package broadcast import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index a1c1532c..0639f448 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-22T07:34:31.801Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="W3ry_giS-Ii3LUJqzQzg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfFr7mIdvTKRLwd1Es266nhqh+DFhIY7mED+geInwBYmtnd9oBa4y0WX3pMi8rVV9f1a/+Bw==</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-22T08:56:50.739Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="Kdvb2D1nWMjMRedS8I3V" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfaoPfkO3pFAn4uyiWbddTQ1Q/BiyksVzCB3QPEb4AsbWfZTrkASdtVl+6zMtK1ddX9av/AQ==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index cda2a711..bd5ff5bf 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" @@ -12,6 +13,8 @@ import ( ) const ( + // RrJobs env variable + RrJobs string = "rr_jobs" PluginName string = "jobs" ) @@ -24,6 +27,10 @@ type Plugin struct { consumers map[string]Consumer } +func testListener(data interface{}) { + fmt.Println(data) +} + func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { @@ -35,7 +42,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, nil, nil) + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 4b675271..e5aac290 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,8 +9,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 6ae1a1f6..c839130f 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -2,12 +2,15 @@ package boltdb import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "boltdb" +const ( + PluginName string = "boltdb" + RootPluginName string = "kv" +) // Plugin BoltDB K/V storage. type Plugin struct { @@ -21,7 +24,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index a2787d72..520ec7d5 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,8 +6,8 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 22ea5cca..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -2,12 +2,15 @@ package memcached import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "memcached" +const ( + PluginName string = "memcached" + RootPluginName string = "kv" +) type Plugin struct { // config plugin @@ -17,7 +20,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 03dbaed6..e9ea25df 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -5,10 +5,12 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) +// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. const PluginName string = "kv" const ( @@ -25,9 +27,9 @@ const ( type Plugin struct { log logger.Logger // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. - constructors map[string]Constructor + constructors map[string]kv.Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. - storages map[string]Storage + storages map[string]kv.Storage // KV configuration cfg Config cfgPlugin config.Configurer @@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.constructors = make(map[string]Constructor, 5) - p.storages = make(map[string]Storage, 5) + p.constructors = make(map[string]kv.Constructor, 5) + p.storages = make(map[string]kv.Storage, 5) p.log = log p.cfgPlugin = cfg return nil @@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) { // save the storage constructor p.constructors[name.Name()] = constructor } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index af763600..b9b302fe 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,6 +2,7 @@ package kv import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) @@ -9,7 +10,7 @@ import ( // Wrapper for the plugin type rpc struct { // all available storages - storages map[string]Storage + storages map[string]kv.Storage // svc is a plugin implementing Storage interface srv *Plugin // Logger diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index 1cf031d1..1906e4fd 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,8 +6,8 @@ import ( "time" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 70badf15..7d418a70 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,9 +2,9 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -21,7 +21,6 @@ type Plugin struct { func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.log = log p.cfgPlugin = cfg p.stop = make(chan struct{}, 1) return nil diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index d027a8a5..3c909900 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -3,8 +3,8 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go index 5817853c..0cd62d19 100644 --- a/plugins/redis/channel.go +++ b/plugins/redis/channel.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 320b7443..378d7630 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,8 +7,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 9d98790b..3c62a63f 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,9 +5,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 4e41acb5..8bd78514 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 00639f43..e2fa0086 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) + list = append(list, server.collectWorkerEvents) spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -147,8 +147,8 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En return nil, errors.E(op, err) } - list := make([]events.Listener, 0, 1) - list = append(list, server.collectEvents) + list := make([]events.Listener, 0, 22) + list = append(list, server.collectPoolEvents, server.collectWorkerEvents) if len(listeners) != 0 { list = append(list, listeners...) } @@ -209,7 +209,7 @@ func (server *Plugin) setEnv(e Env) []string { return env } -func (server *Plugin) collectEvents(event interface{}) { +func (server *Plugin) collectPoolEvents(event interface{}) { if we, ok := event.(events.PoolEvent); ok { switch we.Event { case events.EventMaxMemory: @@ -238,7 +238,9 @@ func (server *Plugin) collectEvents(event interface{}) { server.log.Warn("requested pool restart") } } +} +func (server *Plugin) collectWorkerEvents(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: @@ -264,16 +266,13 @@ func (server *Plugin) collectEvents(event interface{}) { } } -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) - case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - // stderr event is INFO level - case events.EventWorkerStderr: - server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) +func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed) + case events.EventJobOK: + server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed) } } } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 664b4dfd..c1f79a78 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,7 +7,7 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ca5f2f59..c9a31613 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,10 +10,10 @@ import ( "github.com/google/uuid" json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 752ba3ce..758620f6 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,7 @@ import ( "sync" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index d3b16256..390ba581 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index 2bd819d2..809020dc 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index ef926222..4507a5b9 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index c9b94777..6783855e 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index 01562a8f..fade6b66 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 76f2d6e8..d98a50b7 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) |