From f45d5f46b44a0019262e0d2f1813f8a9621e4c49 Mon Sep 17 00:00:00 2001 From: Anthony Rubick <68485672+AnthonyMichaelTDM@users.noreply.github.com> Date: Sun, 24 Aug 2025 18:46:03 -0500 Subject: [PATCH] refactor(events): extract event system to own module and generalize subscriber handling --- src/accesslist.go | 6 +- src/mod/dynamicproxy/access.go | 4 +- src/mod/eventsystem/event_system.go | 124 +++++++++++ .../event_system_test.go | 2 +- src/mod/plugins/event_system.go | 192 ------------------ src/mod/plugins/eventlistener.go | 83 ++++++++ src/mod/plugins/lifecycle.go | 5 +- src/start.go | 3 +- 8 files changed, 218 insertions(+), 201 deletions(-) create mode 100644 src/mod/eventsystem/event_system.go rename src/mod/{plugins => eventsystem}/event_system_test.go (99%) delete mode 100644 src/mod/plugins/event_system.go create mode 100644 src/mod/plugins/eventlistener.go diff --git a/src/accesslist.go b/src/accesslist.go index 0a6b07b..f65c7b2 100644 --- a/src/accesslist.go +++ b/src/accesslist.go @@ -10,7 +10,7 @@ import ( "github.com/microcosm-cc/bluemonday" "imuslab.com/zoraxy/mod/access" - "imuslab.com/zoraxy/mod/plugins" + "imuslab.com/zoraxy/mod/eventsystem" "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" "imuslab.com/zoraxy/mod/utils" ) @@ -100,7 +100,7 @@ func handleCreateAccessRule(w http.ResponseWriter, r *http.Request) { } // emit an event for the new access rule creation - plugins.EventSystem.Emit( + eventsystem.Publisher.Emit( &zoraxy_plugin.AccessRuleCreatedEvent{ ID: ruleUUID, Name: ruleName, @@ -372,7 +372,7 @@ func handleBlacklistEnable(w http.ResponseWriter, r *http.Request) { return } - plugins.EventSystem.Emit(&zoraxy_plugin.BlacklistToggledEvent{ + eventsystem.Publisher.Emit(&zoraxy_plugin.BlacklistToggledEvent{ RuleID: ruleID, Enabled: rule.BlacklistEnabled, }) diff --git a/src/mod/dynamicproxy/access.go b/src/mod/dynamicproxy/access.go index 4ddfac0..4cb5f25 100644 --- a/src/mod/dynamicproxy/access.go +++ b/src/mod/dynamicproxy/access.go @@ -6,8 +6,8 @@ import ( "path/filepath" "imuslab.com/zoraxy/mod/access" + "imuslab.com/zoraxy/mod/eventsystem" "imuslab.com/zoraxy/mod/netutils" - "imuslab.com/zoraxy/mod/plugins" "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" ) @@ -51,7 +51,7 @@ func accessRequestBlocked(accessRule *access.AccessRule, templateDirectory strin if err != nil { comment = "blacklisted" } - plugins.EventSystem.Emit( + eventsystem.Publisher.Emit( &zoraxy_plugin.BlacklistedIPBlockedEvent{ IP: clientIpAddr, Comment: comment, diff --git a/src/mod/eventsystem/event_system.go b/src/mod/eventsystem/event_system.go new file mode 100644 index 0000000..d2453b1 --- /dev/null +++ b/src/mod/eventsystem/event_system.go @@ -0,0 +1,124 @@ +package eventsystem + +import ( + "sync" + "time" + + "imuslab.com/zoraxy/mod/info/logger" + // "imuslab.com/zoraxy/mod/plugins" + zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" +) + +type ListenerID string +type Listener interface { + Notify(event zoraxyPlugin.Event) error + GetID() ListenerID +} + +// eventManager manages event subscriptions and dispatching events to listeners +type eventManager struct { + subscriptions map[zoraxyPlugin.EventName][]ListenerID // EventType -> []Subscriber, tracks which events each listener is subscribed to + subscribers map[ListenerID]Listener // ListenerID -> Listener, tracks all registered listeners + logger *logger.Logger // Logger for the event manager + mutex sync.RWMutex // Mutex for concurrent access +} + +var ( + // Publisher is the singleton instance of the event manager + Publisher *eventManager + once sync.Once +) + +// InitEventSystem initializes the event manager with the plugin manager +func InitEventSystem(logger *logger.Logger) { + once.Do(func() { + Publisher = &eventManager{ + subscriptions: make(map[zoraxyPlugin.EventName][]ListenerID), + subscribers: make(map[ListenerID]Listener), + logger: logger, + } + }) +} + +// RegisterSubscriberToEvent adds a listener to the subscription list for an event type +func (em *eventManager) RegisterSubscriberToEvent(subscriber Listener, eventType zoraxyPlugin.EventName) error { + em.mutex.Lock() + defer em.mutex.Unlock() + + if _, exists := em.subscriptions[eventType]; !exists { + em.subscriptions[eventType] = []ListenerID{} + } + + // Register the listener if not already registered + listenerID := subscriber.GetID() + em.subscribers[listenerID] = subscriber + + // Check if already subscribed to the event + for _, id := range em.subscriptions[eventType] { + if id == listenerID { + return nil // Already subscribed + } + } + + // Register the listener to the event + em.subscriptions[eventType] = append(em.subscriptions[eventType], listenerID) + + return nil +} + +// Deregister removes a listener from all event subscriptions, and +// also removes the listener from the list of subscribers. +func (em *eventManager) UnregisterSubscriber(listenerID ListenerID) error { + em.mutex.Lock() + defer em.mutex.Unlock() + + for eventType, subscribers := range em.subscriptions { + for i, id := range subscribers { + if id == listenerID { + em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...) + break + } + } + } + delete(em.subscribers, listenerID) + + return nil +} + +// Emit dispatches an event to all subscribed listeners +func (em *eventManager) Emit(payload zoraxyPlugin.EventPayload) error { + eventName := payload.GetName() + + em.mutex.RLock() + defer em.mutex.RUnlock() + subscribers, exists := em.subscriptions[eventName] + + if !exists || len(subscribers) == 0 { + return nil // No subscribers + } + + // Create the event + event := zoraxyPlugin.Event{ + Name: eventName, + Timestamp: time.Now().Unix(), + Data: payload, + } + + // Dispatch to all subscribers asynchronously + for _, listenerID := range subscribers { + listener, exists := em.subscribers[listenerID] + + if !exists { + em.logger.PrintAndLog("event-system", "Failed to get listener for event dispatch, removing "+string(listenerID)+" from subscriptions", nil) + continue + } + + go func(l Listener) { + if err := l.Notify(event); err != nil { + em.logger.PrintAndLog("event-system", "Failed to dispatch `"+string(event.Name)+"` event to listener "+string(listenerID), err) + } + }(listener) + } + + return nil +} diff --git a/src/mod/plugins/event_system_test.go b/src/mod/eventsystem/event_system_test.go similarity index 99% rename from src/mod/plugins/event_system_test.go rename to src/mod/eventsystem/event_system_test.go index 614b195..c6ecebc 100644 --- a/src/mod/plugins/event_system_test.go +++ b/src/mod/eventsystem/event_system_test.go @@ -1,4 +1,4 @@ -package plugins +package eventsystem import ( "encoding/json" diff --git a/src/mod/plugins/event_system.go b/src/mod/plugins/event_system.go deleted file mode 100644 index de7a8d3..0000000 --- a/src/mod/plugins/event_system.go +++ /dev/null @@ -1,192 +0,0 @@ -package plugins - -import ( - "bytes" - "encoding/json" - "fmt" - "net/http" - "strings" - "sync" - "time" - - "imuslab.com/zoraxy/mod/info/logger" - zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" -) - -// eventManager manages event subscriptions and dispatching -type eventManager struct { - subscriptions map[zoraxyPlugin.EventName][]string // EventType -> []PluginID - pluginLookup func(pluginID string) (*Plugin, error) // Function to get plugin by ID - logger *logger.Logger // Logger for the event manager - mutex sync.RWMutex // Mutex for concurrent access -} - -var ( - // EventSystem is the singleton instance of the event manager - EventSystem *eventManager - once sync.Once -) - -// InitEventManager initializes the event manager with the plugin manager -func InitEventManager(pluginLookup func(pluginID string) (*Plugin, error), logger *logger.Logger) { - once.Do(func() { - EventSystem = &eventManager{ - subscriptions: make(map[zoraxyPlugin.EventName][]string), - pluginLookup: pluginLookup, - logger: logger, - } - }) -} - -// Subscribe adds a plugin to the subscription list for an event type -func (em *eventManager) Subscribe(pluginID string, eventType zoraxyPlugin.EventName) error { - em.mutex.Lock() - defer em.mutex.Unlock() - - if _, exists := em.subscriptions[eventType]; !exists { - em.subscriptions[eventType] = []string{} - } - - // Check if already subscribed - for _, id := range em.subscriptions[eventType] { - if id == pluginID { - return nil // Already subscribed - } - } - - em.subscriptions[eventType] = append(em.subscriptions[eventType], pluginID) - return nil -} - -// Unsubscribe removes a plugin from the subscription list for an event type - -func (em *eventManager) Unsubscribe(pluginID string, eventType zoraxyPlugin.EventName) error { - em.mutex.Lock() - defer em.mutex.Unlock() - - if subscribers, exists := em.subscriptions[eventType]; exists { - for i, id := range subscribers { - if id == pluginID { - // Remove from slice - em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...) - break - } - } - } - - return nil -} - -// UnsubscribeAll removes a plugin from all event subscriptions -func (em *eventManager) UnsubscribeAll(pluginID string) error { - em.mutex.Lock() - defer em.mutex.Unlock() - - for eventType, subscribers := range em.subscriptions { - for i, id := range subscribers { - if id == pluginID { - em.subscriptions[eventType] = append(subscribers[:i], subscribers[i+1:]...) - break - } - } - } - - return nil -} - -// Emit dispatches an event to all subscribed plugins -func (em *eventManager) Emit(payload zoraxyPlugin.EventPayload) error { - eventName := payload.GetName() - - em.mutex.RLock() - subscribers, exists := em.subscriptions[eventName] - em.mutex.RUnlock() - - if !exists || len(subscribers) == 0 { - return nil // No subscribers - } - - // Create the event - event := zoraxyPlugin.Event{ - Name: eventName, - Timestamp: time.Now().Unix(), - Data: payload, - } - - // Dispatch to all subscribers asynchronously - for _, pluginID := range subscribers { - go em.dispatchToPlugin(pluginID, event) - } - - return nil -} - -// dispatchToPlugin sends an event to a specific plugin -func (em *eventManager) dispatchToPlugin(pluginID string, event zoraxyPlugin.Event) { - plugin, err := em.pluginLookup(pluginID) - if err != nil { - em.logger.PrintAndLog("event-system", "Failed to get plugin for event dispatch: "+pluginID, err) - return - } - - if !plugin.Enabled || plugin.AssignedPort == 0 { - // Plugin is not running, skip - return - } - - subscriptionPath := plugin.Spec.SubscriptionPath - if subscriptionPath == "" { - // No subscription path configured, skip - return - } - if !strings.HasPrefix(subscriptionPath, "/") { - subscriptionPath = "/" + subscriptionPath - } - - // Prepare the URL - url := fmt.Sprintf("http://127.0.0.1:%d%s/%s", plugin.AssignedPort, subscriptionPath, event.Name) - - // Marshal the event to JSON - eventData, err := json.Marshal(event) - if err != nil { - em.logger.PrintAndLog("event-system", "Failed to marshal event for plugin "+pluginID, err) - return - } - - // Create HTTP request - req, err := http.NewRequest("POST", url, bytes.NewBuffer(eventData)) - if err != nil { - em.logger.PrintAndLog("event-system", "Failed to create HTTP request for plugin "+pluginID, err) - return - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Zoraxy-Event-Type", string(event.Name)) - - // Send the request with timeout - client := &http.Client{ - Timeout: 10 * time.Second, - } - - resp, err := client.Do(req) - if err != nil { - em.logger.PrintAndLog("event-system", "Failed to send event to plugin "+pluginID, err) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - respBody := fmt.Errorf("no response body") - if resp.ContentLength > 0 { - buffer := bytes.NewBuffer(make([]byte, 0, resp.ContentLength)) - _, respErr := buffer.ReadFrom(resp.Body) - if respErr != nil { - respBody = fmt.Errorf("failed to read response body: %v", respErr) - } else { - respBody = fmt.Errorf("response body: %s", buffer.String()) - } - } - - em.logger.PrintAndLog("event-system", fmt.Sprintf("Plugin %s returned non-200 status for event `%s`: %s", pluginID, event.Name, resp.Status), respBody) - } -} diff --git a/src/mod/plugins/eventlistener.go b/src/mod/plugins/eventlistener.go new file mode 100644 index 0000000..6000c35 --- /dev/null +++ b/src/mod/plugins/eventlistener.go @@ -0,0 +1,83 @@ +// implements `eventsystem.Listener` for Plugin + +package plugins + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "imuslab.com/zoraxy/mod/eventsystem" + zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" +) + +func (p *Plugin) GetID() eventsystem.ListenerID { + return eventsystem.ListenerID(p.Spec.ID) +} + +// Send an event to the plugin +func (p *Plugin) Notify(event zoraxyPlugin.Event) error { + // Handle the event notification + if !p.Enabled || p.AssignedPort == 0 { + return fmt.Errorf("plugin %s is not running", p.Spec.ID) + } + + subscriptionPath := p.Spec.SubscriptionPath + if subscriptionPath == "" { + return fmt.Errorf("plugin %s has no subscription path configured", p.Spec.ID) + } + + if !strings.HasPrefix(subscriptionPath, "/") { + subscriptionPath = "/" + subscriptionPath + } + + // Prepare the URL + url := fmt.Sprintf("http://127.0.0.1:%d%s/%s", p.AssignedPort, subscriptionPath, event.Name) + + // Marshal the event to JSON + eventData, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + // Create HTTP request + req, err := http.NewRequest("POST", url, bytes.NewBuffer(eventData)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Zoraxy-Event-Type", string(event.Name)) + + // Send the request with timeout + client := &http.Client{ + Timeout: 10 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return errors.New("Failed to send event: " + err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody := fmt.Errorf("no response body") + if resp.ContentLength > 0 { + buffer := bytes.NewBuffer(make([]byte, 0, resp.ContentLength)) + _, respErr := buffer.ReadFrom(resp.Body) + if respErr != nil { + respBody = fmt.Errorf("failed to read response body: %v", respErr) + } else { + respBody = fmt.Errorf("response body: %s", buffer.String()) + } + } + + return fmt.Errorf("plugin %s returned non-200 status for event `%s` (%s): %w", p.Spec.ID, event.Name, resp.Status, respBody) + } + + return nil +} diff --git a/src/mod/plugins/lifecycle.go b/src/mod/plugins/lifecycle.go index 50a3c05..5860694 100644 --- a/src/mod/plugins/lifecycle.go +++ b/src/mod/plugins/lifecycle.go @@ -14,6 +14,7 @@ import ( "time" "imuslab.com/zoraxy/mod/dynamicproxy/dpcore" + "imuslab.com/zoraxy/mod/eventsystem" zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" ) @@ -155,7 +156,7 @@ func (m *Manager) StartPlugin(pluginID string) error { if thisPlugin.Spec.SubscriptionsEvents != nil { for eventName := range thisPlugin.Spec.SubscriptionsEvents { eventType := zoraxyPlugin.EventName(eventName) - err := EventSystem.Subscribe(thisPlugin.Spec.ID, eventType) + err := eventsystem.Publisher.RegisterSubscriberToEvent(thisPlugin, eventType) if err != nil { m.Log("Failed to subscribe plugin "+thisPlugin.Spec.Name+" to event "+string(eventName), err) } else { @@ -302,7 +303,7 @@ func (m *Manager) StopPlugin(pluginID string) error { m.Log("Failed to revoke API keys for plugin "+thisPlugin.Spec.Name, err) } //Unsubscribe from all events - err = EventSystem.UnsubscribeAll(thisPlugin.Spec.ID) + err = eventsystem.Publisher.UnregisterSubscriber(eventsystem.ListenerID(thisPlugin.Spec.ID)) if err != nil { m.Log("Failed to unsubscribe plugin "+thisPlugin.Spec.Name+" from events", err) } diff --git a/src/start.go b/src/start.go index 2c517f9..a6b84d8 100644 --- a/src/start.go +++ b/src/start.go @@ -11,6 +11,7 @@ import ( "time" "imuslab.com/zoraxy/mod/auth/sso/oauth2" + "imuslab.com/zoraxy/mod/eventsystem" "github.com/gorilla/csrf" "imuslab.com/zoraxy/mod/access" @@ -350,7 +351,7 @@ func startupSequence() { /* Event Manager */ - plugins.InitEventManager(pluginManager.GetPluginByID, SystemWideLogger) + eventsystem.InitEventSystem(SystemWideLogger) //Sync latest plugin list from the plugin store go func() {