|
10 | 10 | # See the License for the specific language governing permissions and |
11 | 11 | # limitations under the License. |
12 | 12 | import math |
| 13 | +import time as t |
13 | 14 | import uuid |
14 | 15 | from datetime import date, datetime, time, timedelta, timezone |
15 | 16 | from decimal import Decimal |
|
27 | 28 | import trino |
28 | 29 | from tests.integration.conftest import trino_version |
29 | 30 | from trino import constants |
30 | | -from trino.dbapi import Cursor, DescribeOutput |
| 31 | +from trino.dbapi import Cursor, DescribeOutput, TimeBoundLRUCache |
31 | 32 | from trino.exceptions import NotSupportedError, TrinoQueryError, TrinoUserError |
32 | 33 | from trino.transaction import IsolationLevel |
33 | 34 |
|
@@ -1782,6 +1783,41 @@ def test_rowcount_insert(trino_connection): |
1782 | 1783 | assert cur.rowcount == 1 |
1783 | 1784 |
|
1784 | 1785 |
|
| 1786 | +@pytest.mark.parametrize( |
| 1787 | + "legacy_prepared_statements", |
| 1788 | + [ |
| 1789 | + True, |
| 1790 | + pytest.param(False, marks=pytest.mark.skipif( |
| 1791 | + trino_version() <= '417', |
| 1792 | + reason="EXECUTE IMMEDIATE was introduced in version 418")), |
| 1793 | + None |
| 1794 | + ] |
| 1795 | +) |
| 1796 | +def test_prepared_statement_capability_autodetection(legacy_prepared_statements, run_trino): |
| 1797 | + # start with an empty cache |
| 1798 | + trino.dbapi.must_use_legacy_prepared_statements = TimeBoundLRUCache(1024, 3600) |
| 1799 | + user_name = f"user_{t.monotonic_ns()}" |
| 1800 | + |
| 1801 | + _, host, port = run_trino |
| 1802 | + connection = trino.dbapi.Connection( |
| 1803 | + host=host, |
| 1804 | + port=port, |
| 1805 | + user=user_name, |
| 1806 | + legacy_prepared_statements=legacy_prepared_statements, |
| 1807 | + ) |
| 1808 | + cur = connection.cursor() |
| 1809 | + cur.execute("SELECT ?", [42]) |
| 1810 | + cur.fetchall() |
| 1811 | + another = connection.cursor() |
| 1812 | + another.execute("SELECT ?", [100]) |
| 1813 | + another.fetchall() |
| 1814 | + |
| 1815 | + verify = connection.cursor() |
| 1816 | + rows = verify.execute("SELECT query FROM system.runtime.queries WHERE user = ?", [user_name]) |
| 1817 | + statements = [stmt for row in rows for stmt in row] |
| 1818 | + assert statements.count("EXECUTE IMMEDIATE 'SELECT 1'") == (1 if legacy_prepared_statements is None else 0) |
| 1819 | + |
| 1820 | + |
1785 | 1821 | def get_cursor(legacy_prepared_statements, run_trino): |
1786 | 1822 | _, host, port = run_trino |
1787 | 1823 |
|
|
0 commit comments