@@ -10,13 +10,14 @@ use n0_future::task::{self, AbortOnDropHandle};
1010
1111use crate :: engine:: Engine ;
1212
13- use crate :: rpc2:: protocol:: DocsProtocol ;
14-
15- use super :: { protocol, RpcActor } ;
13+ use super :: {
14+ actor:: RpcActor ,
15+ protocol:: { DocsMessage , DocsProtocol , DocsService } ,
16+ } ;
1617
1718pub use self :: client:: * ;
1819
19- type Client = irpc:: Client < protocol :: DocsMessage , protocol :: DocsProtocol , protocol :: DocsService > ;
20+ type Client = irpc:: Client < DocsMessage , DocsProtocol , DocsService > ;
2021
2122/// API wrapper for the docs service
2223#[ derive( Debug , Clone ) ]
@@ -79,21 +80,29 @@ impl DocsApi {
7980}
8081
8182mod client {
82- use std:: sync:: {
83- atomic:: { AtomicBool , Ordering } ,
84- Arc ,
83+ use std:: {
84+ future:: Future ,
85+ path:: Path ,
86+ pin:: Pin ,
87+ sync:: {
88+ atomic:: { AtomicBool , Ordering } ,
89+ Arc ,
90+ } ,
91+ task:: { ready, Poll } ,
8592 } ;
8693
8794 use anyhow:: Result ;
8895 use bytes:: Bytes ;
8996 use iroh:: NodeAddr ;
90- use iroh_blobs:: Hash ;
91- use n0_future:: { Stream , StreamExt } ;
97+ use iroh_blobs:: {
98+ api:: blobs:: { AddPathOptions , AddProgressItem , ExportMode , ExportOptions , ExportProgress } ,
99+ Hash ,
100+ } ;
101+ use n0_future:: { FutureExt , Stream , StreamExt } ;
92102
93103 use crate :: {
94104 actor:: OpenState ,
95- engine:: LiveEvent ,
96- rpc2:: protocol:: {
105+ api:: protocol:: {
97106 AddrInfoOptions , AuthorCreateRequest , AuthorDeleteRequest , AuthorExportRequest ,
98107 AuthorGetDefaultRequest , AuthorImportRequest , AuthorListRequest ,
99108 AuthorSetDefaultRequest , CloseRequest , CreateRequest , DelRequest , DropRequest ,
@@ -102,6 +111,7 @@ mod client {
102111 SetHashRequest , SetRequest , ShareMode , ShareRequest , StartSyncRequest , StatusRequest ,
103112 SubscribeRequest ,
104113 } ,
114+ engine:: LiveEvent ,
105115 store:: { DownloadPolicy , Query } ,
106116 Author , AuthorId , Capability , CapabilityKind , DocTicket , Entry , NamespaceId , PeerIdBytes ,
107117 } ;
@@ -317,7 +327,7 @@ mod client {
317327 Ok ( response. entry . content_hash ( ) )
318328 }
319329
320- /// Sets an entries on the doc via its key, hash, and size.
330+ /// Sets an entry on the doc via its key, hash, and size.
321331 pub async fn set_hash (
322332 & self ,
323333 author_id : AuthorId ,
@@ -519,5 +529,177 @@ mod client {
519529 . await ??;
520530 Ok ( response. peers )
521531 }
532+
533+ /// Adds an entry from an absolute file path
534+ pub async fn import_file (
535+ & self ,
536+ blobs : & iroh_blobs:: api:: Store ,
537+ author : AuthorId ,
538+ key : Bytes ,
539+ path : impl AsRef < Path > ,
540+ import_mode : iroh_blobs:: api:: blobs:: ImportMode ,
541+ ) -> Result < ImportFileProgress > {
542+ self . ensure_open ( ) ?;
543+ let progress = blobs. add_path_with_opts ( AddPathOptions {
544+ path : path. as_ref ( ) . to_owned ( ) ,
545+ format : iroh_blobs:: BlobFormat :: Raw ,
546+ mode : import_mode,
547+ } ) ;
548+ let stream = progress. stream ( ) . await ;
549+ let doc = self . clone ( ) ;
550+ let ctx = EntryContext {
551+ doc,
552+ author,
553+ key,
554+ size : None ,
555+ } ;
556+ Ok ( ImportFileProgress ( ImportInner :: Blobs (
557+ Box :: pin ( stream) ,
558+ Some ( ctx) ,
559+ ) ) )
560+ }
561+
562+ /// Exports an entry as a file to a given absolute path.
563+ pub async fn export_file (
564+ & self ,
565+ blobs : & iroh_blobs:: api:: Store ,
566+ entry : Entry ,
567+ path : impl AsRef < Path > ,
568+ mode : ExportMode ,
569+ ) -> Result < ExportProgress > {
570+ self . ensure_open ( ) ?;
571+ let hash = entry. content_hash ( ) ;
572+ let progress = blobs. export_with_opts ( ExportOptions {
573+ hash,
574+ mode,
575+ target : path. as_ref ( ) . to_path_buf ( ) ,
576+ } ) ;
577+ Ok ( progress)
578+ }
579+ }
580+
581+ ///
582+ #[ derive( Debug ) ]
583+ pub enum ImportFileProgressItem {
584+ Error ( anyhow:: Error ) ,
585+ Blobs ( AddProgressItem ) ,
586+ Done ( ImportFileOutcome ) ,
587+ }
588+
589+ ///
590+ #[ derive( Debug ) ]
591+ pub struct ImportFileProgress ( ImportInner ) ;
592+
593+ #[ derive( derive_more:: Debug ) ]
594+ enum ImportInner {
595+ #[ debug( "Blobs" ) ]
596+ Blobs (
597+ n0_future:: boxed:: BoxStream < AddProgressItem > ,
598+ Option < EntryContext > ,
599+ ) ,
600+ #[ debug( "Entry" ) ]
601+ Entry ( n0_future:: boxed:: BoxFuture < Result < ImportFileOutcome > > ) ,
602+ Done ,
603+ }
604+
605+ struct EntryContext {
606+ doc : Doc ,
607+ author : AuthorId ,
608+ key : Bytes ,
609+ size : Option < u64 > ,
610+ }
611+
612+ impl Stream for ImportFileProgress {
613+ type Item = ImportFileProgressItem ;
614+
615+ fn poll_next (
616+ self : Pin < & mut Self > ,
617+ cx : & mut std:: task:: Context < ' _ > ,
618+ ) -> Poll < Option < Self :: Item > > {
619+ let this = self . get_mut ( ) ;
620+ match this. 0 {
621+ ImportInner :: Blobs ( ref mut progress, ref mut context) => {
622+ match ready ! ( progress. poll_next( cx) ) {
623+ Some ( item) => match item {
624+ AddProgressItem :: Size ( size) => {
625+ context
626+ . as_mut ( )
627+ . expect ( "Size must be emitted before done" )
628+ . size = Some ( size) ;
629+ Poll :: Ready ( Some ( ImportFileProgressItem :: Blobs (
630+ AddProgressItem :: Size ( size) ,
631+ ) ) )
632+ }
633+ AddProgressItem :: Error ( err) => {
634+ * this = Self ( ImportInner :: Done ) ;
635+ Poll :: Ready ( Some ( ImportFileProgressItem :: Error ( err. into ( ) ) ) )
636+ }
637+ AddProgressItem :: Done ( tag) => {
638+ let EntryContext {
639+ doc,
640+ author,
641+ key,
642+ size,
643+ } = context
644+ . take ( )
645+ . expect ( "AddProgressItem::Done may be emitted only once" ) ;
646+ let size = size. expect ( "Size must be emitted before done" ) ;
647+ let hash = * tag. hash ( ) ;
648+ * this = Self ( ImportInner :: Entry ( Box :: pin ( async move {
649+ doc. set_hash ( author, key. clone ( ) , hash, size) . await ?;
650+ Ok ( ImportFileOutcome { hash, size, key } )
651+ } ) ) ) ;
652+ Poll :: Ready ( Some ( ImportFileProgressItem :: Blobs (
653+ AddProgressItem :: Done ( tag) ,
654+ ) ) )
655+ }
656+ item => Poll :: Ready ( Some ( ImportFileProgressItem :: Blobs ( item) ) ) ,
657+ } ,
658+ None => todo ! ( ) ,
659+ }
660+ }
661+ ImportInner :: Entry ( ref mut fut) => {
662+ let res = ready ! ( fut. poll( cx) ) ;
663+ * this = Self ( ImportInner :: Done ) ;
664+ match res {
665+ Ok ( outcome) => Poll :: Ready ( Some ( ImportFileProgressItem :: Done ( outcome) ) ) ,
666+ Err ( err) => Poll :: Ready ( Some ( ImportFileProgressItem :: Error ( err) ) ) ,
667+ }
668+ }
669+ ImportInner :: Done => Poll :: Ready ( None ) ,
670+ }
671+ }
672+ }
673+
674+ impl Future for ImportFileProgress {
675+ type Output = Result < ImportFileOutcome > ;
676+ fn poll ( mut self : Pin < & mut Self > , cx : & mut std:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
677+ loop {
678+ match self . as_mut ( ) . poll_next ( cx) {
679+ Poll :: Ready ( Some ( item) ) => match item {
680+ ImportFileProgressItem :: Error ( error) => return Poll :: Ready ( Err ( error) ) ,
681+ ImportFileProgressItem :: Blobs ( _add_progress_item) => continue ,
682+ ImportFileProgressItem :: Done ( outcome) => return Poll :: Ready ( Ok ( outcome) ) ,
683+ } ,
684+ Poll :: Ready ( None ) => {
685+ return Poll :: Ready ( Err ( anyhow:: anyhow!(
686+ "ImportFileProgress polled after completion"
687+ ) ) )
688+ }
689+ Poll :: Pending => return Poll :: Pending ,
690+ }
691+ }
692+ }
693+ }
694+
695+ /// Outcome of a [`Doc::import_file`] operation
696+ #[ derive( Debug , Clone , PartialEq , Eq ) ]
697+ pub struct ImportFileOutcome {
698+ /// The hash of the entry's content
699+ pub hash : Hash ,
700+ /// The size of the entry
701+ pub size : u64 ,
702+ /// The key of the entry
703+ pub key : Bytes ,
522704 }
523705}
0 commit comments