@@ -142,7 +142,7 @@ type TerminalOpsSyncAgentParams = {
142142 incompleteOpTimeout : number
143143} ;
144144
145- type ObjectMovements = Map < Hash , Map < Endpoint , { secret : string , timeout : number , dependencyChain : Array < Hash > } > > ;
145+ type ObjectMovements = Map < Hash , Map < Endpoint , { timeout : number , secret : string , dependencyChain : Array < Hash > } > > ;
146146type ObjectRequest = { hash : string , dependencyChain : Array < string > } ;
147147type OwnershipProof = { hash : Hash , ownershipProofHash : Hash } ;
148148
@@ -231,16 +231,18 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
231231
232232 for ( const [ hash , destinations ] of objs . entries ( ) ) {
233233
234- let outdatedEndpoints : Array < Hash > = [ ]
234+ let outdatedEndpoints : Array < Hash > = [ ] ;
235235
236236 for ( const [ endpoint , params ] of destinations . entries ( ) ) {
237+
237238 if ( now > params . timeout ) {
238- outdatedEndpoints . push ( endpoint ) ;
239- }
239+ outdatedEndpoints . push ( endpoint ) ;
240+ }
240241 }
241242
242243 for ( const ep of outdatedEndpoints ) {
243244 destinations . delete ( ep ) ;
245+ this . controlLog . warning ( 'fetching of object with hash ' + hash + ' from ' + ep + ' has timed out' )
244246 }
245247
246248 if ( destinations . size === 0 ) {
@@ -450,18 +452,29 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
450452
451453 let secret = new RNGImpl ( ) . randomHexString ( 128 ) ;
452454
455+ let newReqs : Array < ObjectRequest > = [ ] ;
456+
453457 for ( const req of reqs ) {
454- this . expectIncomingObject ( destination , req . hash , req . dependencyChain , secret ) ;
455- }
456458
457- let msg : RequestObjsMessage = {
458- type : TerminalOpsSyncAgentMessageType . RequestObjs ,
459- targetObjHash : this . objHash ,
460- requestedObjects : reqs ,
461- ownershipProofSecret : secret
462- } ;
459+ const pendingReqs = this . incomingObjects . get ( req . hash ) ?. size || 0 ;
463460
464- this . sendSyncMessageToPeer ( destination , msg ) ;
461+ if ( pendingReqs < 2 ) {
462+ if ( this . expectIncomingObject ( destination , req . hash , req . dependencyChain , secret ) ) {
463+ newReqs . push ( req ) ;
464+ }
465+ }
466+ }
467+
468+ if ( newReqs . length > 0 ) {
469+ let msg : RequestObjsMessage = {
470+ type : TerminalOpsSyncAgentMessageType . RequestObjs ,
471+ targetObjHash : this . objHash ,
472+ requestedObjects : newReqs ,
473+ ownershipProofSecret : secret
474+ } ;
475+
476+ this . sendSyncMessageToPeer ( destination , msg ) ;
477+ }
465478 }
466479
467480 sendState ( ep : Endpoint ) {
@@ -483,6 +496,9 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
483496
484497 for ( const req of missing ) {
485498 this . scheduleOutgoingObject ( destination , req . hash , req . dependencyChain , secret ) ;
499+
500+ // note: if the object was already scheduled the above function will return false and
501+ // do nothing, but that is OK.
486502 }
487503
488504 }
@@ -588,34 +604,49 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
588604 let destinations = this . outgoingObjects . get ( hash ) ;
589605
590606 if ( destinations !== undefined ) {
591- for ( const [ endpoint , details ] of destinations . entries ( ) ) {
592- this . tryToSendObjects ( endpoint , [ { hash : hash , dependencyChain : details . dependencyChain } ] , details . secret ) ;
607+ for ( const [ endpoint , params ] of destinations . entries ( ) ) {
608+ this . tryToSendObjects ( endpoint , [ { hash : hash , dependencyChain : params . dependencyChain } ] , params . secret ) ;
593609 }
594610 }
595611
612+ this . controlLog . trace ( 'ops depending on completed object: ' + this . opsForMissingObj . get ( hash ) ?. size ) ;
613+
596614 for ( const opHash of this . opsForMissingObj . get ( hash ) ) {
597615
598616 const incompleteOp = this . incompleteOps . get ( opHash ) as IncompleteOp ;
599617
618+ // incompleteOp is undefined! FIXME
619+
600620 incompleteOp . context . objects . set ( hash , obj ) ;
601621 incompleteOp . missingObjects . delete ( hash ) ;
602622
603623 if ( incompleteOp . missingObjects . size === 0 ) {
604624
605625 try {
606- this . processReceivedObject ( opHash , context ) ;
626+ this . processReceivedObject ( opHash , incompleteOp . context ) ;
607627 // TODO: catch error, log, report bad peer?
608628 } catch ( e ) {
609629 this . controlLog . warning ( 'could not process received object with hash ' + hash + ', error is: ' + e ) ;
610630 } finally {
611631 this . incompleteOps . delete ( opHash ) ;
612-
632+ this . opsForMissingObj . delete ( hash , opHash ) ;
613633 }
614634 }
615635 }
616636
617637 // just in case this op was received partailly before:
618- this . incompleteOps . delete ( hash ) ;
638+
639+ const incompleteOp = this . incompleteOps . get ( hash ) ;
640+
641+ if ( incompleteOp !== undefined ) {
642+
643+ for ( const reqHash of incompleteOp . missingObjects . keys ( ) ) {
644+ this . opsForMissingObj . delete ( reqHash , hash ) ;
645+ }
646+
647+ this . incompleteOps . delete ( hash ) ;
648+ }
649+
619650 }
620651
621652 private async receiveObjects ( source : Endpoint , literalContext : LiteralContext , omittedDeps : Array < OwnershipProof > , secret ?: string ) {
@@ -642,6 +673,8 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
642673 try {
643674 let toRequest = Array < ObjectRequest > ( ) ;
644675
676+ // add omitted dependencies, if their ownership proofs are correct
677+
645678 for ( let [ depHash , depChain ] of context . findMissingDeps ( hash ) . entries ( ) ) {
646679 let dep = await this . store . load ( depHash ) ;
647680 if ( dep === undefined || dep . hash ( secret ) !== ownershipProofForHash . get ( depHash ) ) {
@@ -674,12 +707,15 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
674707 } catch ( e ) {
675708 TerminalOpsSyncAgent . controlLog . warning ( e ) ;
676709 }
710+
677711 this . incomingObjects . delete ( hash ) ;
678712 } else {
679713
680714 // TODO: report missing or incorrect incoming object entry
681715 if ( incoming === undefined ) {
682- this . controlLog . warning ( 'missing incoming object entry for hash ' + hash + ' in object sent by ' + source ) ;
716+ if ( await this . store . load ( hash ) === undefined ) {
717+ this . controlLog . warning ( 'missing incoming object entry for hash ' + hash + ' in object sent by ' + source ) ;
718+ }
683719 } else {
684720 this . controlLog . warning ( 'incoming object secret mismatch, expected: ' + secret + ', received: ' + incoming . secret ) ;
685721 }
@@ -695,17 +731,24 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
695731
696732 private async processIncompleteOp ( source : Endpoint , hash : Hash , context : Context , toRequest : Array < ObjectRequest > ) {
697733
698- let incompleteOp = this . incompleteOps . get ( source ) ;
734+ let incompleteOp = this . incompleteOps . get ( hash ) ;
699735 let missingObjects = new Map < Hash , ObjectRequest > ( toRequest . map ( ( req : ObjectRequest ) => [ req . hash , req ] ) ) ;
700736
701737 if ( incompleteOp === undefined ) {
738+
702739 incompleteOp = {
703740 source : source ,
704741 context : context ,
705742 missingObjects : missingObjects ,
706743 timeout : Date . now ( ) + this . params . incompleteOpTimeout * 1000
707744 } ;
745+
708746 this . incompleteOps . set ( hash , incompleteOp ) ;
747+
748+ for ( const objReq of toRequest ) {
749+ this . opsForMissingObj . add ( objReq . hash , hash ) ;
750+ }
751+
709752 } else {
710753
711754 const initialMissingCount = incompleteOp . missingObjects . size ;
@@ -726,6 +769,7 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
726769 try {
727770 this . processReceivedObject ( hash , context ) ;
728771 } finally {
772+
729773 this . incompleteOps . delete ( hash ) ;
730774 }
731775 } else if ( incompleteOp . missingObjects . size < initialMissingCount ) {
@@ -750,15 +794,15 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
750794 this . acceptedMutationOpClasses . indexOf ( op . value . _class ) >= 0 ;
751795 }
752796
753- private expectIncomingObject ( source : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string ) {
754- this . insertObjectMovement ( this . incomingObjects , source , objHash , dependencyChain , secret , this . params . receiveTimeout ) ;
797+ private expectIncomingObject ( source : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string ) : boolean {
798+ return this . insertObjectMovement ( this . incomingObjects , source , objHash , dependencyChain , secret , this . params . receiveTimeout ) ;
755799 }
756800
757- private scheduleOutgoingObject ( destination : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string ) {
758- this . insertObjectMovement ( this . outgoingObjects , destination , objHash , dependencyChain , secret , this . params . sendTimeout ) ;
801+ private scheduleOutgoingObject ( destination : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string ) : boolean {
802+ return this . insertObjectMovement ( this . outgoingObjects , destination , objHash , dependencyChain , secret , this . params . sendTimeout ) ;
759803 }
760804
761- private insertObjectMovement ( allMovements : ObjectMovements , endpoint : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string , timeout : number ) {
805+ private insertObjectMovement ( allMovements : ObjectMovements , endpoint : Endpoint , objHash : Hash , dependencyChain : Array < Hash > , secret : string , timeout : number ) : boolean {
762806
763807 let movement = allMovements . get ( objHash ) ;
764808
@@ -767,7 +811,13 @@ class TerminalOpsSyncAgent extends PeeringAgentBase implements StateSyncAgent {
767811 allMovements . set ( objHash , movement ) ;
768812 }
769813
770- movement . set ( endpoint , { dependencyChain : dependencyChain , secret : secret , timeout : Date . now ( ) + timeout * 1000 } ) ;
814+ if ( movement . has ( endpoint ) ) {
815+ return false ;
816+ } else {
817+ movement . set ( endpoint , { dependencyChain : dependencyChain , secret : secret , timeout : Date . now ( ) + timeout * 1000 } ) ;
818+ return true ;
819+ }
820+
771821 }
772822
773823}
0 commit comments