@@ -7,6 +7,7 @@ import { Endpoint, LinkupMessage, NetworkAgent, NetworkEventType } from '../netw
77import { ObjectBroadcastAgent , ObjectBroadcastRequest , ObjectBroadcastReply } from './ObjectBroadcastAgent' ;
88
99import { AsyncStream , BufferedAsyncStream , BufferingAsyncStreamSource , AsyncStreamSource , FilteredAsyncStreamSource } from 'util/streams' ;
10+ import { LRUCache } from 'util/caching' ;
1011
1112type Params = {
1213 broadcastedSuffixBits : number ,
@@ -28,6 +29,10 @@ class ObjectDiscoveryAgent implements Agent {
2829
2930 static newestReplyFirst = ( a : ObjectDiscoveryReply , b : ObjectDiscoveryReply ) => ( b . timestamp - a . timestamp ) ;
3031
32+ private static makeEndpointObjectPair ( hash : Hash , source : Endpoint , destination : Endpoint ) {
33+ return hash + '_' + source . replaceAll ( '_' , '__' ) + '_' + destination . replaceAll ( '_' , '__' ) ;
34+ }
35+
3136 pod ?: AgentPod ;
3237
3338
@@ -39,6 +44,7 @@ class ObjectDiscoveryAgent implements Agent {
3944
4045 streamSource : BufferingAsyncStreamSource < ObjectDiscoveryReply > ;
4146
47+ discoveredEndpointObjectPairs : LRUCache < string , HashedObject > ;
4248
4349 wasShutdown = false ;
4450
@@ -60,6 +66,8 @@ class ObjectDiscoveryAgent implements Agent {
6066 this . lastQueryingTimePerServer = new Map ( ) ;
6167
6268 this . streamSource = new BufferingAsyncStreamSource ( this . params . maxStoredReplies ) ;
69+
70+ this . discoveredEndpointObjectPairs = new LRUCache ( 256 ) ;
6371 }
6472
6573 getAgentId ( ) : string {
@@ -169,52 +177,81 @@ class ObjectDiscoveryAgent implements Agent {
169177
170178 if ( msg . agentId === this . getAgentId ( ) ) {
171179
172- const reply = msg . content as ObjectBroadcastReply ;
173-
174180
175181
176- let replyHash : Hash | undefined = undefined ;
177- let object : HashedObject | undefined = undefined ;
178- let error : string | undefined = undefined ;
179- try {
180- object = HashedObject . fromLiteralContext ( reply . literalContext ) ;
181- replyHash = object . hash ( ) ;
182- } catch ( e : any ) {
182+ this . processReply ( msg ) ;
183+
184+ }
183185
184- // Since anybody may reply with _anything_, only replies that at least match the hash
185- // suffix being queried will be reported back to the caller.
186+ }
187+ }
186188
187- ObjectDiscoveryAgent . log . warning ( 'Error deliteralizing object discovery reply:' + e ) ;
188- object = undefined ;
189- replyHash = undefined ;
190- const literal = reply . literalContext . literals [ reply . literalContext . rootHashes [ 0 ] ] ;
189+ private processReply ( msg : LinkupMessage ) {
191190
192- if ( literal !== undefined && LiteralUtils . validateHash ( literal ) ) {
193- if ( literal . hash === reply . literalContext . rootHashes [ 0 ] ) {
194- replyHash = literal . hash ;
195- }
196- }
191+ const reply = msg . content as ObjectBroadcastReply ;
197192
198- error = String ( e ) ;
199- }
193+ let replyHash : Hash | undefined = undefined ;
194+ let object : HashedObject | undefined = undefined ;
195+ let error : string | undefined = undefined ;
200196
201- if ( replyHash === reply . literalContext . rootHashes [ 0 ] &&
202- this . hexHashSuffix === Hashing . toHex ( replyHash ) . slice ( - this . hexHashSuffix . length ) &&
203- this . localEndpoints . has ( msg . destination ) ) {
197+
204198
205- ObjectDiscoveryAgent . log . trace ( ( ) => 'Received object with hash ' + replyHash + ' from ' + msg . source + ' at ' + msg . destination ) ;
199+ try {
200+ const declaredHash = reply . literalContext . rootHashes . length === 1 ?
201+ reply . literalContext . rootHashes [ 0 ]
202+ :
203+ '' ;
206204
207- let item = { source : msg . source , destination : msg . destination , hash : replyHash , object : object , error : error , timestamp : Date . now ( ) } ;
208-
205+ if ( this . hexHashSuffix === Hashing . toHex ( declaredHash ) . slice ( - this . hexHashSuffix . length ) ) {
206+
207+ const cacheKey = ObjectDiscoveryAgent . makeEndpointObjectPair ( declaredHash , msg . source , msg . destination ) ;
208+ object = this . discoveredEndpointObjectPairs . get ( cacheKey ) ;
209209
210- this . streamSource . ingest ( item ) ;
211-
210+ if ( object === undefined ) {
211+ object = HashedObject . fromLiteralContext ( reply . literalContext ) ;
212+ replyHash = object . hash ( ) ;
213+
214+ if ( replyHash === declaredHash ) {
215+ this . discoveredEndpointObjectPairs . set ( cacheKey , object ) ;
216+ }
212217 } else {
213- ObjectDiscoveryAgent . log . debug ( 'Error validating object discovery reply' ) ;
218+ replyHash = declaredHash ;
214219 }
220+ }
221+
222+
223+ } catch ( e : any ) {
224+
225+ // Since anybody may reply with _anything_, only replies that at least match the hash
226+ // suffix being queried will be reported back to the caller.
227+
228+ ObjectDiscoveryAgent . log . warning ( 'Error deliteralizing object discovery reply:' + e ) ;
229+ object = undefined ;
230+ replyHash = undefined ;
231+ const literal = reply . literalContext . literals [ reply . literalContext . rootHashes [ 0 ] ] ;
215232
233+ if ( literal !== undefined && LiteralUtils . validateHash ( literal ) ) {
234+ if ( literal . hash === reply . literalContext . rootHashes [ 0 ] ) {
235+ replyHash = literal . hash ;
236+ }
216237 }
217238
239+ error = String ( e ) ;
240+ }
241+
242+ if ( replyHash === reply . literalContext . rootHashes [ 0 ] &&
243+ this . hexHashSuffix === Hashing . toHex ( replyHash ) . slice ( - this . hexHashSuffix . length ) &&
244+ this . localEndpoints . has ( msg . destination ) ) {
245+
246+ ObjectDiscoveryAgent . log . trace ( ( ) => 'Received object with hash ' + replyHash + ' from ' + msg . source + ' at ' + msg . destination ) ;
247+
248+ let item = { source : msg . source , destination : msg . destination , hash : replyHash , object : object , error : error , timestamp : Date . now ( ) } ;
249+
250+
251+ this . streamSource . ingest ( item ) ;
252+
253+ } else {
254+ ObjectDiscoveryAgent . log . debug ( 'Error validating object discovery reply' ) ;
218255 }
219256 }
220257
0 commit comments