Skip to content

Commit cdd1227

Browse files
author
Santos, Tyler (Boston)
committed
fix(ingest): multi-database ingestion config supported by odbc connections
Per the docs, if the database field is omitted from the config, all databases are considered for ingestion. This was not the case for ODBC connections: the metadata ingested into a DataHub instance contained only the contents of the initial connection database.
1 parent 66f36c2 commit cdd1227

File tree

2 files changed

+59
-14
lines changed

2 files changed

+59
-14
lines changed

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ def get_sql_alchemy_url(
160160
uri_opts=uri_opts,
161161
)
162162
if self.use_odbc:
163-
uri = f"{uri}?{urllib.parse.urlencode(self.uri_args)}"
163+
uri_args = ({k:v for k,v in self.uri_args.items() if k.lower() != 'database'}
164+
if current_db else
165+
self.uri_args)
166+
uri = f"{uri}?{urllib.parse.urlencode(uri_args)}"
164167
return uri
165168

166169
@property
@@ -939,6 +942,24 @@ def construct_flow_workunits(
939942
aspect=data_flow.as_container_aspect,
940943
).as_workunit()
941944

945+
def _database_names_from_engine(self, engine):
946+
"""
947+
Helper method to get database names from the engine.
948+
This is used to fetch the list of databases in the SQL Server instance.
949+
"""
950+
with engine.begin() as conn:
951+
databases = conn.execute(
952+
"SELECT name FROM master.sys.databases WHERE name NOT IN \
953+
('master', 'model', 'msdb', 'tempdb', 'Resource', \
954+
'distribution' , 'reportserver', 'reportservertempdb'); "
955+
).fetchall()
956+
return [db["name"] for db in databases]
957+
958+
def _inspector_for_database(self, db_name: str) -> Inspector:
959+
url = self.config.get_sql_alchemy_url(current_db=db_name)
960+
engine = create_engine(url, **self.config.options)
961+
return inspect(engine)
962+
942963
def get_inspectors(self) -> Iterable[Inspector]:
943964
# This method can be overridden in the case that you want to dynamically
944965
# run on multiple databases.
@@ -950,19 +971,10 @@ def get_inspectors(self) -> Iterable[Inspector]:
950971
inspector = inspect(engine)
951972
yield inspector
952973
else:
953-
with engine.begin() as conn:
954-
databases = conn.execute(
955-
"SELECT name FROM master.sys.databases WHERE name NOT IN \
956-
('master', 'model', 'msdb', 'tempdb', 'Resource', \
957-
'distribution' , 'reportserver', 'reportservertempdb'); "
958-
).fetchall()
959-
960-
for db in databases:
961-
if self.config.database_pattern.allowed(db["name"]):
962-
url = self.config.get_sql_alchemy_url(current_db=db["name"])
963-
engine = create_engine(url, **self.config.options)
964-
inspector = inspect(engine)
965-
self.current_database = db["name"]
974+
for db_name in self._database_names_from_engine(engine):
975+
if self.config.database_pattern.allowed(db_name):
976+
inspector = self._inspector_for_database(db_name)
977+
self.current_database = db_name
966978
yield inspector
967979

968980
def get_identifier(

metadata-ingestion/tests/unit/test_mssql.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,3 +299,36 @@ def test_stored_procedure_vs_direct_query_compatibility(mssql_source):
299299
# Verify database_name is properly set
300300
assert sp_step["database_name"] == "test_db"
301301
assert direct_step["database_name"] == "test_db"
302+
303+
304+
def test_get_sql_alchemy_url_ignores_config_db_when_override_is_provided():
305+
"""Test that the database name is ignored when an override is provided"""
306+
override_db = "override_db"
307+
config_defined_db = "config_defined_connection_db"
308+
309+
config = SQLServerConfig(
310+
host_port="localhost:1433",
311+
username="test",
312+
password="test",
313+
use_odbc=True,
314+
uri_args={'database': config_defined_db, 'driver': 'some_driver_value'},
315+
)
316+
317+
# Mock to avoid DB connections
318+
with patch("datahub.ingestion.source.sql.sql_common.SQLAlchemySource.__init__"), \
319+
patch("datahub.ingestion.source.sql.mssql.source.SQLServerSource._database_names_from_engine") as mock_db_names, \
320+
patch("datahub.ingestion.source.sql.mssql.source.SQLServerSource._inspector_for_database"):
321+
mock_db_names.return_value = [override_db]
322+
source = SQLServerSource(config, MagicMock())
323+
324+
# Call the method without an override to get the default connection URL
325+
default_connection = source.config.get_sql_alchemy_url()
326+
327+
# assert that when not overridden, the default connection uses the config defined database
328+
assert config_defined_db in default_connection
329+
assert override_db not in default_connection
330+
331+
# assert that when overridden, the override database is used
332+
override_db_connection = source.config.get_sql_alchemy_url(current_db=override_db)
333+
assert override_db in override_db_connection
334+
assert config_defined_db not in override_db_connection

0 commit comments

Comments
 (0)