@@ -6,7 +6,7 @@ use iroh::{SecretKey, Watcher};
66use iroh_base:: ticket:: NodeTicket ;
77use iroh_blobs:: {
88 api:: downloader:: Shuffled ,
9- provider:: Event ,
9+ provider:: { AbortReason , Event , EventMask , EventSender2 , ProviderMessage } ,
1010 store:: fs:: FsStore ,
1111 test:: { add_hash_sequences, create_random_blobs} ,
1212 HashAndFormat ,
@@ -104,78 +104,66 @@ pub fn dump_provider_events(
104104 allow_push : bool ,
105105) -> (
106106 tokio:: task:: JoinHandle < ( ) > ,
107- mpsc :: Sender < iroh_blobs :: provider :: Event > ,
107+ EventSender2 ,
108108) {
109109 let ( tx, mut rx) = mpsc:: channel ( 100 ) ;
110110 let dump_task = tokio:: spawn ( async move {
111111 while let Some ( event) = rx. recv ( ) . await {
112112 match event {
113- Event :: ClientConnected {
114- node_id,
115- connection_id,
116- permitted,
117- } => {
118- permitted. send ( true ) . await . ok ( ) ;
119- println ! ( "Client connected: {node_id} {connection_id}" ) ;
113+ ProviderMessage :: ClientConnected ( msg) => {
114+ println ! ( "{:?}" , msg. inner) ;
115+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
120116 }
121- Event :: GetRequestReceived {
122- connection_id,
123- request_id,
124- hash,
125- ranges,
126- } => {
127- println ! (
128- "Get request received: {connection_id} {request_id} {hash} {ranges:?}"
129- ) ;
117+ ProviderMessage :: ClientConnectedNotify ( msg) => {
118+ println ! ( "{:?}" , msg. inner) ;
130119 }
131- Event :: TransferCompleted {
132- connection_id,
133- request_id,
134- stats,
135- } => {
136- println ! ( "Transfer completed: {connection_id} {request_id} {stats:?}" ) ;
120+ ProviderMessage :: ConnectionClosed ( msg) => {
121+ println ! ( "{:?}" , msg. inner) ;
137122 }
138- Event :: TransferAborted {
139- connection_id,
140- request_id,
141- stats,
142- } => {
143- println ! ( "Transfer aborted: {connection_id} {request_id} {stats:?}" ) ;
123+ ProviderMessage :: GetRequestReceived ( mut msg) => {
124+ println ! ( "{:?}" , msg. inner) ;
125+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
126+ tokio:: spawn ( async move {
127+ while let Ok ( update) = msg. rx . recv ( ) . await {
128+ info ! ( "{update:?}" ) ;
129+ }
130+ } ) ;
144131 }
145- Event :: TransferProgress {
146- connection_id,
147- request_id,
148- index,
149- end_offset,
150- } => {
151- info ! ( "Transfer progress: {connection_id} {request_id} {index} {end_offset}" ) ;
132+ ProviderMessage :: GetRequestReceivedNotify ( msg) => {
133+ println ! ( "{:?}" , msg. inner) ;
152134 }
153- Event :: PushRequestReceived {
154- connection_id,
155- request_id,
156- hash,
157- ranges,
158- permitted,
159- } => {
160- if allow_push {
161- permitted. send ( true ) . await . ok ( ) ;
162- println ! (
163- "Push request received: {connection_id} {request_id} {hash} {ranges:?}"
164- ) ;
135+ ProviderMessage :: GetManyRequestReceived ( mut msg) => {
136+ println ! ( "{:?}" , msg. inner) ;
137+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
138+ tokio:: spawn ( async move {
139+ while let Ok ( update) = msg. rx . recv ( ) . await {
140+ info ! ( "{update:?}" ) ;
141+ }
142+ } ) ;
143+ }
144+ ProviderMessage :: GetManyRequestReceivedNotify ( msg) => {
145+ println ! ( "{:?}" , msg. inner) ;
146+ }
147+ ProviderMessage :: PushRequestReceived ( msg) => {
148+ println ! ( "{:?}" , msg. inner) ;
149+ let res = if allow_push {
150+ Ok ( ( ) )
165151 } else {
166- permitted. send ( false ) . await . ok ( ) ;
167- println ! (
168- "Push request denied: {connection_id} {request_id} {hash} {ranges:?}"
169- ) ;
170- }
152+ Err ( AbortReason :: Permission )
153+ } ;
154+ msg. tx . send ( res) . await . ok ( ) ;
155+ }
156+ ProviderMessage :: PushRequestReceivedNotify ( msg) => {
157+ println ! ( "{:?}" , msg. inner) ;
171158 }
172- _ => {
173- info ! ( "Received event: {:?}" , event) ;
159+ ProviderMessage :: Throttle ( msg) => {
160+ println ! ( "{:?}" , msg. inner) ;
161+ msg. tx . send ( Ok ( ( ) ) ) . await . ok ( ) ;
174162 }
175163 }
176164 }
177165 } ) ;
178- ( dump_task, tx )
166+ ( dump_task, EventSender2 :: new ( tx , EventMask :: ALL ) )
179167}
180168
181169#[ tokio:: main]
@@ -237,7 +225,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
237225 . bind ( )
238226 . await ?;
239227 let ( dump_task, events_tx) = dump_provider_events ( args. allow_push ) ;
240- let blobs = iroh_blobs:: BlobsProtocol :: new ( & store, endpoint. clone ( ) , Some ( events_tx) ) ;
228+ let blobs = iroh_blobs:: BlobsProtocol :: new ( & store, endpoint. clone ( ) , events_tx) ;
241229 let router = iroh:: protocol:: Router :: builder ( endpoint. clone ( ) )
242230 . accept ( iroh_blobs:: ALPN , blobs)
243231 . spawn ( ) ;
0 commit comments