From 0debd0b907ff90e8d107dad910206ccc23818d3f Mon Sep 17 00:00:00 2001 From: Anthony Rubick <68485672+AnthonyMichaelTDM@users.noreply.github.com> Date: Sun, 7 Sep 2025 19:04:17 -0500 Subject: [PATCH] perf(eventsystem): reduce duration locks are held also added a test to ensure there is not a deadlock if a listener is marked as subscribed to an event, but not registered --- src/mod/eventsystem/event_system.go | 17 ++++-- src/mod/eventsystem/event_system_test.go | 70 ++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/src/mod/eventsystem/event_system.go b/src/mod/eventsystem/event_system.go index 5ec9e27..211ea5e 100644 --- a/src/mod/eventsystem/event_system.go +++ b/src/mod/eventsystem/event_system.go @@ -93,7 +93,7 @@ func (em *eventManager) EmitToSubscribersAnd(listenerIDs []ListenerID, payload e eventName := payload.GetName() if len(listenerIDs) == 0 { - return // No subscribers + return // No listeners specified } // Create the event @@ -109,8 +109,8 @@ func (em *eventManager) EmitToSubscribersAnd(listenerIDs []ListenerID, payload e // Also emit to all subscribers of the event as usual em.mutex.RLock() + defer em.mutex.RUnlock() subscribers, exists := em.subscriptions[eventName] - em.mutex.RUnlock() if !exists || len(subscribers) == 0 { return // No subscribers } @@ -150,14 +150,14 @@ func (em *eventManager) emitTo(listenerIDs []ListenerID, event events.Event) { // Dispatch to all specified listeners asynchronously em.mutex.RLock() defer em.mutex.RUnlock() + listenersToUnregister := []ListenerID{} for _, listenerID := range listenerIDs { listener, exists := em.subscribers[listenerID] if !exists { em.logger.PrintAndLog("event-system", "Failed to get listener for event dispatch, removing "+string(listenerID)+" from subscriptions", nil) - // Remove the listener from the subscription list - // This is done in a separate goroutine to avoid deadlock - go em.UnregisterSubscriber(listenerID) + // Mark for removal + listenersToUnregister = append(listenersToUnregister, listenerID) continue } @@ -167,4 +167,11 @@ func (em *eventManager) emitTo(listenerIDs []ListenerID, event events.Event) { } }(listener) } + + // Unregister any listeners that no longer exist, asynchronously + go func() { + for _, id := range listenersToUnregister { + em.UnregisterSubscriber(id) + } + }() } diff --git a/src/mod/eventsystem/event_system_test.go b/src/mod/eventsystem/event_system_test.go index 8ae3798..d0afbb7 100644 --- a/src/mod/eventsystem/event_system_test.go +++ b/src/mod/eventsystem/event_system_test.go @@ -247,7 +247,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) { select { case <-listener1.receivedEvents: t.Fatal("Listener1 should not have received any events") - case <-time.After(500 * time.Millisecond): + case <-time.After(100 * time.Millisecond): // No event received, as expected } @@ -274,7 +274,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) { select { case <-listener2.receivedEvents: t.Fatal("Listener2 should not have received any new events") - case <-time.After(500 * time.Millisecond): + case <-time.After(100 * time.Millisecond): // No event received, as expected } @@ -283,7 +283,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) { "Hello from pluginA": false, "Hello from pluginB": false, } - for i := 0; i < 2; i++ { + for range expectedMessagesSeen { select { case receivedEvent := <-moderator.receivedEvents: if receivedEvent.Name != events.EventCustom { @@ -319,7 +319,69 @@ func TestEventEmissionToSpecificListener(t *testing.T) { select { case <-otherListener.receivedEvents: t.Fatal("otherListener should not have received any events") - case <-time.After(500 * time.Millisecond): + case <-time.After(100 * time.Millisecond): + // No event received, as expected + } +} + +func TestEmissionToNonExistentListener(t *testing.T) { + // Create a test listener and register it + listenerID := ListenerID("testListener") + testListener := &TestListener{ + id: listenerID, + receivedEvents: make(chan events.Event, 10), + } + + // Create event manager with the test listener marked as subscribed to BlacklistToggledEvents, + // but not actually registered + logger, err := logger.NewFmtLogger() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + em := eventManager{ + subscriptions: map[events.EventName][]ListenerID{ + events.EventBlacklistToggled: {listenerID}, + }, + subscribers: make(map[ListenerID]Listener), + logger: logger, + } + + // Emit a BlacklistToggled event + testEvent := &events.BlacklistToggledEvent{ + RuleID: "rule123", + Enabled: false, + } + eventEmitted := make(chan struct{}) + go func() { + em.Emit(testEvent) + time.Sleep(10 * time.Millisecond) // Give some time for the emission to process + close(eventEmitted) + }() + + // Wait for the event emission to complete + select { + case <-eventEmitted: + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for event emission to complete, likely due to deadlock") + } + + // check if the listener is still tracked by the event manager + em.mutex.RLock() + _, isRegistered := em.subscribers[listenerID] + subscribers := em.subscriptions[events.EventBlacklistToggled] + em.mutex.RUnlock() + if len(subscribers) != 0 { + t.Fatal("Listener should have been removed from subscriptions after failed dispatch") + } + if isRegistered { + t.Fatal("Listener was somehow registered") + } + + // Since the listener was unregistered, it should not receive any events + select { + case <-testListener.receivedEvents: + t.Fatal("Listener should not have received any events after being unregistered") + case <-time.After(100 * time.Millisecond): // No event received, as expected } }