@@ -111,6 +111,45 @@ def close
111111 @conn = nil
112112 end
113113
114+ # Internal: Reads messages by ID from a queue, falling back to reading from
115+ # the connected socket until a message matching the ID is read. Any messages
116+ # with mismatched IDs gets queued for subsequent reads by the origin of that
117+ # message ID.
118+ #
119+ # Returns a Net::LDAP::PDU object or nil.
120+ def queued_read ( message_id )
121+ if pdu = message_queue [ message_id ] . shift
122+ return pdu
123+ end
124+
125+ # read messages until we have a match for the given message_id
126+ while pdu = read
127+ if pdu . message_id == message_id
128+ return pdu
129+ else
130+ message_queue [ pdu . message_id ] . push pdu
131+ next
132+ end
133+ end
134+
135+ pdu
136+ end
137+
138+ # Internal: The internal queue of messages, read from the socket, grouped by
139+ # message ID.
140+ #
141+ # Used by `queued_read` to return messages sent by the server with the given
142+ # ID. If no messages are queued for that ID, `queued_read` will `read` from
143+ # the socket and queue messages that don't match the given ID for other
144+ # readers.
145+ #
146+ # Returns the message queue Hash.
147+ def message_queue
148+ @message_queue ||= Hash . new do |hash , key |
149+ hash [ key ] = [ ]
150+ end
151+ end
152+
114153 # Internal: Reads and parses data from the configured connection.
115154 #
116155 # - syntax: the BER syntax to use to parse the read data with
@@ -146,9 +185,9 @@ def read(syntax = Net::LDAP::AsnSyntax)
146185 #
147186 # Returns the return value from writing to the connection, which in some
148187 # cases is the Integer number of bytes written to the socket.
149- def write ( request , controls = nil )
188+ def write ( request , controls = nil , message_id = next_msgid )
150189 instrument "write.net_ldap_connection" do |payload |
151- packet = [ next_msgid . to_ber , request , controls ] . compact . to_ber_sequence
190+ packet = [ message_id . to_ber , request , controls ] . compact . to_ber_sequence
152191 payload [ :content_length ] = @conn . write ( packet )
153192 end
154193 end
@@ -377,7 +416,10 @@ def search(args = nil)
377416 result_pdu = nil
378417 n_results = 0
379418
419+ message_id = next_msgid
420+
380421 instrument "search.net_ldap_connection" ,
422+ message_id : message_id ,
381423 filter : filter ,
382424 base : base ,
383425 scope : scope ,
@@ -425,12 +467,12 @@ def search(args = nil)
425467 controls << ber_sort if ber_sort
426468 controls = controls . empty? ? nil : controls . to_ber_contextspecific ( 0 )
427469
428- write ( request , controls )
470+ write ( request , controls , message_id )
429471
430472 result_pdu = nil
431473 controls = [ ]
432474
433- while pdu = read
475+ while pdu = queued_read ( message_id )
434476 case pdu . app_tag
435477 when Net ::LDAP ::PDU ::SearchReturnedData
436478 n_results += 1
@@ -498,6 +540,16 @@ def search(args = nil)
498540
499541 result_pdu || OpenStruct . new ( :status => :failure , :result_code => 1 , :message => "Invalid search" )
500542 end # instrument
543+ ensure
544+ # clean up message queue for this search
545+ messages = message_queue . delete ( message_id )
546+
547+ # in the exceptional case some messages were *not* consumed from the queue,
548+ # instrument the event but do not fail.
549+ unless messages . empty?
550+ instrument "search_messages_unread.net_ldap_connection" ,
551+ message_id : message_id , messages : messages
552+ end
501553 end
502554
503555 MODIFY_OPERATIONS = { #:nodoc:
0 commit comments