@@ -3,7 +3,6 @@ import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
33import { type PubSub , type Message , type StrictNoSign , type StrictSign , type PubSubInit , type PubSubEvents , type PeerStreams , type PubSubRPCMessage , type PubSubRPC , type PubSubRPCSubscription , type SubscriptionChangeData , type PublishResult , type TopicValidatorFn , TopicValidatorResult } from '@libp2p/interface/pubsub'
44import { logger } from '@libp2p/logger'
55import { PeerMap , PeerSet } from '@libp2p/peer-collections'
6- import { createTopology } from '@libp2p/topology'
76import { pipe } from 'it-pipe'
87import Queue from 'p-queue'
98import { codes } from './errors.js'
@@ -128,10 +127,10 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
128127
129128 // register protocol with topology
130129 // Topology callbacks called on connection manager changes
131- const topology = createTopology ( {
130+ const topology = {
132131 onConnect : this . _onPeerConnected ,
133132 onDisconnect : this . _onPeerDisconnected
134- } )
133+ }
135134 this . _registrarTopologyIds = await Promise . all ( this . multicodecs . map ( async multicodec => registrar . register ( multicodec , topology ) ) )
136135
137136 log ( 'started' )
@@ -181,12 +180,12 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
181180 const { stream, connection } = data
182181 const peerId = connection . remotePeer
183182
184- if ( stream . stat . protocol == null ) {
183+ if ( stream . protocol == null ) {
185184 stream . abort ( new Error ( 'Stream was not multiplexed' ) )
186185 return
187186 }
188187
189- const peer = this . addPeer ( peerId , stream . stat . protocol )
188+ const peer = this . addPeer ( peerId , stream . protocol )
190189 const inboundStream = peer . attachInboundStream ( stream )
191190
192191 this . processMessages ( peerId , inboundStream , peer )
@@ -203,12 +202,12 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
203202 try {
204203 const stream = await conn . newStream ( this . multicodecs )
205204
206- if ( stream . stat . protocol == null ) {
205+ if ( stream . protocol == null ) {
207206 stream . abort ( new Error ( 'Stream was not multiplexed' ) )
208207 return
209208 }
210209
211- const peer = this . addPeer ( peerId , stream . stat . protocol )
210+ const peer = this . addPeer ( peerId , stream . protocol )
212211 await peer . attachOutboundStream ( stream )
213212 } catch ( err : any ) {
214213 log . error ( err )
0 commit comments