Skip to content

Commit cae94ab

Browse files
NicolappsConvex, Inc.
authored andcommitted
Convert fivetran_sdk::Selection to fivetran_source::api_types::Selection (#40598)
This is necessary in order to support partial selection in the Fivetran connector. We already have an internal `Selection` struct used to support partial selection, but we need to convert to it from the data structure Fivetran gives us. GitOrigin-RevId: 6dcf06c146e47dd9dadd825e0721884e3e798ed6
1 parent 00bd927 commit cae94ab

File tree

4 files changed

+334
-28
lines changed

4 files changed

+334
-28
lines changed

crates/fivetran_source/src/api_types/selection.rs

Lines changed: 316 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
use std::collections::BTreeMap;
22

3+
use convex_fivetran_common::fivetran_sdk::{
4+
selection::Selection as FivetranSelection,
5+
SchemaSelection as FivetranSchemaSelection,
6+
TableSelection as FivetranTableSelection,
7+
TablesWithSchema as FivetranSelectionWithSchema,
8+
};
39
use maplit::btreemap;
410
use serde::{
511
Deserialize,
@@ -13,16 +19,20 @@ use super::SelectionArg;
1319
/// This is the serializable version of `StreamingExportSelection` in the
1420
/// database crate.
1521
#[derive(Serialize, Deserialize)]
16-
#[cfg_attr(test, derive(Eq, PartialEq, Debug, Clone))]
22+
#[cfg_attr(test, derive(Eq, PartialEq, Debug, Clone, proptest_derive::Arbitrary))]
1723
pub struct Selection {
1824
#[serde(flatten)]
19-
pub components: BTreeMap<String, ComponentSelection>,
25+
pub components: BTreeMap<
26+
String, // The component name ("" for the default component)
27+
ComponentSelection,
28+
>,
2029
#[serde(rename = "_other")]
2130
pub other_components: InclusionDefault,
2231
}
2332

2433
/// Serializable version of `StreamingExportInclusionDefault`
2534
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
35+
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
2636
pub enum InclusionDefault {
2737
#[serde(rename = "excl")]
2838
Excluded,
@@ -42,7 +52,7 @@ impl Default for Selection {
4252

4353
/// Serializable version of `StreamingExportComponentSelection`.
4454
#[derive(Serialize, Deserialize)]
45-
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug))]
55+
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
4656
pub enum ComponentSelection {
4757
#[serde(rename = "excl")]
4858
Excluded,
@@ -58,7 +68,7 @@ pub enum ComponentSelection {
5868
/// Serializable version of
5969
/// `StreamingExportTableSelection` + `StreamingExportColumnSelection`
6070
#[derive(Serialize, Deserialize)]
61-
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug))]
71+
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
6272
pub enum TableSelection {
6373
#[serde(rename = "excl")]
6474
Excluded,
@@ -82,6 +92,7 @@ impl TableSelection {
8292

8393
/// Serializable version of `StreamingExportColumnInclusion`.
8494
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
95+
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
8596
pub enum ColumnInclusion {
8697
#[serde(rename = "excl")]
8798
Excluded,
@@ -124,8 +135,104 @@ impl From<SelectionArg> for Selection {
124135
}
125136
}
126137

138+
/// The name of the Fivetran schema that we use for the Convex tables in the
139+
/// root component (i.e. the “database name” users see for the Convex root
140+
/// component)
141+
pub const DEFAULT_FIVETRAN_SCHEMA_NAME: &str = "convex";
142+
143+
impl TryFrom<Option<FivetranSelection>> for Selection {
144+
type Error = anyhow::Error;
145+
146+
fn try_from(value: Option<FivetranSelection>) -> Result<Self, Self::Error> {
147+
match value {
148+
None => Ok(Selection::default()),
149+
Some(FivetranSelection::WithSchema(with_schema)) => Ok(with_schema.into()),
150+
Some(FivetranSelection::WithoutSchema(_)) => {
151+
anyhow::bail!("Fivetran unexpectedly sent a selection setting without a schema.")
152+
},
153+
}
154+
}
155+
}
156+
157+
impl From<FivetranSelectionWithSchema> for Selection {
158+
fn from(value: FivetranSelectionWithSchema) -> Self {
159+
Self {
160+
components: value
161+
.schemas
162+
.into_iter()
163+
.map(|schema| {
164+
(
165+
if schema.schema_name == DEFAULT_FIVETRAN_SCHEMA_NAME {
166+
String::from("")
167+
} else {
168+
schema.schema_name.clone()
169+
},
170+
schema.into(),
171+
)
172+
})
173+
.collect(),
174+
other_components: if value.include_new_schemas {
175+
InclusionDefault::Included
176+
} else {
177+
InclusionDefault::Excluded
178+
},
179+
}
180+
}
181+
}
182+
183+
impl From<FivetranSchemaSelection> for ComponentSelection {
184+
fn from(value: FivetranSchemaSelection) -> Self {
185+
if !value.included {
186+
Self::Excluded
187+
} else {
188+
Self::Included {
189+
tables: value
190+
.tables
191+
.into_iter()
192+
.map(|table| (table.table_name.clone(), table.into()))
193+
.collect(),
194+
other_tables: if value.include_new_tables {
195+
InclusionDefault::Included
196+
} else {
197+
InclusionDefault::Excluded
198+
},
199+
}
200+
}
201+
}
202+
}
203+
204+
impl From<FivetranTableSelection> for TableSelection {
205+
fn from(value: FivetranTableSelection) -> Self {
206+
if !value.included {
207+
Self::Excluded
208+
} else {
209+
Self::Included {
210+
columns: value
211+
.columns
212+
.into_iter()
213+
.map(|(name, included)| {
214+
(
215+
name,
216+
if included {
217+
ColumnInclusion::Included
218+
} else {
219+
ColumnInclusion::Excluded
220+
},
221+
)
222+
})
223+
.collect(),
224+
other_columns: if value.include_new_columns {
225+
InclusionDefault::Included
226+
} else {
227+
InclusionDefault::Excluded
228+
},
229+
}
230+
}
231+
}
232+
}
233+
127234
#[cfg(test)]
128-
mod tests {
235+
mod tests_selection_serde {
129236
use maplit::btreemap;
130237
use serde::{
131238
de::DeserializeOwned,
@@ -232,3 +339,207 @@ mod tests {
232339
);
233340
}
234341
}
342+
343+
#[cfg(test)]
344+
impl From<Selection> for FivetranSelectionWithSchema {
345+
fn from(value: Selection) -> Self {
346+
Self {
347+
schemas: value
348+
.components
349+
.into_iter()
350+
.map(|(component_name, component_selection)| {
351+
let schema_name = if component_name.is_empty() {
352+
DEFAULT_FIVETRAN_SCHEMA_NAME.to_string()
353+
} else {
354+
component_name
355+
};
356+
FivetranSchemaSelection {
357+
schema_name,
358+
included: match component_selection {
359+
ComponentSelection::Excluded => false,
360+
ComponentSelection::Included { .. } => true,
361+
},
362+
tables: match component_selection {
363+
ComponentSelection::Excluded => vec![],
364+
ComponentSelection::Included {
365+
ref tables,
366+
other_tables: _,
367+
} => tables
368+
.iter()
369+
.map(|(table_name, table_selection)| FivetranTableSelection {
370+
table_name: table_name.clone(),
371+
included: match table_selection {
372+
TableSelection::Excluded => false,
373+
TableSelection::Included { .. } => true,
374+
},
375+
columns: match &table_selection {
376+
TableSelection::Excluded => BTreeMap::new(),
377+
TableSelection::Included { columns, .. } => columns
378+
.iter()
379+
.map(|(name, inclusion)| {
380+
(
381+
name.clone(),
382+
matches!(inclusion, ColumnInclusion::Included),
383+
)
384+
})
385+
.collect(),
386+
},
387+
include_new_columns: match table_selection {
388+
TableSelection::Excluded => false,
389+
TableSelection::Included { other_columns, .. } => {
390+
matches!(other_columns, InclusionDefault::Included)
391+
},
392+
},
393+
})
394+
.collect(),
395+
},
396+
include_new_tables: match &component_selection {
397+
ComponentSelection::Excluded => false,
398+
ComponentSelection::Included { other_tables, .. } => {
399+
matches!(other_tables, InclusionDefault::Included)
400+
},
401+
},
402+
}
403+
})
404+
.collect(),
405+
include_new_schemas: matches!(value.other_components, InclusionDefault::Included),
406+
}
407+
}
408+
}
409+
410+
#[cfg(test)]
411+
mod tests_selection_fivetran_conversion {
412+
use convex_fivetran_common::fivetran_sdk::TablesWithNoSchema as FivetranSelectionWithNoSchema;
413+
use maplit::btreemap;
414+
415+
use super::*;
416+
417+
#[test]
418+
fn test_schema_equals_none_converts_to_everything_included() {
419+
let result: Result<Selection, _> = Selection::try_from(None);
420+
assert_eq!(
421+
result.unwrap(),
422+
Selection {
423+
components: BTreeMap::new(),
424+
other_components: InclusionDefault::Included,
425+
}
426+
);
427+
}
428+
429+
#[test]
430+
fn test_can_convert_from_fivetran_selection_with_schema() {
431+
let fivetran_selection = FivetranSelection::WithSchema(FivetranSelectionWithSchema {
432+
schemas: vec![FivetranSchemaSelection {
433+
schema_name: "convex".to_string(),
434+
included: true,
435+
tables: vec![FivetranTableSelection {
436+
table_name: "users".to_string(),
437+
included: true,
438+
columns: btreemap! {
439+
"name".to_string() => true,
440+
"email".to_string() => false,
441+
},
442+
include_new_columns: false,
443+
}],
444+
include_new_tables: true,
445+
}],
446+
include_new_schemas: false,
447+
});
448+
449+
let result: Result<Selection, _> = Selection::try_from(Some(fivetran_selection));
450+
assert!(result.is_ok());
451+
452+
let expected = Selection {
453+
components: btreemap! {
454+
"".to_string() => ComponentSelection::Included {
455+
tables: btreemap! {
456+
"users".to_string() => TableSelection::Included {
457+
columns: btreemap! {
458+
"name".to_string() => ColumnInclusion::Included,
459+
"email".to_string() => ColumnInclusion::Excluded,
460+
},
461+
other_columns: InclusionDefault::Excluded,
462+
},
463+
},
464+
other_tables: InclusionDefault::Included,
465+
},
466+
},
467+
other_components: InclusionDefault::Excluded,
468+
};
469+
470+
assert_eq!(result.unwrap(), expected);
471+
}
472+
473+
#[test]
474+
fn test_without_schema_variant_returns_error() {
475+
let without_schema_selection = FivetranSelectionWithNoSchema {
476+
tables: vec![FivetranTableSelection {
477+
included: true,
478+
table_name: "table1".to_string(),
479+
columns: BTreeMap::new(),
480+
include_new_columns: false,
481+
}],
482+
include_new_tables: false,
483+
};
484+
let fivetran_selection = FivetranSelection::WithoutSchema(without_schema_selection);
485+
486+
let result: Result<Selection, _> = Selection::try_from(Some(fivetran_selection));
487+
assert!(result.is_err());
488+
assert!(result
489+
.unwrap_err()
490+
.to_string()
491+
.contains("Fivetran unexpectedly sent a selection setting without a schema"));
492+
}
493+
494+
#[test]
495+
fn test_convex_component_in_fivetran_maps_to_empty_string() {
496+
// Fivetran doesn’t support empty schema names, so the root component ("" in
497+
// Convex) is called "convex" in Fivetran
498+
499+
let fivetran_selection = FivetranSelectionWithSchema {
500+
schemas: vec![FivetranSchemaSelection {
501+
schema_name: "convex".to_string(),
502+
included: true,
503+
tables: vec![],
504+
include_new_tables: true,
505+
}],
506+
include_new_schemas: false,
507+
};
508+
509+
let selection: Selection = fivetran_selection.into();
510+
511+
assert_eq!(
512+
selection,
513+
Selection {
514+
components: btreemap! {
515+
"".to_string() => ComponentSelection::Included {
516+
tables: BTreeMap::new(),
517+
other_tables: InclusionDefault::Included,
518+
},
519+
},
520+
other_components: InclusionDefault::Excluded,
521+
}
522+
);
523+
}
524+
525+
#[cfg(test)]
526+
mod tests_selection_roundtrip {
527+
use cmd_util::env::env_config;
528+
use proptest::prelude::*;
529+
530+
use super::*;
531+
532+
proptest! {
533+
#![proptest_config(ProptestConfig {
534+
cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1),
535+
failure_persistence: None, ..ProptestConfig::default()
536+
})]
537+
#[test]
538+
fn test_selection_to_fivetran_roundtrips(selection in any::<Selection>()) {
539+
let fivetran_selection: FivetranSelectionWithSchema = selection.clone().into();
540+
let roundtripped_selection: Selection = fivetran_selection.into();
541+
prop_assert_eq!(selection, roundtripped_selection);
542+
}
543+
}
544+
}
545+
}

crates/fivetran_source/src/connector.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,14 @@ use tonic::{
3131
};
3232

3333
use crate::{
34+
api_types::selection::DEFAULT_FIVETRAN_SCHEMA_NAME,
3435
convex_api::{
3536
ComponentPath,
3637
ConvexApi,
3738
Source,
3839
},
3940
log::log,
40-
schema::{
41-
generate_fivetran_schema,
42-
DEFAULT_FIVETRAN_SCHEMA_NAME,
43-
},
41+
schema::generate_fivetran_schema,
4442
sync::{
4543
sync,
4644
State,

0 commit comments

Comments
 (0)