refactor(events): extract event system to own module and generalize subscriber handling

This commit is contained in:
Anthony Rubick
2025-08-24 18:46:03 -05:00
parent d74ecb2444
commit f45d5f46b4
8 changed files with 218 additions and 201 deletions

View File

@@ -10,7 +10,7 @@ import (
"github.com/microcosm-cc/bluemonday"
"imuslab.com/zoraxy/mod/access"
"imuslab.com/zoraxy/mod/plugins"
"imuslab.com/zoraxy/mod/eventsystem"
"imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
"imuslab.com/zoraxy/mod/utils"
)
@@ -100,7 +100,7 @@ func handleCreateAccessRule(w http.ResponseWriter, r *http.Request) {
}
// emit an event for the new access rule creation
plugins.EventSystem.Emit(
eventsystem.Publisher.Emit(
&zoraxy_plugin.AccessRuleCreatedEvent{
ID: ruleUUID,
Name: ruleName,
@@ -372,7 +372,7 @@ func handleBlacklistEnable(w http.ResponseWriter, r *http.Request) {
return
}
plugins.EventSystem.Emit(&zoraxy_plugin.BlacklistToggledEvent{
eventsystem.Publisher.Emit(&zoraxy_plugin.BlacklistToggledEvent{
RuleID: ruleID,
Enabled: rule.BlacklistEnabled,
})

View File

@@ -6,8 +6,8 @@ import (
"path/filepath"
"imuslab.com/zoraxy/mod/access"
"imuslab.com/zoraxy/mod/eventsystem"
"imuslab.com/zoraxy/mod/netutils"
"imuslab.com/zoraxy/mod/plugins"
"imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
@@ -51,7 +51,7 @@ func accessRequestBlocked(accessRule *access.AccessRule, templateDirectory strin
if err != nil {
comment = "blacklisted"
}
plugins.EventSystem.Emit(
eventsystem.Publisher.Emit(
&zoraxy_plugin.BlacklistedIPBlockedEvent{
IP: clientIpAddr,
Comment: comment,

View File

@@ -0,0 +1,124 @@
package eventsystem
import (
"sync"
"time"
"imuslab.com/zoraxy/mod/info/logger"
// "imuslab.com/zoraxy/mod/plugins"
zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
type ListenerID string
type Listener interface {
Notify(event zoraxyPlugin.Event) error
GetID() ListenerID
}
// eventManager manages event subscriptions and dispatching events to listeners
type eventManager struct {
subscriptions map[zoraxyPlugin.EventName][]ListenerID // EventType -> []Subscriber, tracks which events each listener is subscribed to
subscribers map[ListenerID]Listener // ListenerID -> Listener, tracks all registered listeners
logger *logger.Logger // Logger for the event manager
mutex sync.RWMutex // Mutex for concurrent access
}
var (
// Publisher is the singleton instance of the event manager
Publisher *eventManager
once sync.Once
)
// InitEventSystem initializes the event manager with the plugin manager
func InitEventSystem(logger *logger.Logger) {
once.Do(func() {
Publisher = &eventManager{
subscriptions: make(map[zoraxyPlugin.EventName][]ListenerID),
subscribers: make(map[ListenerID]Listener),
logger: logger,
}
})
}
// RegisterSubscriberToEvent adds a listener to the subscription list for an event type
func (em *eventManager) RegisterSubscriberToEvent(subscriber Listener, eventType zoraxyPlugin.EventName) error {
em.mutex.Lock()
defer em.mutex.Unlock()
if _, exists := em.subscriptions[eventType]; !exists {
em.subscriptions[eventType] = []ListenerID{}
}
// Register the listener if not already registered
listenerID := subscriber.GetID()
em.subscribers[listenerID] = subscriber
// Check if already subscribed to the event
for _, id := range em.subscriptions[eventType] {
if id == listenerID {
return nil // Already subscribed
}
}
// Register the listener to the event
em.subscriptions[eventType] = append(em.subscriptions[eventType], listenerID)
return nil
}
// Deregister removes a listener from all event subscriptions, and
// also removes the listener from the list of subscribers.
func (em *eventManager) UnregisterSubscriber(listenerID ListenerID) error {
em.mutex.Lock()
defer em.mutex.Unlock()
for eventType, subscribers := range em.subscriptions {
for i, id := range subscribers {
if id == listenerID {
em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...)
break
}
}
}
delete(em.subscribers, listenerID)
return nil
}
// Emit dispatches an event to all subscribed listeners
func (em *eventManager) Emit(payload zoraxyPlugin.EventPayload) error {
eventName := payload.GetName()
em.mutex.RLock()
defer em.mutex.RUnlock()
subscribers, exists := em.subscriptions[eventName]
if !exists || len(subscribers) == 0 {
return nil // No subscribers
}
// Create the event
event := zoraxyPlugin.Event{
Name: eventName,
Timestamp: time.Now().Unix(),
Data: payload,
}
// Dispatch to all subscribers asynchronously
for _, listenerID := range subscribers {
listener, exists := em.subscribers[listenerID]
if !exists {
em.logger.PrintAndLog("event-system", "Failed to get listener for event dispatch, removing "+string(listenerID)+" from subscriptions", nil)
continue
}
go func(l Listener) {
if err := l.Notify(event); err != nil {
em.logger.PrintAndLog("event-system", "Failed to dispatch `"+string(event.Name)+"` event to listener "+string(listenerID), err)
}
}(listener)
}
return nil
}

View File

@@ -1,4 +1,4 @@
package plugins
package eventsystem
import (
"encoding/json"

View File

@@ -1,192 +0,0 @@
package plugins
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"imuslab.com/zoraxy/mod/info/logger"
zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
// eventManager manages event subscriptions and dispatching
type eventManager struct {
subscriptions map[zoraxyPlugin.EventName][]string // EventType -> []PluginID
pluginLookup func(pluginID string) (*Plugin, error) // Function to get plugin by ID
logger *logger.Logger // Logger for the event manager
mutex sync.RWMutex // Mutex for concurrent access
}
var (
// EventSystem is the singleton instance of the event manager
EventSystem *eventManager
once sync.Once
)
// InitEventManager initializes the event manager with the plugin manager
func InitEventManager(pluginLookup func(pluginID string) (*Plugin, error), logger *logger.Logger) {
once.Do(func() {
EventSystem = &eventManager{
subscriptions: make(map[zoraxyPlugin.EventName][]string),
pluginLookup: pluginLookup,
logger: logger,
}
})
}
// Subscribe adds a plugin to the subscription list for an event type
func (em *eventManager) Subscribe(pluginID string, eventType zoraxyPlugin.EventName) error {
em.mutex.Lock()
defer em.mutex.Unlock()
if _, exists := em.subscriptions[eventType]; !exists {
em.subscriptions[eventType] = []string{}
}
// Check if already subscribed
for _, id := range em.subscriptions[eventType] {
if id == pluginID {
return nil // Already subscribed
}
}
em.subscriptions[eventType] = append(em.subscriptions[eventType], pluginID)
return nil
}
// Unsubscribe removes a plugin from the subscription list for an event type
func (em *eventManager) Unsubscribe(pluginID string, eventType zoraxyPlugin.EventName) error {
em.mutex.Lock()
defer em.mutex.Unlock()
if subscribers, exists := em.subscriptions[eventType]; exists {
for i, id := range subscribers {
if id == pluginID {
// Remove from slice
em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...)
break
}
}
}
return nil
}
// UnsubscribeAll removes a plugin from all event subscriptions
func (em *eventManager) UnsubscribeAll(pluginID string) error {
em.mutex.Lock()
defer em.mutex.Unlock()
for eventType, subscribers := range em.subscriptions {
for i, id := range subscribers {
if id == pluginID {
em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...)
break
}
}
}
return nil
}
// Emit dispatches an event to all subscribed plugins
func (em *eventManager) Emit(payload zoraxyPlugin.EventPayload) error {
eventName := payload.GetName()
em.mutex.RLock()
subscribers, exists := em.subscriptions[eventName]
em.mutex.RUnlock()
if !exists || len(subscribers) == 0 {
return nil // No subscribers
}
// Create the event
event := zoraxyPlugin.Event{
Name: eventName,
Timestamp: time.Now().Unix(),
Data: payload,
}
// Dispatch to all subscribers asynchronously
for _, pluginID := range subscribers {
go em.dispatchToPlugin(pluginID, event)
}
return nil
}
// dispatchToPlugin sends an event to a specific plugin
func (em *eventManager) dispatchToPlugin(pluginID string, event zoraxyPlugin.Event) {
plugin, err := em.pluginLookup(pluginID)
if err != nil {
em.logger.PrintAndLog("event-system", "Failed to get plugin for event dispatch: "+pluginID, err)
return
}
if !plugin.Enabled || plugin.AssignedPort == 0 {
// Plugin is not running, skip
return
}
subscriptionPath := plugin.Spec.SubscriptionPath
if subscriptionPath == "" {
// No subscription path configured, skip
return
}
if !strings.HasPrefix(subscriptionPath, "/") {
subscriptionPath = "/" + subscriptionPath
}
// Prepare the URL
url := fmt.Sprintf("http://127.0.0.1:%d%s/%s", plugin.AssignedPort, subscriptionPath, event.Name)
// Marshal the event to JSON
eventData, err := json.Marshal(event)
if err != nil {
em.logger.PrintAndLog("event-system", "Failed to marshal event for plugin "+pluginID, err)
return
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(eventData))
if err != nil {
em.logger.PrintAndLog("event-system", "Failed to create HTTP request for plugin "+pluginID, err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Zoraxy-Event-Type", string(event.Name))
// Send the request with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
em.logger.PrintAndLog("event-system", "Failed to send event to plugin "+pluginID, err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
respBody := fmt.Errorf("no response body")
if resp.ContentLength > 0 {
buffer := bytes.NewBuffer(make([]byte, 0, resp.ContentLength))
_, respErr := buffer.ReadFrom(resp.Body)
if respErr != nil {
respBody = fmt.Errorf("failed to read response body: %v", respErr)
} else {
respBody = fmt.Errorf("response body: %s", buffer.String())
}
}
em.logger.PrintAndLog("event-system", fmt.Sprintf("Plugin %s returned non-200 status for event `%s`: %s", pluginID, event.Name, resp.Status), respBody)
}
}

View File

@@ -0,0 +1,83 @@
// implements `eventsystem.Listener` for Plugin
package plugins
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"imuslab.com/zoraxy/mod/eventsystem"
zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
func (p *Plugin) GetID() eventsystem.ListenerID {
return eventsystem.ListenerID(p.Spec.ID)
}
// Send an event to the plugin
func (p *Plugin) Notify(event zoraxyPlugin.Event) error {
// Handle the event notification
if !p.Enabled || p.AssignedPort == 0 {
return fmt.Errorf("plugin %s is not running", p.Spec.ID)
}
subscriptionPath := p.Spec.SubscriptionPath
if subscriptionPath == "" {
return fmt.Errorf("plugin %s has no subscription path configured", p.Spec.ID)
}
if !strings.HasPrefix(subscriptionPath, "/") {
subscriptionPath = "/" + subscriptionPath
}
// Prepare the URL
url := fmt.Sprintf("http://127.0.0.1:%d%s/%s", p.AssignedPort, subscriptionPath, event.Name)
// Marshal the event to JSON
eventData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(eventData))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Zoraxy-Event-Type", string(event.Name))
// Send the request with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return errors.New("Failed to send event: " + err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
respBody := fmt.Errorf("no response body")
if resp.ContentLength > 0 {
buffer := bytes.NewBuffer(make([]byte, 0, resp.ContentLength))
_, respErr := buffer.ReadFrom(resp.Body)
if respErr != nil {
respBody = fmt.Errorf("failed to read response body: %v", respErr)
} else {
respBody = fmt.Errorf("response body: %s", buffer.String())
}
}
return fmt.Errorf("plugin %s returned non-200 status for event `%s` (%s): %w", p.Spec.ID, event.Name, resp.Status, respBody)
}
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"time"
"imuslab.com/zoraxy/mod/dynamicproxy/dpcore"
"imuslab.com/zoraxy/mod/eventsystem"
zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
@@ -155,7 +156,7 @@ func (m *Manager) StartPlugin(pluginID string) error {
if thisPlugin.Spec.SubscriptionsEvents != nil {
for eventName := range thisPlugin.Spec.SubscriptionsEvents {
eventType := zoraxyPlugin.EventName(eventName)
err := EventSystem.Subscribe(thisPlugin.Spec.ID, eventType)
err := eventsystem.Publisher.RegisterSubscriberToEvent(thisPlugin, eventType)
if err != nil {
m.Log("Failed to subscribe plugin "+thisPlugin.Spec.Name+" to event "+string(eventName), err)
} else {
@@ -302,7 +303,7 @@ func (m *Manager) StopPlugin(pluginID string) error {
m.Log("Failed to revoke API keys for plugin "+thisPlugin.Spec.Name, err)
}
//Unsubscribe from all events
err = EventSystem.UnsubscribeAll(thisPlugin.Spec.ID)
err = eventsystem.Publisher.UnregisterSubscriber(eventsystem.ListenerID(thisPlugin.Spec.ID))
if err != nil {
m.Log("Failed to unsubscribe plugin "+thisPlugin.Spec.Name+" from events", err)
}

View File

@@ -11,6 +11,7 @@ import (
"time"
"imuslab.com/zoraxy/mod/auth/sso/oauth2"
"imuslab.com/zoraxy/mod/eventsystem"
"github.com/gorilla/csrf"
"imuslab.com/zoraxy/mod/access"
@@ -350,7 +351,7 @@ func startupSequence() {
/*
Event Manager
*/
plugins.InitEventManager(pluginManager.GetPluginByID, SystemWideLogger)
eventsystem.InitEventSystem(SystemWideLogger)
//Sync latest plugin list from the plugin store
go func() {