Skip to content

Commit f037f3e

Browse files
NicolappsConvex, Inc.
authored andcommitted
Add selection support to the Fivetran source connector (#37390)
This adds selection support to the Fivetran source component, meaning that users will be able to opt out of syncing some components, tables and columns. GitOrigin-RevId: acb1ef09e81ad3aa1467ad35814d208f61e2ba97
1 parent 4bfabd6 commit f037f3e

File tree

7 files changed

+551
-74
lines changed

7 files changed

+551
-74
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fivetran_source/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ tonic-build = { workspace = true }
5151
cmd_util = { workspace = true }
5252
convex = { workspace = true, features = ["testing"] }
5353
convex_fivetran_common = { workspace = true, features = ["testing"] }
54+
pretty_assertions = { workspace = true }
5455
proptest = { workspace = true }
5556
proptest-derive = { workspace = true }
5657
rand = { workspace = true }

crates/fivetran_source/src/api_types/selection.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::BTreeMap;
33
use convex_fivetran_common::fivetran_sdk::{
44
selection::Selection as FivetranSelection,
55
SchemaSelection as FivetranSchemaSelection,
6+
Selection as FivetranRootSelection,
67
TableSelection as FivetranTableSelection,
78
TablesWithSchema as FivetranSelectionWithSchema,
89
};
@@ -20,8 +21,8 @@ use super::SelectionArg;
2021
///
2122
/// This is the serializable version of `StreamingExportSelection` in the
2223
/// database crate.
23-
#[derive(Serialize, Deserialize)]
24-
#[cfg_attr(test, derive(Eq, PartialEq, Debug, Clone, proptest_derive::Arbitrary))]
24+
#[derive(Serialize, Deserialize, Clone)]
25+
#[cfg_attr(test, derive(Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
2526
pub struct Selection {
2627
#[serde(flatten)]
2728
#[cfg_attr(
@@ -58,8 +59,8 @@ impl Default for Selection {
5859
}
5960

6061
/// Serializable version of `StreamingExportComponentSelection`.
61-
#[derive(Serialize, Deserialize)]
62-
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
62+
#[derive(Serialize, Deserialize, Clone)]
63+
#[cfg_attr(test, derive(Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
6364
pub enum ComponentSelection {
6465
#[serde(rename = "excl")]
6566
Excluded,
@@ -79,8 +80,8 @@ pub enum ComponentSelection {
7980

8081
/// Serializable version of
8182
/// `StreamingExportTableSelection` + `StreamingExportColumnSelection`
82-
#[derive(Serialize, Deserialize)]
83-
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
83+
#[derive(Serialize, Deserialize, Clone)]
84+
#[cfg_attr(test, derive(Eq, PartialEq, Debug, proptest_derive::Arbitrary))]
8485
pub enum TableSelection {
8586
#[serde(rename = "excl")]
8687
Excluded,
@@ -157,6 +158,14 @@ impl From<SelectionArg> for Selection {
157158
/// component)
158159
pub const DEFAULT_FIVETRAN_SCHEMA_NAME: &str = "convex";
159160

161+
impl TryFrom<Option<FivetranRootSelection>> for Selection {
162+
type Error = anyhow::Error;
163+
164+
fn try_from(value: Option<FivetranRootSelection>) -> Result<Self, Self::Error> {
165+
Selection::try_from(value.and_then(|val| val.selection))
166+
}
167+
}
168+
160169
impl TryFrom<Option<FivetranSelection>> for Selection {
161170
type Error = anyhow::Error;
162171

@@ -432,8 +441,9 @@ mod tests_selection_fivetran_conversion {
432441
use super::*;
433442

434443
#[test]
435-
fn test_schema_equals_none_converts_to_everything_included() {
436-
let result: Result<Selection, _> = Selection::try_from(None);
444+
fn test_none_converts_to_everything_included() {
445+
let none: Option<FivetranSelection> = None;
446+
let result: Result<Selection, _> = Selection::try_from(none);
437447
assert_eq!(
438448
result.unwrap(),
439449
Selection {

crates/fivetran_source/src/connector.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use tonic::{
3131
};
3232

3333
use crate::{
34-
api_types::selection::DEFAULT_FIVETRAN_SCHEMA_NAME,
34+
api_types::selection::{
35+
Selection,
36+
DEFAULT_FIVETRAN_SCHEMA_NAME,
37+
},
3538
convex_api::{
3639
ComponentPath,
3740
ConvexApi,
@@ -158,7 +161,10 @@ impl SourceConnector for ConvexConnector {
158161

159162
let source = ConvexApi { config };
160163

161-
let sync = sync(source, state);
164+
let selection = Selection::try_from(inner.selection)
165+
.map_err(|error| Status::internal(error.to_string()))?;
166+
167+
let sync = sync(source, state, selection);
162168
Ok(Response::new(
163169
sync.map_ok(FivetranUpdateResponse::from)
164170
.map_err(|error| Status::internal(error.to_string()))

crates/fivetran_source/src/convex_api.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use serde::{
2323
};
2424

2525
use crate::api_types::{
26-
selection::DEFAULT_FIVETRAN_SCHEMA_NAME,
26+
selection::{
27+
Selection,
28+
DEFAULT_FIVETRAN_SCHEMA_NAME,
29+
},
2730
DocumentDeltasArgs,
2831
DocumentDeltasResponse,
2932
DocumentDeltasValue,
@@ -53,12 +56,14 @@ pub trait Source: Display + Send {
5356
&self,
5457
snapshot: Option<i64>,
5558
cursor: Option<ListSnapshotCursor>,
59+
selection: Selection,
5660
) -> anyhow::Result<ListSnapshotResponse>;
5761

5862
/// See https://docs.convex.dev/http-api/#get-apidocument_deltas
5963
async fn document_deltas(
6064
&self,
6165
cursor: DocumentDeltasCursor,
66+
selection: Selection,
6267
) -> anyhow::Result<DocumentDeltasResponse>;
6368

6469
/// Get a list of columns for each table and component on the Convex
@@ -173,13 +178,14 @@ impl Source for ConvexApi {
173178
&self,
174179
snapshot: Option<i64>,
175180
cursor: Option<ListSnapshotCursor>,
181+
selection: Selection,
176182
) -> anyhow::Result<ListSnapshotResponse> {
177183
self.post(
178184
"list_snapshot",
179185
ListSnapshotArgs {
180186
snapshot,
181187
cursor: cursor.map(|c| c.into()),
182-
selection: SelectionArg::default(),
188+
selection: SelectionArg::Exact { selection },
183189
format: Some("convex_encoded_json".to_string()),
184190
},
185191
)
@@ -189,12 +195,13 @@ impl Source for ConvexApi {
189195
async fn document_deltas(
190196
&self,
191197
cursor: DocumentDeltasCursor,
198+
selection: Selection,
192199
) -> anyhow::Result<DocumentDeltasResponse> {
193200
self.post(
194201
"document_deltas",
195202
DocumentDeltasArgs {
196203
cursor: Some(cursor.into()),
197-
selection: SelectionArg::default(),
204+
selection: SelectionArg::Exact { selection },
198205
format: Some("convex_encoded_json".to_string()),
199206
},
200207
)
@@ -242,7 +249,7 @@ pub struct ListSnapshotCursor(pub String);
242249
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
243250
pub struct DocumentDeltasCursor(pub i64);
244251

245-
#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Display)]
252+
#[derive(Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Display, Debug)]
246253
pub struct TableName(pub String);
247254

248255
#[cfg(test)]
@@ -273,7 +280,7 @@ impl ComponentPath {
273280
}
274281
}
275282

276-
#[derive(Display)]
283+
#[derive(Display, Debug)]
277284
pub struct FieldName(pub String);
278285

279286
#[cfg(test)]

crates/fivetran_source/src/sync.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use serde::{
2525
use value_type::Inner as FivetranValue;
2626

2727
use crate::{
28+
api_types::selection::Selection,
2829
convert::to_fivetran_row,
2930
convex_api::{
3031
DocumentDeltasCursor,
@@ -90,6 +91,7 @@ pub enum Checkpoint {
9091
}
9192

9293
/// A simplification of the messages sent to Fivetran in the `update` endpoint.
94+
#[derive(Debug)]
9395
pub enum UpdateMessage {
9496
Update {
9597
schema_name: Option<String>,
@@ -140,9 +142,10 @@ impl From<UpdateMessage> for FivetranUpdateResponse {
140142
pub fn sync(
141143
source: impl Source + 'static,
142144
state: Option<State>,
145+
selection: Selection,
143146
) -> BoxStream<'static, anyhow::Result<UpdateMessage>> {
144147
let Some(state) = state else {
145-
return initial_sync(source, None, Some(BTreeSet::new())).boxed();
148+
return initial_sync(source, None, Some(BTreeSet::new()), selection).boxed();
146149
};
147150

148151
let State {
@@ -152,9 +155,11 @@ pub fn sync(
152155
} = state;
153156
match checkpoint {
154157
Checkpoint::InitialSync { snapshot, cursor } => {
155-
initial_sync(source, Some((snapshot, cursor)), tables_seen).boxed()
158+
initial_sync(source, Some((snapshot, cursor)), tables_seen, selection).boxed()
159+
},
160+
Checkpoint::DeltaUpdates { cursor } => {
161+
delta_sync(source, cursor, tables_seen, selection).boxed()
156162
},
157-
Checkpoint::DeltaUpdates { cursor } => delta_sync(source, cursor, tables_seen).boxed(),
158163
}
159164
}
160165

@@ -164,6 +169,7 @@ async fn initial_sync(
164169
source: impl Source,
165170
mut checkpoint: Option<(i64, ListSnapshotCursor)>,
166171
mut tables_seen: Option<BTreeSet<String>>,
172+
selection: Selection,
167173
) {
168174
let log_msg = if let Some((snapshot, _)) = checkpoint {
169175
format!("Resuming an initial sync from {source} at {snapshot}")
@@ -175,7 +181,9 @@ async fn initial_sync(
175181
let snapshot = loop {
176182
let snapshot = checkpoint.as_ref().map(|c| c.0);
177183
let cursor = checkpoint.as_ref().map(|c| c.1.clone());
178-
let res = source.list_snapshot(snapshot, cursor.clone()).await?;
184+
let res = source
185+
.list_snapshot(snapshot, cursor.clone(), selection.clone())
186+
.await?;
179187

180188
for value in res.values {
181189
if let Some(ref mut tables_seen) = tables_seen {
@@ -235,13 +243,14 @@ async fn delta_sync(
235243
source: impl Source,
236244
cursor: DocumentDeltasCursor,
237245
mut tables_seen: Option<BTreeSet<String>>,
246+
selection: Selection,
238247
) {
239248
log(&format!("Delta sync from {source} starting at {cursor}."));
240249

241250
let mut cursor = cursor;
242251
let mut has_more = true;
243252
while has_more {
244-
let response = source.document_deltas(cursor).await?;
253+
let response = source.document_deltas(cursor, selection.clone()).await?;
245254

246255
for value in response.values {
247256
if let Some(ref mut tables_seen) = tables_seen {

0 commit comments

Comments
 (0)