@@ -36,6 +36,7 @@ use libp2p_swarm::{
3636} ;
3737use log:: warn;
3838use smallvec:: SmallVec ;
39+ use std:: collections:: VecDeque ;
3940use std:: { io, pin:: Pin , task:: Context , task:: Poll , time:: Duration } ;
4041
4142pub struct Proto {
@@ -64,6 +65,17 @@ impl IntoConnectionHandler for Proto {
6465 }
6566}
6667
68+ /// A pending reply to an inbound identification request.
69+ enum Pending {
70+ /// The reply is queued for sending.
71+ Queued ( Reply ) ,
72+ /// The reply is being sent.
73+ Sending {
74+ peer : PeerId ,
75+ io : Pin < Box < dyn Future < Output = Result < ( ) , UpgradeError > > + Send > > ,
76+ } ,
77+ }
78+
6779/// A reply to an inbound identification request.
6880#[ derive( Debug ) ]
6981pub struct Reply {
@@ -90,6 +102,9 @@ pub struct Handler {
90102 > ; 4 ] ,
91103 > ,
92104
105+ /// Pending replies to send.
106+ pending_replies : VecDeque < Pending > ,
107+
93108 /// Future that fires when we need to identify the node again.
94109 trigger_next_identify : Delay ,
95110
@@ -106,11 +121,13 @@ pub struct Handler {
106121pub enum Event {
107122 /// We obtained identification information from the remote.
108123 Identified ( Info ) ,
124+ /// We replied to an identification request from the remote.
125+ Identification ( PeerId ) ,
109126 /// We actively pushed our identification information to the remote.
110127 IdentificationPushed ,
111128 /// We received a request for identification.
112129 Identify ( ReplySubstream < NegotiatedSubstream > ) ,
113- /// Failed to identify the remote.
130+ /// Failed to identify the remote, or to reply to an identification request .
114131 IdentificationError ( ConnectionHandlerUpgrErr < UpgradeError > ) ,
115132}
116133
@@ -130,6 +147,7 @@ impl Handler {
130147 remote_peer_id,
131148 inbound_identify_push : Default :: default ( ) ,
132149 events : SmallVec :: new ( ) ,
150+ pending_replies : VecDeque :: new ( ) ,
133151 trigger_next_identify : Delay :: new ( initial_delay) ,
134152 keep_alive : KeepAlive :: Yes ,
135153 interval,
@@ -231,8 +249,15 @@ impl ConnectionHandler for Handler {
231249 ) ,
232250 } ) ;
233251 }
234- InEvent :: Identify ( _) => {
235- todo ! ( )
252+ InEvent :: Identify ( reply) => {
253+ if !self . pending_replies . is_empty ( ) {
254+ warn ! (
255+ "New inbound identify request from {} while a previous one \
256+ is still pending. Queueing the new one.",
257+ reply. peer,
258+ ) ;
259+ }
260+ self . pending_replies . push_back ( Pending :: Queued ( reply) ) ;
236261 }
237262 }
238263 }
@@ -275,6 +300,39 @@ impl ConnectionHandler for Handler {
275300 }
276301 }
277302
303+ // Check for pending replies to send.
304+ if let Some ( mut pending) = self . pending_replies . pop_front ( ) {
305+ loop {
306+ match pending {
307+ Pending :: Queued ( Reply { peer, io, info } ) => {
308+ let io = Box :: pin ( io. send ( info) ) ;
309+ pending = Pending :: Sending { peer, io } ;
310+ }
311+ Pending :: Sending { peer, mut io } => {
312+ match Future :: poll ( Pin :: new ( & mut io) , cx) {
313+ Poll :: Pending => {
314+ self . pending_replies
315+ . push_front ( Pending :: Sending { peer, io } ) ;
316+ return Poll :: Pending ;
317+ }
318+ Poll :: Ready ( Ok ( ( ) ) ) => {
319+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
320+ Event :: Identification ( peer) ,
321+ ) ) ;
322+ }
323+ Poll :: Ready ( Err ( err) ) => {
324+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
325+ Event :: IdentificationError ( ConnectionHandlerUpgrErr :: Upgrade (
326+ libp2p_core:: upgrade:: UpgradeError :: Apply ( err) ,
327+ ) ) ,
328+ ) )
329+ }
330+ }
331+ }
332+ }
333+ }
334+ }
335+
278336 Poll :: Pending
279337 }
280338
0 commit comments