@@ -28,11 +28,8 @@ mod update_location;
2828mod update_properties;
2929mod upgrade_format_version;
3030
31- use std:: mem:: discriminant;
3231use std:: sync:: Arc ;
3332
34- use uuid:: Uuid ;
35-
3633use crate :: error:: Result ;
3734use crate :: table:: Table ;
3835use crate :: transaction:: action:: BoxedTransactionAction ;
@@ -45,67 +42,49 @@ use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
4542
4643/// Table transaction.
4744pub struct Transaction {
48- base_table : Table ,
49- current_table : Table ,
45+ table : Table ,
5046 actions : Vec < BoxedTransactionAction > ,
51- updates : Vec < TableUpdate > ,
52- requirements : Vec < TableRequirement > ,
5347}
5448
5549impl Transaction {
5650 /// Creates a new transaction.
5751 pub fn new ( table : & Table ) -> Self {
5852 Self {
59- base_table : table. clone ( ) ,
60- current_table : table. clone ( ) ,
53+ table : table. clone ( ) ,
6154 actions : vec ! [ ] ,
62- updates : vec ! [ ] ,
63- requirements : vec ! [ ] ,
6455 }
6556 }
6657
67- fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
68- let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
58+ fn update_table_metadata ( table : Table , updates : & [ TableUpdate ] ) -> Result < Table > {
59+ let mut metadata_builder = table . metadata ( ) . clone ( ) . into_builder ( None ) ;
6960 for update in updates {
7061 metadata_builder = update. clone ( ) . apply ( metadata_builder) ?;
7162 }
7263
73- self . current_table
74- . with_metadata ( Arc :: new ( metadata_builder. build ( ) ?. metadata ) ) ;
75-
76- Ok ( ( ) )
64+ Ok ( table. with_metadata ( Arc :: new ( metadata_builder. build ( ) ?. metadata ) ) )
7765 }
7866
67+ /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
68+ /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
7969 fn apply (
80- & mut self ,
81- updates : Vec < TableUpdate > ,
82- requirements : Vec < TableRequirement > ,
83- ) -> Result < ( ) > {
70+ table : Table ,
71+ mut action_commit : ActionCommit ,
72+ existing_updates : & mut Vec < TableUpdate > ,
73+ existing_requirements : & mut Vec < TableRequirement > ,
74+ ) -> Result < Table > {
75+ let updates = action_commit. take_updates ( ) ;
76+ let requirements = action_commit. take_requirements ( ) ;
77+
8478 for requirement in & requirements {
85- requirement. check ( Some ( self . current_table . metadata ( ) ) ) ?;
79+ requirement. check ( Some ( table . metadata ( ) ) ) ?;
8680 }
8781
88- self . update_table_metadata ( & updates) ?;
89-
90- self . updates . extend ( updates) ;
91-
92- // For the requirements, it does not make sense to add a requirement more than once
93- // For example, you cannot assert that the current schema has two different IDs
94- for new_requirement in requirements {
95- if self
96- . requirements
97- . iter ( )
98- . map ( discriminant)
99- . all ( |d| d != discriminant ( & new_requirement) )
100- {
101- self . requirements . push ( new_requirement) ;
102- }
103- }
82+ let updated_table = Self :: update_table_metadata ( table, & updates) ?;
10483
105- // # TODO
106- // Support auto commit later.
84+ existing_updates . extend ( updates ) ;
85+ existing_requirements . extend ( requirements ) ;
10786
108- Ok ( ( ) )
87+ Ok ( updated_table )
10988 }
11089
11190 /// Sets table to a new version.
@@ -118,31 +97,9 @@ impl Transaction {
11897 UpdatePropertiesAction :: new ( )
11998 }
12099
121- fn generate_unique_snapshot_id ( & self ) -> i64 {
122- let generate_random_id = || -> i64 {
123- let ( lhs, rhs) = Uuid :: new_v4 ( ) . as_u64_pair ( ) ;
124- let snapshot_id = ( lhs ^ rhs) as i64 ;
125- if snapshot_id < 0 {
126- -snapshot_id
127- } else {
128- snapshot_id
129- }
130- } ;
131- let mut snapshot_id = generate_random_id ( ) ;
132- while self
133- . current_table
134- . metadata ( )
135- . snapshots ( )
136- . any ( |s| s. snapshot_id ( ) == snapshot_id)
137- {
138- snapshot_id = generate_random_id ( ) ;
139- }
140- snapshot_id
141- }
142-
143100 /// Creates a fast append action.
144101 pub fn fast_append ( & self ) -> FastAppendAction {
145- FastAppendAction :: new ( self . generate_unique_snapshot_id ( ) )
102+ FastAppendAction :: new ( )
146103 }
147104
148105 /// Creates replace sort order action.
@@ -157,41 +114,43 @@ impl Transaction {
157114
158115 /// Commit transaction.
159116 pub async fn commit ( mut self , catalog : & dyn Catalog ) -> Result < Table > {
160- if self . actions . is_empty ( ) && self . updates . is_empty ( ) {
117+ if self . actions . is_empty ( ) {
161118 // nothing to commit
162- return Ok ( self . base_table . clone ( ) ) ;
119+ return Ok ( self . table . clone ( ) ) ;
163120 }
164121
165122 self . do_commit ( catalog) . await
166123 }
167124
168125 async fn do_commit ( & mut self , catalog : & dyn Catalog ) -> Result < Table > {
169- let base_table_identifier = self . base_table . identifier ( ) . to_owned ( ) ;
126+ let refreshed = catalog . load_table ( self . table . identifier ( ) ) . await ? ;
170127
171- let refreshed = catalog. load_table ( & base_table_identifier. clone ( ) ) . await ?;
172-
173- if self . base_table . metadata ( ) != refreshed. metadata ( )
174- || self . base_table . metadata_location ( ) != refreshed. metadata_location ( )
128+ if self . table . metadata ( ) != refreshed. metadata ( )
129+ || self . table . metadata_location ( ) != refreshed. metadata_location ( )
175130 {
176131 // current base is stale, use refreshed as base and re-apply transaction actions
177- self . base_table = refreshed. clone ( ) ;
132+ self . table = refreshed. clone ( ) ;
178133 }
179134
180- let current_table = self . base_table . clone ( ) ;
181-
182- for action in self . actions . clone ( ) {
183- let mut action_commit = action. commit ( & current_table) . await ?;
184- // apply changes to current_table
185- self . apply (
186- action_commit. take_updates ( ) ,
187- action_commit. take_requirements ( ) ,
135+ let mut current_table = self . table . clone ( ) ;
136+ let mut existing_updates: Vec < TableUpdate > = vec ! [ ] ;
137+ let mut existing_requirements: Vec < TableRequirement > = vec ! [ ] ;
138+
139+ for action in & self . actions {
140+ let action_commit = Arc :: clone ( action) . commit ( & current_table) . await ?;
141+ // apply action commit to current_table
142+ current_table = Self :: apply (
143+ current_table,
144+ action_commit,
145+ & mut existing_updates,
146+ & mut existing_requirements,
188147 ) ?;
189148 }
190149
191150 let table_commit = TableCommit :: builder ( )
192- . ident ( base_table_identifier )
193- . updates ( self . updates . clone ( ) )
194- . requirements ( self . requirements . clone ( ) )
151+ . ident ( self . table . identifier ( ) . to_owned ( ) )
152+ . updates ( existing_updates )
153+ . requirements ( existing_requirements )
195154 . build ( ) ;
196155
197156 catalog. update_table ( table_commit) . await
0 commit comments