Skip to content

Commit 5f5da9f

Browse files
committed
Support: Add re-usable patches and polyfills from application adapters
Sources: MLflow, LangChain, Singer/Meltano, rdflib-sqlalchemy
1 parent a5e05d0 commit 5f5da9f

File tree

6 files changed

+311
-0
lines changed

6 files changed

+311
-0
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
- Added `CrateIdentifierPreparer`, in order to quote reserved words
77
like `object` properly, for example when used as column names.
88
- Fixed `CrateDialect.get_pk_constraint` to return `list` instead of `set` type
9+
- Added re-usable patches and polyfills from application adapters.
10+
New utilities: `patch_autoincrement_timestamp`, `refresh_after_dml`,
11+
`check_uniqueness_factory`
912

1013
## 2024/06/13 0.37.0
1114
- Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
from sqlalchemy_cratedb.support.pandas import insert_bulk
2+
from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \
3+
patch_autoincrement_timestamp
4+
from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty
25

36
# Public API of the `support` package.
# Entries must be strings: `from sqlalchemy_cratedb.support import *`
# raises `TypeError` when `__all__` contains the objects themselves.
__all__ = [
    "check_uniqueness_factory",
    "insert_bulk",
    "patch_autoincrement_timestamp",
    "refresh_after_dml",
    "refresh_dirty",
    "refresh_table",
]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import sqlalchemy as sa
2+
from sqlalchemy.event import listen
3+
import typing as t
4+
5+
from sqlalchemy_cratedb.support.util import refresh_dirty, refresh_table
6+
7+
8+
def patch_autoincrement_timestamp():
    """
    Configure SQLAlchemy model columns with an alternative to `autoincrement=True`.
    Use the current timestamp instead.

    The patch replaces `sqlalchemy.sql.schema.Column.__init__`. Whenever an
    `autoincrement` keyword argument is passed (regardless of its value), it
    is removed, and, unless the caller supplied an explicit `default`, the
    column default is set to `sa.func.now()`.

    Calling this function more than once is safe: the constructor is only
    wrapped on the first invocation.

    This is used by CrateDB's MLflow adapter.

    TODO: Maybe enable through a dialect parameter `crate_polyfill_autoincrement` or such.
    """
    import sqlalchemy.sql.schema as schema

    init_dist = schema.Column.__init__

    # Do not stack another wrapper when the patch has been applied already.
    if getattr(init_dist, "_crate_autoincrement_patched", False):
        return

    def __init__(self, *args, **kwargs):
        if "autoincrement" in kwargs:
            del kwargs["autoincrement"]
            # Respect an explicit `default` provided by the caller.
            if "default" not in kwargs:
                kwargs["default"] = sa.func.now()
        init_dist(self, *args, **kwargs)

    # Marker consulted by the idempotency guard above.
    __init__._crate_autoincrement_patched = True  # type: ignore[attr-defined]

    schema.Column.__init__ = __init__  # type: ignore[method-assign]
29+
30+
31+
def check_uniqueness_factory(sa_entity, *attribute_names):
    """
    Build an ORM event handler emulating a UNIQUE constraint on `sa_entity`.

    CrateDB does not support the UNIQUE constraint on columns. The returned
    handler runs a manual lookup for rows matching the target's values on all
    `attribute_names`, and raises an `IntegrityError` when any are found.

    https://github.com/crate/sqlalchemy-cratedb/issues/76

    This is used by CrateDB's MLflow adapter.

    TODO: Maybe enable through a dialect parameter `crate_polyfill_unique` or such.

    :param sa_entity: The mapped class the constraint applies to.
    :param attribute_names: Names of the attributes which, in combination, must be unique.
    :return: A callable with signature `(mapper, connection, target)`,
             suitable for registering on a mapper-level event like `before_insert`.
    """

    # The synthetic constraint name is composed of all
    # involved column names, joined by dashes.
    constraint_name: str = "-".join(attribute_names)

    def check_uniqueness(mapper, connection, target):
        from sqlalchemy.exc import IntegrityError

        # Instances of other entities are not subject to this constraint.
        if not isinstance(target, sa_entity):
            return

        # TODO: How to use `session.query(SqlExperiment)` here?
        query = mapper.selectable.select()
        for name in attribute_names:
            column = getattr(sa_entity, name)
            value = getattr(target, name)
            query = query.filter(column == value)
        compiled = query.compile(bind=connection.engine)

        # No matching row means the constraint is satisfied.
        if connection.execute(compiled).rowcount <= 0:
            return

        raise IntegrityError(
            statement=compiled,
            params=[],
            orig=Exception(
                f"DuplicateKeyException in table '{target.__tablename__}' " f"on constraint '{constraint_name}'"
            ),
        )

    return check_uniqueness
68+
69+
70+
def refresh_after_dml_session(session: sa.orm.Session):
    """
    Register `refresh_dirty` as `after_flush` handler on the given session,
    in order to run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE).

    CrateDB is eventually consistent, i.e. write operations are not flushed to
    disk immediately, so readers may see stale data. In a traditional OLTP-like
    application, this is not applicable.

    This SQLAlchemy extension makes sure that data is synchronized after each
    operation manipulating data.

    Background, quoted from the SQLAlchemy documentation:

    > `after_{insert,update,delete}` events only apply to the session flush operation
    > and do not apply to the ORM DML operations described at ORM-Enabled INSERT,
    > UPDATE, and DELETE statements. To intercept ORM DML events, use
    > `SessionEvents.do_orm_execute().`
    > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_insert

    > Intercept statement executions that occur on behalf of an ORM Session object.
    > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.do_orm_execute

    > Execute after flush has completed, but before commit has been called.
    > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_flush

    This is used by CrateDB's LangChain adapter.

    TODO: Maybe enable through a dialect parameter `crate_dml_refresh` or such.

    :param session: The session to attach the `after_flush` listener to.
    """  # noqa: E501
    sa.event.listen(session, "after_flush", refresh_dirty)
98+
99+
100+
def refresh_after_dml_engine(engine: sa.engine.Engine):
    """
    Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE).

    Registers an `after_execute` listener on the engine. For each executed
    Insert, Update, or Delete construct whose table is not a Join, it invokes
    `refresh_table` with the double-quoted table name, prefixed by the
    double-quoted schema name when the table defines one.

    This is used by CrateDB's Singer/Meltano and `rdflib-sqlalchemy` adapters.

    :param engine: The engine to attach the `after_execute` listener to.
    """

    def receive_after_execute(
        conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
    ):
        # Only DML statements need a refresh.
        if not isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)):
            return

        table = clauseelement.table
        # Joins do not designate a single table to refresh.
        if isinstance(table, sa.sql.Join):
            return

        name_parts = [f'"{table.name}"']
        if table.schema is not None:
            name_parts.insert(0, f'"{table.schema}"')
        refresh_table(conn, ".".join(name_parts))

    sa.event.listen(engine, "after_execute", receive_after_execute)
117+
118+
119+
def refresh_after_dml(engine_or_session: t.Union[sa.engine.Engine, sa.orm.Session]):
    """
    Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE).

    Dispatches to `refresh_after_dml_engine` for engines, and to
    `refresh_after_dml_session` for sessions and scoped sessions.

    :param engine_or_session: The engine or session to enable automatic refreshing on.
    :raises TypeError: When the argument is neither an engine nor a session.
    """
    if isinstance(engine_or_session, sa.engine.Engine):
        refresh_after_dml_engine(engine_or_session)
        return

    if isinstance(engine_or_session, (sa.orm.Session, sa.orm.scoping.scoped_session)):
        refresh_after_dml_session(engine_or_session)
        return

    raise TypeError(f"Unknown type: {type(engine_or_session)}")
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import itertools
2+
import typing as t
3+
4+
import sqlalchemy as sa
5+
6+
if t.TYPE_CHECKING:
7+
try:
8+
from sqlalchemy.orm import DeclarativeBase
9+
except ImportError:
10+
pass
11+
12+
13+
def refresh_table(connection, target: t.Union[str, "DeclarativeBase"]):
    """
    Issue a `REFRESH TABLE` statement for the given target.

    :param connection: Object providing an `execute` method accepting
                       a SQLAlchemy text clause.
    :param target: Either a table name, or an object carrying a
                   `__tablename__` attribute. The name is interpolated
                   into the statement verbatim, without any quoting.
    """
    # Prefer the entity's table name; otherwise use the target as-is.
    table_name = getattr(target, "__tablename__", target)
    statement = sa.text(f"REFRESH TABLE {table_name}")
    connection.execute(statement)
22+
23+
24+
def refresh_dirty(session, flush_context=None):
    """
    Run `REFRESH TABLE` once per entity class touched by the session.

    SQLAlchemy event handler for the 'after_flush' event. It collects the
    classes of all entities in `session.new`, `session.dirty`, and
    `session.deleted`, and invokes `refresh_table` on each of them.

    :param session: The session which has been flushed; also used to execute the statements.
    :param flush_context: Supplied by the event machinery; not used here.
    """
    touched_classes = set()
    for entities in (session.new, session.dirty, session.deleted):
        touched_classes.update(entity.__class__ for entity in entities)

    for entity_class in touched_classes:
        refresh_table(session, entity_class)

tests/integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def drop_tables():
124124
"DROP TABLE IF EXISTS archived_tasks",
125125
"DROP TABLE IF EXISTS characters",
126126
"DROP TABLE IF EXISTS cities",
127+
"DROP TABLE IF EXISTS foobar",
127128
"DROP TABLE IF EXISTS locations",
128129
"DROP BLOB TABLE IF EXISTS myfiles",
129130
"DROP TABLE IF EXISTS search",

tests/test_support_polyfill.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import datetime as dt
2+
3+
import pytest
4+
import sqlalchemy as sa
5+
from sqlalchemy.event import listen
6+
from sqlalchemy.exc import IntegrityError
7+
from sqlalchemy.orm import sessionmaker
8+
9+
from sqlalchemy_cratedb import SA_VERSION, SA_1_4
10+
11+
try:
12+
from sqlalchemy.orm import declarative_base
13+
except ImportError:
14+
from sqlalchemy.ext.declarative import declarative_base
15+
16+
from sqlalchemy_cratedb.support import check_uniqueness_factory, patch_autoincrement_timestamp, refresh_after_dml
17+
18+
19+
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Test case not supported on SQLAlchemy 1.3 and earlier")
def test_autoincrement_timestamp(cratedb_service):
    """
    Verify `autoincrement=True` columns receive the current timestamp
    as default value, after applying `patch_autoincrement_timestamp`.

    https://github.com/crate/sqlalchemy-cratedb/issues/77
    """
    # The patch must be active before any column is defined.
    patch_autoincrement_timestamp()

    Base = declarative_base()
    engine = cratedb_service.database.engine
    session = sessionmaker(bind=engine)()

    # Model with three differently-typed "autoincrement" columns.
    class FooBar(Base):
        __tablename__ = "foobar"
        id = sa.Column(sa.String, primary_key=True)
        date = sa.Column(sa.DateTime, autoincrement=True)
        number = sa.Column(sa.BigInteger, autoincrement=True)
        string = sa.Column(sa.String, autoincrement=True)

    # Start from a pristine table.
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine, checkfirst=True)

    # Store a single record, and make it visible to readers.
    session.add(FooBar(id="foo"))
    session.commit()
    session.execute(sa.text("REFRESH TABLE foobar"))

    # Read the record back.
    columns = (FooBar.date, FooBar.number, FooBar.string)
    record = session.execute(sa.select(*columns)).mappings().first()

    # All three columns must carry a timestamp-derived value.
    assert record["date"].year == dt.datetime.now().year
    assert record["number"] >= 1718846016235
    assert record["string"] >= "1718846016235"
56+
57+
58+
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
def test_check_uniqueness_factory(cratedb_service):
    """
    Verify the synthetic UNIQUE constraint rejects a duplicate value.

    https://github.com/crate/sqlalchemy-cratedb/issues/76
    """

    Base = declarative_base()
    engine = cratedb_service.database.engine
    session = sessionmaker(bind=engine)()

    # Model with a `name` column which is supposed to be unique.
    class FooBar(Base):
        __tablename__ = "foobar"
        id = sa.Column(sa.String, primary_key=True)
        name = sa.Column(sa.String)

    # Emulate the UNIQUE constraint on `name` before each insert.
    uniqueness_check = check_uniqueness_factory(FooBar, "name")
    listen(FooBar, "before_insert", uniqueness_check)

    # Start from a pristine table.
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine, checkfirst=True)

    # Store the first record, and make it visible to readers.
    session.add(FooBar(id="foo", name="foo"))
    session.commit()
    session.execute(sa.text("REFRESH TABLE foobar"))

    # A second record with the same `name` must be rejected.
    session.add(FooBar(id="bar", name="foo"))
    with pytest.raises(IntegrityError) as excinfo:
        session.commit()
    assert excinfo.match("DuplicateKeyException in table 'foobar' on constraint 'name'")
94+
95+
96+
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
@pytest.mark.parametrize("mode", ["engine", "session"])
def test_refresh_after_dml(cratedb_service, mode):
    """
    Verify `REFRESH TABLE` is issued automatically, so a record
    can be read back right after writing it.

    https://github.com/crate/sqlalchemy-cratedb/issues/83
    """
    Base = declarative_base()
    engine = cratedb_service.database.engine
    session = sessionmaker(bind=engine)()

    # Enable automatic refresh on the object selected by `mode`.
    refresh_targets = {"engine": engine, "session": session}
    if mode not in refresh_targets:
        raise ValueError(f"Unable to enable automatic refresh with mode: {mode}")
    refresh_after_dml(refresh_targets[mode])

    # Minimal model, just a primary key.
    class FooBar(Base):
        __tablename__ = "foobar"
        id = sa.Column(sa.String, primary_key=True)

    # Start from a pristine table.
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine, checkfirst=True)

    # Store a record, without refreshing the table manually.
    session.add(FooBar(id="foo"))
    session.commit()

    # Read the record back.
    row = session.query(FooBar.id).first()

    # Sanity checks.
    assert row is not None, "Database result is empty. Most probably, `REFRESH TABLE` wasn't issued."

    # Compare outcome.
    assert row[0] == "foo"

0 commit comments

Comments
 (0)