11use crate :: error:: Result ;
22use crate :: storage:: { FileRange , compression:: CompressionAlgorithm } ;
33use anyhow:: { Context as _, bail} ;
4- use rusqlite:: { Connection , OpenFlags , OptionalExtension } ;
4+ use itertools:: Itertools as _;
5+ use sqlx:: { Acquire as _, QueryBuilder , Row as _, Sqlite } ;
56use std:: { fs, io, path:: Path } ;
67use tracing:: instrument;
78
@@ -20,97 +21,156 @@ impl FileInfo {
2021 }
2122}
2223
24+ /// crates a new empty SQLite database, and returns a configured connection
25+ /// pool to connect to the DB.
26+ /// Any existing DB at the given path will be deleted first.
27+ async fn sqlite_create < P : AsRef < Path > > ( path : P ) -> Result < sqlx:: SqlitePool > {
28+ let path = path. as_ref ( ) ;
29+ if path. exists ( ) {
30+ fs:: remove_file ( path) ?;
31+ }
32+
33+ sqlx:: SqlitePool :: connect_with (
34+ sqlx:: sqlite:: SqliteConnectOptions :: new ( )
35+ . filename ( path)
36+ . read_only ( false )
37+ . pragma ( "synchronous" , "full" )
38+ . create_if_missing ( true ) ,
39+ )
40+ . await
41+ . map_err ( Into :: into)
42+ }
43+
44+ /// open existing SQLite database, return a configured connection poll
45+ /// to connect to the DB.
46+ /// Will error when the database doesn't exist at that path.
47+ async fn sqlite_open < P : AsRef < Path > > ( path : P ) -> Result < sqlx:: SqlitePool > {
48+ sqlx:: SqlitePool :: connect_with (
49+ sqlx:: sqlite:: SqliteConnectOptions :: new ( )
50+ . filename ( path)
51+ . read_only ( true )
52+ . pragma ( "synchronous" , "off" ) // not needed for readonly db
53+ . serialized ( false ) // same as OPEN_NOMUTEX
54+ . create_if_missing ( false ) ,
55+ )
56+ . await
57+ . map_err ( Into :: into)
58+ }
59+
2360/// create an archive index based on a zipfile.
2461///
2562/// Will delete the destination file if it already exists.
2663#[ instrument( skip( zipfile) ) ]
27- pub ( crate ) fn create < R : io:: Read + io:: Seek , P : AsRef < Path > + std:: fmt:: Debug > (
64+ pub ( crate ) async fn create < R : io:: Read + io:: Seek , P : AsRef < Path > + std:: fmt:: Debug > (
2865 zipfile : & mut R ,
2966 destination : P ,
3067) -> Result < ( ) > {
31- let destination = destination. as_ref ( ) ;
32- if destination. exists ( ) {
33- fs:: remove_file ( destination) ?;
34- }
68+ let pool = sqlite_create ( destination) . await ?;
69+ let mut conn = pool. acquire ( ) . await ?;
70+ let mut tx = conn. begin ( ) . await ?;
3571
36- let conn = rusqlite:: Connection :: open ( destination) ?;
37- conn. execute ( "PRAGMA synchronous = FULL" , ( ) ) ?;
38- conn. execute ( "BEGIN" , ( ) ) ?;
39- conn. execute (
40- "
72+ sqlx:: query (
73+ r#"
4174 CREATE TABLE files (
4275 id INTEGER PRIMARY KEY,
4376 path TEXT UNIQUE,
4477 start INTEGER,
4578 end INTEGER,
4679 compression INTEGER
4780 );
48- " ,
49- ( ) ,
50- ) ?;
81+ "# ,
82+ )
83+ . execute ( & mut * tx)
84+ . await ?;
5185
5286 let mut archive = zip:: ZipArchive :: new ( zipfile) ?;
5387 let compression_bzip = CompressionAlgorithm :: Bzip2 as i32 ;
5488
55- for i in 0 ..archive. len ( ) {
56- let zf = archive. by_index ( i) ?;
57-
58- conn. execute (
59- "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)" ,
60- (
61- zf. name ( ) ,
62- zf. data_start ( ) ,
63- zf. data_start ( ) + zf. compressed_size ( ) - 1 ,
64- match zf. compression ( ) {
65- zip:: CompressionMethod :: Bzip2 => compression_bzip,
66- c => bail ! ( "unsupported compression algorithm {} in zip-file" , c) ,
67- } ,
68- ) ,
69- ) ?;
89+ const CHUNKS : usize = 1000 ;
90+ for chunk in & ( 0 ..archive. len ( ) ) . chunks ( CHUNKS ) {
91+ for i in chunk {
92+ let mut insert_stmt =
93+ QueryBuilder :: < Sqlite > :: new ( "INSERT INTO files (path, start, end, compression) " ) ;
94+
95+ let entry = archive. by_index ( i) ?;
96+
97+ let start = entry. data_start ( ) as i64 ;
98+ let end = ( entry. data_start ( ) + entry. compressed_size ( ) - 1 ) as i64 ;
99+ let compression_raw = match entry. compression ( ) {
100+ zip:: CompressionMethod :: Bzip2 => compression_bzip,
101+ c => bail ! ( "unsupported compression algorithm {} in zip-file" , c) ,
102+ } ;
103+
104+ insert_stmt. push_values ( [ ( ) ] , |mut b, _| {
105+ b. push_bind ( entry. name ( ) )
106+ . push_bind ( start)
107+ . push_bind ( end)
108+ . push_bind ( compression_raw) ;
109+ } ) ;
110+ insert_stmt
111+ . build ( )
112+ . persistent ( false )
113+ . execute ( & mut * tx)
114+ . await ?;
115+ }
70116 }
71- conn. execute ( "CREATE INDEX idx_files_path ON files (path);" , ( ) ) ?;
72- conn. execute ( "END" , ( ) ) ?;
73- conn. execute ( "VACUUM" , ( ) ) ?;
117+
118+ sqlx:: query ( "CREATE INDEX idx_files_path ON files (path);" )
119+ . execute ( & mut * tx)
120+ . await ?;
121+
122+ // Commit the transaction before VACUUM (VACUUM cannot run inside a transaction)
123+ tx. commit ( ) . await ?;
124+
125+ // VACUUM outside the transaction
126+ sqlx:: query ( "VACUUM" ) . execute ( & mut * conn) . await ?;
127+
74128 Ok ( ( ) )
75129}
76130
77- fn find_in_sqlite_index ( conn : & Connection , search_for : & str ) -> Result < Option < FileInfo > > {
78- let mut stmt = conn. prepare (
131+ async fn find_in_sqlite_index < ' e , E > ( executor : E , search_for : & str ) -> Result < Option < FileInfo > >
132+ where
133+ E : sqlx:: Executor < ' e , Database = sqlx:: Sqlite > ,
134+ {
135+ let row = sqlx:: query (
79136 "
80137 SELECT start, end, compression
81138 FROM files
82139 WHERE path = ?
83140 " ,
84- ) ?;
85-
86- stmt. query_row ( ( search_for, ) , |row| {
87- let compression: i32 = row. get ( 2 ) ?;
88-
89- Ok ( FileInfo {
90- range : row. get ( 0 ) ?..=row. get ( 1 ) ?,
91- compression : compression. try_into ( ) . map_err ( |value| {
92- rusqlite:: Error :: FromSqlConversionFailure (
93- 2 ,
94- rusqlite:: types:: Type :: Integer ,
95- format ! ( "invalid compression algorithm '{value}' in database" ) . into ( ) ,
96- )
141+ )
142+ . bind ( search_for)
143+ . fetch_optional ( executor)
144+ . await
145+ . context ( "error fetching SQLite data" ) ?;
146+
147+ if let Some ( row) = row {
148+ let start: u64 = row. try_get ( 0 ) ?;
149+ let end: u64 = row. try_get ( 1 ) ?;
150+ let compression_raw: i32 = row. try_get ( 2 ) ?;
151+
152+ Ok ( Some ( FileInfo {
153+ range : start..=end,
154+ compression : compression_raw. try_into ( ) . map_err ( |value| {
155+ anyhow:: anyhow!( format!(
156+ "invalid compression algorithm '{value}' in database"
157+ ) )
97158 } ) ?,
98- } )
99- } )
100- . optional ( )
101- . context ( "error fetching SQLite data" )
159+ } ) )
160+ } else {
161+ Ok ( None )
162+ }
102163}
103164
104165#[ instrument]
105- pub ( crate ) fn find_in_file < P : AsRef < Path > + std:: fmt:: Debug > (
166+ pub ( crate ) async fn find_in_file < P : AsRef < Path > + std:: fmt:: Debug > (
106167 archive_index_path : P ,
107168 search_for : & str ,
108169) -> Result < Option < FileInfo > > {
109- let connection = Connection :: open_with_flags (
110- archive_index_path,
111- OpenFlags :: SQLITE_OPEN_READ_ONLY | OpenFlags :: SQLITE_OPEN_NO_MUTEX ,
112- ) ?;
113- find_in_sqlite_index ( & connection, search_for)
170+ let pool = sqlite_open ( archive_index_path) . await ?;
171+ let mut conn = pool. acquire ( ) . await ?;
172+
173+ find_in_sqlite_index ( & mut * conn, search_for) . await
114174}
115175
116176#[ cfg( test) ]
@@ -138,43 +198,38 @@ mod tests {
138198 tf
139199 }
140200
141- #[ test]
142- fn index_create_save_load_sqlite ( ) {
201+ #[ tokio :: test]
202+ async fn index_create_save_load_sqlite ( ) -> Result < ( ) > {
143203 let mut tf = create_test_archive ( 1 ) ;
144204
145205 let tempfile = tempfile:: NamedTempFile :: new ( ) . unwrap ( ) . into_temp_path ( ) ;
146- create ( & mut tf, & tempfile) . unwrap ( ) ;
206+ create ( & mut tf, & tempfile) . await ? ;
147207
148- let fi = find_in_file ( & tempfile, "testfile0" ) . unwrap ( ) . unwrap ( ) ;
208+ let fi = find_in_file ( & tempfile, "testfile0" ) . await ? . unwrap ( ) ;
149209
150210 assert_eq ! ( fi. range, FileRange :: new( 39 , 459 ) ) ;
151211 assert_eq ! ( fi. compression, CompressionAlgorithm :: Bzip2 ) ;
152212
153- assert ! (
154- find_in_file( & tempfile, "some_other_file" , )
155- . unwrap( )
156- . is_none( )
157- ) ;
213+ assert ! ( find_in_file( & tempfile, "some_other_file" , ) . await ?. is_none( ) ) ;
214+ Ok ( ( ) )
158215 }
159216
160- #[ test]
161- fn archive_with_more_than_65k_files ( ) {
217+ #[ tokio :: test]
218+ async fn archive_with_more_than_65k_files ( ) -> Result < ( ) > {
162219 let mut tf = create_test_archive ( 100_000 ) ;
163220
164- let tempfile = tempfile:: NamedTempFile :: new ( ) . unwrap ( ) . into_temp_path ( ) ;
165- create ( & mut tf, & tempfile) . unwrap ( ) ;
166-
167- let connection = Connection :: open_with_flags (
168- tempfile,
169- OpenFlags :: SQLITE_OPEN_READ_ONLY | OpenFlags :: SQLITE_OPEN_NO_MUTEX ,
170- )
171- . unwrap ( ) ;
172- let mut stmt = connection. prepare ( "SELECT count(*) FROM files" ) . unwrap ( ) ;
173-
174- let count = stmt
175- . query_row ( [ ] , |row| Ok ( row. get :: < _ , usize > ( 0 ) ) )
176- . unwrap ( )
177- . unwrap ( ) ;
178- assert_eq ! ( count, 100_000 ) ;
221+ let tempfile = tempfile:: NamedTempFile :: new ( ) ?. into_temp_path ( ) ;
222+ create ( & mut tf, & tempfile) . await ?;
223+
224+ let pool = sqlite_open ( & tempfile) . await ?;
225+ let mut conn = pool. acquire ( ) . await ?;
226+
227+ let row = sqlx:: query ( "SELECT count(*) FROM files" )
228+ . fetch_one ( & mut * conn)
229+ . await ?;
230+
231+ assert_eq ! ( row. get:: <i64 , _>( 0 ) , 100_000 ) ;
232+
233+ Ok ( ( ) )
179234 }
180235}
0 commit comments