1+ use std:: time:: { Duration , SystemTime } ;
2+
3+ use chrono:: Utc ;
4+ use futures_lite:: StreamExt ;
5+ use iroh:: endpoint;
6+ use iroh_blobs:: store:: GcConfig ;
7+ use iroh_blobs:: { hashseq:: HashSeq , BlobFormat , HashAndFormat } ;
8+ use iroh_blobs:: Hash ;
9+
10+ use iroh_blobs:: rpc:: client:: blobs:: MemClient as BlobsClient ;
11+ use tokio:: signal:: ctrl_c;
12+
13+ /// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
14+ ///
15+ /// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
16+ ///
17+ async fn create_expiring_tag (
18+ iroh : & BlobsClient ,
19+ hashes : & [ Hash ] ,
20+ prefix : & str ,
21+ expiry : SystemTime ,
22+ ) -> anyhow:: Result < ( ) > {
23+ let expiry = chrono:: DateTime :: < chrono:: Utc > :: from ( expiry) ;
24+ let expiry = expiry. to_rfc3339_opts ( chrono:: SecondsFormat :: Secs , true ) ;
25+ let tagname = format ! ( "{}-{}" , prefix, expiry) ;
26+ let batch = iroh. batch ( ) . await ?;
27+ let tt = if hashes. is_empty ( ) {
28+ return Ok ( ( ) ) ;
29+ } else if hashes. len ( ) == 1 {
30+ let hash = hashes[ 0 ] ;
31+ batch. temp_tag ( HashAndFormat :: raw ( hash) ) . await ?
32+ } else {
33+ let hs = hashes. into_iter ( ) . copied ( ) . collect :: < HashSeq > ( ) ;
34+ batch
35+ . add_bytes_with_opts ( hs. into_inner ( ) , BlobFormat :: HashSeq )
36+ . await ?
37+ } ;
38+ batch. persist_to ( tt, tagname. as_str ( ) . into ( ) ) . await ?;
39+ println ! ( "Created tag {}" , tagname) ;
40+ Ok ( ( ) )
41+ }
42+
43+ async fn delete_expired_tags ( iroh : & BlobsClient , prefix : & str ) -> anyhow:: Result < ( ) > {
44+ let mut tags = iroh. tags ( ) . list ( ) . await ?;
45+ let prefix = format ! ( "{}-" , prefix) ;
46+ let now = chrono:: Utc :: now ( ) ;
47+ let mut to_delete = Vec :: new ( ) ;
48+ while let Some ( tag) = tags. next ( ) . await {
49+ let tag = tag?. name ;
50+ if let Some ( rest) = tag. 0 . strip_prefix ( prefix. as_bytes ( ) ) {
51+ let Ok ( expiry) = std:: str:: from_utf8 ( rest) else {
52+ tracing:: warn!( "Tag {} does have non utf8 expiry" , tag) ;
53+ continue ;
54+ } ;
55+ let Ok ( expiry) = chrono:: DateTime :: parse_from_rfc3339 ( expiry) else {
56+ tracing:: warn!( "Tag {} does have invalid expiry date" , tag) ;
57+ continue ;
58+ } ;
59+ let expiry = expiry. with_timezone ( & Utc ) ;
60+ if expiry < now {
61+ to_delete. push ( tag) ;
62+ }
63+ }
64+ }
65+ for tag in to_delete {
66+ println ! ( "Deleting expired tag {}" , tag) ;
67+ iroh. tags ( ) . delete ( tag) . await ?;
68+ }
69+ Ok ( ( ) )
70+ }
71+
72+ async fn print_tags_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
73+ loop {
74+ let now = chrono:: Utc :: now ( ) ;
75+ let mut tags = blobs. tags ( ) . list ( ) . await ?;
76+ println ! ( "Tags at {}:\n " , now) ;
77+ while let Some ( tag) = tags. next ( ) . await {
78+ let tag = tag?;
79+ println ! ( " {:?}" , tag) ;
80+ }
81+ println ! ( ) ;
82+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
83+ }
84+ }
85+
86+ async fn print_blobs_task ( blobs : BlobsClient ) -> anyhow:: Result < ( ) > {
87+ loop {
88+ let now = chrono:: Utc :: now ( ) ;
89+ let mut blobs = blobs. list ( ) . await ?;
90+ println ! ( "Blobs at {}:\n " , now) ;
91+ while let Some ( info) = blobs. next ( ) . await {
92+ println ! ( " {:?}" , info?) ;
93+ }
94+ println ! ( ) ;
95+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
96+ }
97+ }
98+
99+ async fn delete_expired_tags_task ( blobs : BlobsClient , prefix : & str , ) -> anyhow:: Result < ( ) > {
100+ loop {
101+ delete_expired_tags ( & blobs, prefix) . await ?;
102+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
103+ }
104+ }
105+
106+ #[ tokio:: main]
107+ async fn main ( ) -> anyhow:: Result < ( ) > {
108+ tracing_subscriber:: fmt:: init ( ) ;
109+ let endpoint = endpoint:: Endpoint :: builder ( ) . bind ( ) . await ?;
110+ let store = iroh_blobs:: store:: fs:: Store :: load ( "blobs" ) . await ?;
111+ let blobs = iroh_blobs:: net_protocol:: Blobs :: builder ( store)
112+ . build ( & endpoint) ;
113+ // enable gc with a short period
114+ blobs. start_gc ( GcConfig {
115+ period : Duration :: from_secs ( 1 ) ,
116+ done_callback : None ,
117+ } ) ?;
118+ // create a router and add blobs as a service
119+ //
120+ // You can skip this if you don't want to serve the data over the network.
121+ let router = iroh:: protocol:: Router :: builder ( endpoint)
122+ . accept ( iroh_blobs:: ALPN , blobs. clone ( ) )
123+ . spawn ( ) . await ?;
124+
125+ // setup: add some data and tag it
126+ {
127+ // add several blobs and tag them with an expiry date 10 seconds in the future
128+ let batch = blobs. client ( ) . batch ( ) . await ?;
129+ let a = batch. add_bytes ( "blob 1" . as_bytes ( ) ) . await ?;
130+ let b = batch. add_bytes ( "blob 2" . as_bytes ( ) ) . await ?;
131+ let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 10 ) ) . unwrap ( ) ;
132+ create_expiring_tag ( blobs. client ( ) , & [ * a. hash ( ) , * b. hash ( ) ] , "expiring" , expires_at) . await ?;
133+
134+ // add a single blob and tag it with an expiry date 60 seconds in the future
135+ let c = batch. add_bytes ( "blob 3" . as_bytes ( ) ) . await ?;
136+ let expires_at = SystemTime :: now ( ) . checked_add ( Duration :: from_secs ( 60 ) ) . unwrap ( ) ;
137+ create_expiring_tag ( blobs. client ( ) , & [ * c. hash ( ) ] , "expiring" , expires_at) . await ?;
138+ // batch goes out of scope, so data is only protected by the tags we created
139+ }
140+ let client = blobs. client ( ) . clone ( ) ;
141+
142+ // delete expired tags every 5 seconds
143+ let check_task = tokio:: spawn ( delete_expired_tags_task ( client. clone ( ) , "expiring" ) ) ;
144+ // print tags every 5 seconds
145+ let print_tags_task = tokio:: spawn ( print_tags_task ( client. clone ( ) ) ) ;
146+ // print blobs every 5 seconds
147+ let print_blobs_task = tokio:: spawn ( print_blobs_task ( client) ) ;
148+
149+ ctrl_c ( ) . await ?;
150+ router. shutdown ( ) . await ?;
151+ check_task. abort ( ) ;
152+ print_tags_task. abort ( ) ;
153+ print_blobs_task. abort ( ) ;
154+ Ok ( ( ) )
155+ }
0 commit comments