Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 129 additions & 28 deletions internal/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,52 @@ var securitySensitiveCommands = []string{
"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
}

// eventSequencer allows for sequence-based event filtering for
// awaitMinPoolSizeMS support.
//
// Per the unified test format spec, when awaitMinPoolSizeMS is specified, any
// CMAP and SDAM events that occur during connection pool initialization
// (before minPoolSize is reached) must be ignored. We track this by
// assigning a monotonically increasing sequence number to each event as it's
// recorded. After pool initialization completes, we set eventCutoffSeq to the
// current sequence number. Event accessors for CMAP and SDAM types then
// filter out any events with sequence <= eventCutoffSeq.
//
// Sequencing is thread-safe to support concurrent operations that may generate
// events (e.g., connection checkouts generating CMAP events).
type eventSequencer struct {
counter atomic.Int64
cutoff atomic.Int64

mu sync.RWMutex

// pool events are heterogeneous, so we track their sequence separately
poolSeq []int64
seqByEventType map[monitoringEventType][]int64
}

// setCutoff marks the current sequence as the filtering cutoff point.
func (es *eventSequencer) setCutoff() {
es.cutoff.Store(es.counter.Load())
}

// recordEvent stores the sequence number for a given event type.
func (es *eventSequencer) recordEvent(eventType monitoringEventType) {
next := es.counter.Add(1)

es.mu.Lock()
es.seqByEventType[eventType] = append(es.seqByEventType[eventType], next)
es.mu.Unlock()
}

func (es *eventSequencer) recordPooledEvent() {
next := es.counter.Add(1)

es.mu.Lock()
es.poolSeq = append(es.poolSeq, next)
es.mu.Unlock()
}

// clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
// execution.
type clientEntity struct {
Expand Down Expand Up @@ -72,30 +118,8 @@ type clientEntity struct {

entityMap *EntityMap

logQueue chan orderedLogMessage
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size. This is a best effort operation that times out after
// some predefined amount of time to avoid blocking tests indefinitely.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
// Don't spend longer than 500ms awaiting minPoolSize.
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-awaitCtx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
return nil
}
}
}
logQueue chan orderedLogMessage
eventSequencer eventSequencer
}

func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
Expand All @@ -118,6 +142,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
entityMap: em,
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
eventSequencer: eventSequencer{
seqByEventType: make(map[monitoringEventType][]int64),
},
}
entity.setRecordEvents(true)

Expand Down Expand Up @@ -226,8 +253,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
}

if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize, *entityOptions.AwaitMinPoolSizeMS); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -326,8 +354,47 @@ func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
return events
}

func (c *clientEntity) poolEvents() []*event.PoolEvent {
return c.pooled
// filterEventsBySeq filters events by sequence number for the given eventType.
// See comments on eventSequencer for more details.
func filterEventsBySeq[T any](c *clientEntity, events []T, eventType monitoringEventType) []T {
cutoff := c.eventSequencer.cutoff.Load()
if cutoff == 0 {
return events
}

// Lock order: eventProcessMu -> eventSequencer.mu (matches writers)
c.eventProcessMu.RLock()
c.eventSequencer.mu.RLock()

// Snapshot to minimize time under locks and avoid races
localEvents := append([]T(nil), events...)

var seqSlice []int64
if eventType == poolAnyEvent {
seqSlice = c.eventSequencer.poolSeq
} else {
seqSlice = c.eventSequencer.seqByEventType[eventType]
}

localSeqs := append([]int64(nil), seqSlice...)

c.eventSequencer.mu.RUnlock()
c.eventProcessMu.RUnlock()

// guard against index out of range.
n := len(localEvents)
if len(localSeqs) < n {
n = len(localSeqs)
}

filtered := make([]T, 0, n)
for i := 0; i < n; i++ {
if localSeqs[i] > cutoff {
filtered = append(filtered, localEvents[i])
}
}

return filtered
}

func (c *clientEntity) numberConnectionsCheckedOut() int32 {
Expand Down Expand Up @@ -516,7 +583,10 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {

eventType := monitoringEventTypeFromPoolEvent(evt)
if _, ok := c.observedEvents[eventType]; ok {
c.eventProcessMu.Lock()
c.pooled = append(c.pooled, evt)
c.eventSequencer.recordPooledEvent()
c.eventProcessMu.Unlock()
}

c.addEventsCount(eventType)
Expand All @@ -539,6 +609,7 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes

if _, ok := c.observedEvents[serverDescriptionChangedEvent]; ok {
c.serverDescriptionChanged = append(c.serverDescriptionChanged, evt)
c.eventSequencer.recordEvent(serverDescriptionChangedEvent)
}

// Record object-specific unified spec test data on an event.
Expand All @@ -558,6 +629,7 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb

if _, ok := c.observedEvents[serverHeartbeatFailedEvent]; ok {
c.serverHeartbeatFailedEvent = append(c.serverHeartbeatFailedEvent, evt)
c.eventSequencer.recordEvent(serverHeartbeatFailedEvent)
}

c.addEventsCount(serverHeartbeatFailedEvent)
Expand All @@ -573,6 +645,7 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart

if _, ok := c.observedEvents[serverHeartbeatStartedEvent]; ok {
c.serverHeartbeatStartedEvent = append(c.serverHeartbeatStartedEvent, evt)
c.eventSequencer.recordEvent(serverHeartbeatStartedEvent)
}

c.addEventsCount(serverHeartbeatStartedEvent)
Expand All @@ -588,6 +661,7 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea

if _, ok := c.observedEvents[serverHeartbeatSucceededEvent]; ok {
c.serverHeartbeatSucceeded = append(c.serverHeartbeatSucceeded, evt)
c.eventSequencer.recordEvent(serverHeartbeatSucceededEvent)
}

c.addEventsCount(serverHeartbeatSucceededEvent)
Expand All @@ -603,6 +677,7 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog

if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
c.eventSequencer.recordEvent(topologyDescriptionChangedEvent)
}

c.addEventsCount(topologyDescriptionChangedEvent)
Expand All @@ -618,6 +693,7 @@ func (c *clientEntity) processTopologyOpeningEvent(evt *event.TopologyOpeningEve

if _, ok := c.observedEvents[topologyOpeningEvent]; ok {
c.topologyOpening = append(c.topologyOpening, evt)
c.eventSequencer.recordEvent(topologyOpeningEvent)
}

c.addEventsCount(topologyOpeningEvent)
Expand All @@ -633,6 +709,7 @@ func (c *clientEntity) processTopologyClosedEvent(evt *event.TopologyClosedEvent

if _, ok := c.observedEvents[topologyClosedEvent]; ok {
c.topologyClosed = append(c.topologyClosed, evt)
c.eventSequencer.recordEvent(topologyClosedEvent)
}

c.addEventsCount(topologyClosedEvent)
Expand Down Expand Up @@ -724,3 +801,27 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
}
return nil
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size, then clears all CMAP and SDAM events that occurred
// during pool initialization.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64, timeoutMS int) error {
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond)
defer cancel()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-awaitCtx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
if uint64(entity.getEventCount(connectionReadyEvent)) >= minPoolSize {
entity.eventSequencer.setCutoff()

return nil
}
}
}
}
Loading
Loading