diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/events/general.go | 2 | ||||
-rw-r--r-- | pkg/events/interface.go | 4 | ||||
-rw-r--r-- | pkg/events/jobs_events.go | 84 | ||||
-rw-r--r-- | pkg/events/pool_events.go | 2 | ||||
-rw-r--r-- | pkg/events/worker_events.go | 2 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 54 | ||||
-rw-r--r-- | pkg/pubsub/psmessage.go | 15 |
7 files changed, 90 insertions, 73 deletions
diff --git a/pkg/events/general.go b/pkg/events/general.go index a09a8759..5cf13e10 100755 --- a/pkg/events/general.go +++ b/pkg/events/general.go @@ -4,6 +4,8 @@ import ( "sync" ) +const UnknownEventType string = "Unknown event type" + // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { listeners []Listener diff --git a/pkg/events/interface.go b/pkg/events/interface.go index ac6c15a4..7d57e4d0 100644 --- a/pkg/events/interface.go +++ b/pkg/events/interface.go @@ -2,7 +2,7 @@ package events // Handler interface type Handler interface { - // Return number of active listeners + // NumListeners return number of active listeners NumListeners() int // AddListener adds lister to the publisher AddListener(listener Listener) @@ -10,5 +10,5 @@ type Handler interface { Push(e interface{}) } -// Event listener listens for the events produced by worker, worker pool or other service. +// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. type Listener func(event interface{}) diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go new file mode 100644 index 00000000..ed07c7da --- /dev/null +++ b/pkg/events/jobs_events.go @@ -0,0 +1,84 @@ +package events + +import ( + "time" +) + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK = iota + 12000 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeConsume when pipeline pipelines has been requested. + EventPipeConsume + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStop when pipeline has begun stopping. + EventPipeStop + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventBrokerReady thrown when broken is ready to accept/serve tasks. + EventBrokerReady +) + +type J int64 + +func (ev J) String() string { + switch ev { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeConsume: + return "EventPipeConsume" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStop: + return "EventPipeStop" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventBrokerReady: + return "EventBrokerReady" + } + return UnknownEventType +} + +// JobEvent represent job event. +type JobEvent struct { + Event J + // String is job id. + ID string + + // Job is failed job. + Job interface{} // this is *jobs.Job, but interface used to avoid package import + + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index e7b451e0..4d4cae5d 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -57,7 +57,7 @@ func (ev P) String() string { case EventPoolRestart: return "EventPoolRestart" } - return "Unknown event type" + return UnknownEventType } // PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 11bd6ab7..39c38e57 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -20,7 +20,7 @@ func (ev W) String() string { case EventWorkerStderr: return "EventWorkerStderr" } - return "Unknown event type" + return UnknownEventType } // WorkerEvent wraps worker events. diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go deleted file mode 100644 index 06252d70..00000000 --- a/pkg/pubsub/interface.go +++ /dev/null @@ -1,54 +0,0 @@ -package pubsub - -/* -This interface is in BETA. It might be changed. -*/ - -// PubSub interface designed to implement on any storage type to provide pub-sub abilities -// Publisher used to receive messages from the PHP app via RPC -// Subscriber should be implemented to subscribe to a topics and provide a connections list per topic -// Reader return next message from the channel -type PubSub interface { - Publisher - Subscriber - Reader -} - -type SubReader interface { - Subscriber - Reader -} - -// Subscriber defines the ability to operate as message passing broker. -// BETA interface -type Subscriber interface { - // Subscribe broker to one or multiple topics. - Subscribe(connectionID string, topics ...string) error - - // Unsubscribe from one or multiply topics - Unsubscribe(connectionID string, topics ...string) error - - // Connections returns all connections associated with the particular topic - Connections(topic string, ret map[string]struct{}) -} - -// Publisher publish one or more messages -// BETA interface -type Publisher interface { - // Publish one or multiple Channel. - Publish(message *Message) error - - // PublishAsync publish message and return immediately - // If error occurred it will be printed into the logger - PublishAsync(message *Message) -} - -// Reader interface should return next message -type Reader interface { - Next() (*Message, error) -} - -// Constructor is a special pub-sub interface made to return a constructed PubSub type -type Constructor interface { - PSConstruct(key string) (PubSub, error) -} diff --git a/pkg/pubsub/psmessage.go b/pkg/pubsub/psmessage.go deleted file mode 100644 index e33d9284..00000000 --- a/pkg/pubsub/psmessage.go +++ /dev/null @@ -1,15 +0,0 @@ -package pubsub - -import json "github.com/json-iterator/go" - -// Message represents a single message with payload bound to a particular topic -type Message struct { - // Topic (channel in terms of redis) - Topic string `json:"topic"` - // Payload (on some decode stages might be represented as base64 string) - Payload []byte `json:"payload"` -} - -func (m *Message) MarshalBinary() (data []byte, err error) { - return json.Marshal(m) -} |