1+ //! This example shows how to create tags that expire after a certain time.
2+ //!
3+ //! We use a prefix so we can distinguish between expiring and normal tags, and
4+ //! then encode the expiry date in the tag name after the prefix, in a format
5+ //! that sorts in the same order as the expiry date.
6+ //!
7+ //! Then we can just use
18use std:: time:: { Duration , SystemTime } ;
29
310use chrono:: Utc ;
411use futures_lite:: StreamExt ;
512use iroh:: endpoint;
6- use iroh_blobs:: store:: GcConfig ;
7- use iroh_blobs:: { hashseq:: HashSeq , BlobFormat , HashAndFormat } ;
8- use iroh_blobs:: Hash ;
9-
10- use iroh_blobs:: rpc:: client:: blobs:: MemClient as BlobsClient ;
13+ use iroh_blobs:: {
14+ hashseq:: HashSeq , rpc:: client:: blobs:: MemClient as BlobsClient , store:: GcConfig , BlobFormat ,
15+ Hash , HashAndFormat , Tag ,
16+ } ;
1117use tokio:: signal:: ctrl_c;
1218
1319/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
1420///
1521/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
16- ///
1722async fn create_expiring_tag (
1823 iroh : & BlobsClient ,
1924 hashes : & [ Hash ] ,
@@ -40,65 +45,78 @@ async fn create_expiring_tag(
4045 Ok ( ( ) )
4146}
4247
43- async fn delete_expired_tags ( iroh : & BlobsClient , prefix : & str ) -> anyhow:: Result < ( ) > {
44- let mut tags = iroh . tags ( ) . list ( ) . await ?;
48+ async fn delete_expired_tags ( blobs : & BlobsClient , prefix : & str , bulk : bool ) -> anyhow:: Result < ( ) > {
49+ let mut tags = blobs . tags ( ) . list ( ) . await ?;
4550 let prefix = format ! ( "{}-" , prefix) ;
4651 let now = chrono:: Utc :: now ( ) ;
47- let mut to_delete = Vec :: new ( ) ;
48- while let Some ( tag) = tags. next ( ) . await {
49- let tag = tag?. name ;
50- if let Some ( rest) = tag. 0 . strip_prefix ( prefix. as_bytes ( ) ) {
51- let Ok ( expiry) = std:: str:: from_utf8 ( rest) else {
52- tracing:: warn!( "Tag {} does have non utf8 expiry" , tag) ;
53- continue ;
54- } ;
55- let Ok ( expiry) = chrono:: DateTime :: parse_from_rfc3339 ( expiry) else {
56- tracing:: warn!( "Tag {} does have invalid expiry date" , tag) ;
57- continue ;
58- } ;
59- let expiry = expiry. with_timezone ( & Utc ) ;
60- if expiry < now {
61- to_delete. push ( tag) ;
52+ let end = format ! (
53+ "{}-{}" ,
54+ prefix,
55+ now. to_rfc3339_opts( chrono:: SecondsFormat :: Secs , true )
56+ ) ;
57+ if bulk {
58+ // delete all tags with the prefix and an expiry date before now
59+ //
60+ // this should be very efficient, since it is just a single database operation
61+ blobs
62+ . tags ( )
63+ . delete_range ( Tag :: from ( prefix. clone ( ) ) ..Tag :: from ( end) )
64+ . await ?;
65+ } else {
66+ // find tags to delete one by one and then delete them
67+ //
68+ // this allows us to print the tags before deleting them
69+ let mut to_delete = Vec :: new ( ) ;
70+ while let Some ( tag) = tags. next ( ) . await {
71+ let tag = tag?. name ;
72+ if let Some ( rest) = tag. 0 . strip_prefix ( prefix. as_bytes ( ) ) {
73+ let Ok ( expiry) = std:: str:: from_utf8 ( rest) else {
74+ tracing:: warn!( "Tag {} does have non utf8 expiry" , tag) ;
75+ continue ;
76+ } ;
77+ let Ok ( expiry) = chrono:: DateTime :: parse_from_rfc3339 ( expiry) else {
78+ tracing:: warn!( "Tag {} does have invalid expiry date" , tag) ;
79+ continue ;
80+ } ;
81+ let expiry = expiry. with_timezone ( & Utc ) ;
82+ if expiry < now {
83+ to_delete. push ( tag) ;
84+ }
6285 }
6386 }
64- }
65- for tag in to_delete {
66- println ! ( "Deleting expired tag {}" , tag) ;
67- iroh . tags ( ) . delete ( tag ) . await ? ;
87+ for tag in to_delete {
88+ println ! ( "Deleting expired tag {}" , tag ) ;
89+ blobs . tags ( ) . delete ( tag) . await ? ;
90+ }
6891 }
6992 Ok ( ( ) )
7093}
7194
72- async fn print_tags_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
95+ async fn info_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
96+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
7397 loop {
7498 let now = chrono:: Utc :: now ( ) ;
7599 let mut tags = blobs. tags ( ) . list ( ) . await ?;
76- println ! ( "Tags at {}:\n " , now) ;
100+ println ! ( "Current time: {}" , now. to_rfc3339_opts( chrono:: SecondsFormat :: Secs , true ) ) ;
101+ println ! ( "Tags:" ) ;
77102 while let Some ( tag) = tags. next ( ) . await {
78103 let tag = tag?;
79104 println ! ( " {:?}" , tag) ;
80105 }
81- println ! ( ) ;
82- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
83- }
84- }
85-
86- async fn print_blobs_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
87- loop {
88- let now = chrono:: Utc :: now ( ) ;
89106 let mut blobs = blobs. list ( ) . await ?;
90- println ! ( "Blobs at {}: \n " , now ) ;
107+ println ! ( "Blobs:" ) ;
91108 while let Some ( info) = blobs. next ( ) . await {
92- println ! ( " {:?}" , info?) ;
109+ let info = info?;
110+ println ! ( " {} {} bytes" , info. hash, info. size) ;
93111 }
94112 println ! ( ) ;
95113 tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
96114 }
97115}
98116
99- async fn delete_expired_tags_task ( blobs : BlobsClient , prefix : & str , ) -> anyhow:: Result < ( ) > {
117+ async fn delete_expired_tags_task ( blobs : BlobsClient , prefix : & str ) -> anyhow:: Result < ( ) > {
100118 loop {
101- delete_expired_tags ( & blobs, prefix) . await ?;
119+ delete_expired_tags ( & blobs, prefix, false ) . await ?;
102120 tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
103121 }
104122}
@@ -108,8 +126,7 @@ async fn main() -> anyhow::Result<()> {
108126 tracing_subscriber:: fmt:: init ( ) ;
109127 let endpoint = endpoint:: Endpoint :: builder ( ) . bind ( ) . await ?;
110128 let store = iroh_blobs:: store:: fs:: Store :: load ( "blobs" ) . await ?;
111- let blobs = iroh_blobs:: net_protocol:: Blobs :: builder ( store)
112- . build ( & endpoint) ;
129+ let blobs = iroh_blobs:: net_protocol:: Blobs :: builder ( store) . build ( & endpoint) ;
113130 // enable gc with a short period
114131 blobs. start_gc ( GcConfig {
115132 period : Duration :: from_secs ( 1 ) ,
@@ -120,36 +137,44 @@ async fn main() -> anyhow::Result<()> {
120137 // You can skip this if you don't want to serve the data over the network.
121138 let router = iroh:: protocol:: Router :: builder ( endpoint)
122139 . accept ( iroh_blobs:: ALPN , blobs. clone ( ) )
123- . spawn ( ) . await ?;
140+ . spawn ( )
141+ . await ?;
124142
125143 // setup: add some data and tag it
126144 {
127145 // add several blobs and tag them with an expiry date 10 seconds in the future
128146 let batch = blobs. client ( ) . batch ( ) . await ?;
129147 let a = batch. add_bytes ( "blob 1" . as_bytes ( ) ) . await ?;
130148 let b = batch. add_bytes ( "blob 2" . as_bytes ( ) ) . await ?;
131- let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 10 ) ) . unwrap ( ) ;
132- create_expiring_tag ( blobs. client ( ) , & [ * a. hash ( ) , * b. hash ( ) ] , "expiring" , expires_at) . await ?;
149+ let expires_at = SystemTime :: now ( )
150+ . checked_add ( Duration :: from_secs ( 10 ) )
151+ . unwrap ( ) ;
152+ create_expiring_tag (
153+ blobs. client ( ) ,
154+ & [ * a. hash ( ) , * b. hash ( ) ] ,
155+ "expiring" ,
156+ expires_at,
157+ )
158+ . await ?;
133159
134160 // add a single blob and tag it with an expiry date 60 seconds in the future
135161 let c = batch. add_bytes ( "blob 3" . as_bytes ( ) ) . await ?;
136- let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 60 ) ) . unwrap ( ) ;
162+ let expires_at = SystemTime :: now ( )
163+ . checked_add ( Duration :: from_secs ( 60 ) )
164+ . unwrap ( ) ;
137165 create_expiring_tag ( blobs. client ( ) , & [ * c. hash ( ) ] , "expiring" , expires_at) . await ?;
138166 // batch goes out of scope, so data is only protected by the tags we created
139167 }
140168 let client = blobs. client ( ) . clone ( ) ;
141169
142170 // delete expired tags every 5 seconds
143- let check_task = tokio:: spawn ( delete_expired_tags_task ( client. clone ( ) , "expiring" ) ) ;
144- // print tags every 5 seconds
145- let print_tags_task = tokio:: spawn ( print_tags_task ( client. clone ( ) ) ) ;
146- // print blobs every 5 seconds
147- let print_blobs_task = tokio:: spawn ( print_blobs_task ( client) ) ;
171+ let delete_task = tokio:: spawn ( delete_expired_tags_task ( client. clone ( ) , "expiring" ) ) ;
172+ // print all tags and blobs every 5 seconds
173+ let info_task = tokio:: spawn ( info_task ( client. clone ( ) ) ) ;
148174
149175 ctrl_c ( ) . await ?;
176+ delete_task. abort ( ) ;
177+ info_task. abort ( ) ;
150178 router. shutdown ( ) . await ?;
151- check_task. abort ( ) ;
152- print_tags_task. abort ( ) ;
153- print_blobs_task. abort ( ) ;
154179 Ok ( ( ) )
155- }
180+ }
0 commit comments