perf(eventsystem): reduce duration locks are held

also added a test to ensure there is not a deadlock if a listener is marked as subscribed to an event, but not registered
This commit is contained in:
Anthony Rubick
2025-09-07 19:04:17 -05:00
parent 218c5aff40
commit 0debd0b907
2 changed files with 78 additions and 9 deletions

View File

@@ -93,7 +93,7 @@ func (em *eventManager) EmitToSubscribersAnd(listenerIDs []ListenerID, payload e
eventName := payload.GetName()
if len(listenerIDs) == 0 {
return // No subscribers
return // No listeners specified
}
// Create the event
@@ -109,8 +109,8 @@ func (em *eventManager) EmitToSubscribersAnd(listenerIDs []ListenerID, payload e
// Also emit to all subscribers of the event as usual
em.mutex.RLock()
defer em.mutex.RUnlock()
subscribers, exists := em.subscriptions[eventName]
em.mutex.RUnlock()
if !exists || len(subscribers) == 0 {
return // No subscribers
}
@@ -150,14 +150,14 @@ func (em *eventManager) emitTo(listenerIDs []ListenerID, event events.Event) {
// Dispatch to all specified listeners asynchronously
em.mutex.RLock()
defer em.mutex.RUnlock()
listenersToUnregister := []ListenerID{}
for _, listenerID := range listenerIDs {
listener, exists := em.subscribers[listenerID]
if !exists {
em.logger.PrintAndLog("event-system", "Failed to get listener for event dispatch, removing "+string(listenerID)+" from subscriptions", nil)
// Remove the listener from the subscription list
// This is done in a separate goroutine to avoid deadlock
go em.UnregisterSubscriber(listenerID)
// Mark for removal
listenersToUnregister = append(listenersToUnregister, listenerID)
continue
}
@@ -167,4 +167,11 @@ func (em *eventManager) emitTo(listenerIDs []ListenerID, event events.Event) {
}
}(listener)
}
// Unregister any listeners that no longer exist, asynchronously
go func() {
for _, id := range listenersToUnregister {
em.UnregisterSubscriber(id)
}
}()
}

View File

@@ -247,7 +247,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) {
select {
case <-listener1.receivedEvents:
t.Fatal("Listener1 should not have received any events")
case <-time.After(500 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
// No event received, as expected
}
@@ -274,7 +274,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) {
select {
case <-listener2.receivedEvents:
t.Fatal("Listener2 should not have received any new events")
case <-time.After(500 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
// No event received, as expected
}
@@ -283,7 +283,7 @@ func TestEventEmissionToSpecificListener(t *testing.T) {
"Hello from pluginA": false,
"Hello from pluginB": false,
}
for i := 0; i < 2; i++ {
for range expectedMessagesSeen {
select {
case receivedEvent := <-moderator.receivedEvents:
if receivedEvent.Name != events.EventCustom {
@@ -319,7 +319,69 @@ func TestEventEmissionToSpecificListener(t *testing.T) {
select {
case <-otherListener.receivedEvents:
t.Fatal("otherListener should not have received any events")
case <-time.After(500 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
// No event received, as expected
}
}
func TestEmissionToNonExistentListener(t *testing.T) {
// Create a test listener and register it
listenerID := ListenerID("testListener")
testListener := &TestListener{
id: listenerID,
receivedEvents: make(chan events.Event, 10),
}
// Create event manager with the test listener marked as subscribed to BlacklistToggledEvents,
// but not actually registered
logger, err := logger.NewFmtLogger()
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
em := eventManager{
subscriptions: map[events.EventName][]ListenerID{
events.EventBlacklistToggled: {listenerID},
},
subscribers: make(map[ListenerID]Listener),
logger: logger,
}
// Emit a BlacklistToggled event
testEvent := &events.BlacklistToggledEvent{
RuleID: "rule123",
Enabled: false,
}
eventEmitted := make(chan struct{})
go func() {
em.Emit(testEvent)
time.Sleep(10 * time.Millisecond) // Give some time for the emission to process
close(eventEmitted)
}()
// Wait for the event emission to complete
select {
case <-eventEmitted:
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for event emission to complete, likely due to deadlock")
}
// check if the listener is still tracked by the event manager
em.mutex.RLock()
_, isRegistered := em.subscribers[listenerID]
subscribers := em.subscriptions[events.EventBlacklistToggled]
em.mutex.RUnlock()
if len(subscribers) != 0 {
t.Fatal("Listener should have been removed from subscriptions after failed dispatch")
}
if isRegistered {
t.Fatal("Listener was somehow registered")
}
// Since the listener was unregistered, it should not receive any events
select {
case <-testListener.receivedEvents:
t.Fatal("Listener should not have received any events after being unregistered")
case <-time.After(100 * time.Millisecond):
// No event received, as expected
}
}