|
1 | 1 | import pytest |
2 | 2 |
|
3 | | -from psqlpy import Connection, ConnectionPool, ConnRecyclingMethod, QueryResult, connect |
4 | | -from psqlpy.exceptions import RustPSQLDriverPyBaseError |
| 3 | +from psqlpy import ( |
| 4 | + Connection, |
| 5 | + ConnectionPool, |
| 6 | + ConnLoadBalanceHosts, |
| 7 | + ConnRecyclingMethod, |
| 8 | + ConnTargetSessionAttrs, |
| 9 | + QueryResult, |
| 10 | + connect, |
| 11 | +) |
| 12 | +from psqlpy.exceptions import DBPoolConfigurationError, RustPSQLDriverPyBaseError |
5 | 13 |
|
6 | 14 | pytestmark = pytest.mark.anyio |
7 | 15 |
|
@@ -68,6 +76,73 @@ async def test_pool_conn_recycling_method( |
68 | 76 | await pg_pool.execute("SELECT 1") |
69 | 77 |
|
70 | 78 |
|
| 79 | +async def test_build_pool_failure() -> None: |
| 80 | + with pytest.raises(expected_exception=DBPoolConfigurationError): |
| 81 | + ConnectionPool( |
| 82 | + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", |
| 83 | + connect_timeout_nanosec=12, |
| 84 | + ) |
| 85 | + with pytest.raises(expected_exception=DBPoolConfigurationError): |
| 86 | + ConnectionPool( |
| 87 | + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", |
| 88 | + connect_timeout_nanosec=12, |
| 89 | + ) |
| 90 | + with pytest.raises(expected_exception=DBPoolConfigurationError): |
| 91 | + ConnectionPool( |
| 92 | + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", |
| 93 | + keepalives_idle_nanosec=12, |
| 94 | + ) |
| 95 | + with pytest.raises(expected_exception=DBPoolConfigurationError): |
| 96 | + ConnectionPool( |
| 97 | + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", |
| 98 | + keepalives_interval_nanosec=12, |
| 99 | + ) |
| 100 | + |
| 101 | + |
| 102 | +@pytest.mark.parametrize( |
| 103 | + "target_session_attrs", |
| 104 | + [ |
| 105 | + ConnTargetSessionAttrs.Any, |
| 106 | + ConnTargetSessionAttrs.ReadWrite, |
| 107 | + ConnTargetSessionAttrs.ReadOnly, |
| 108 | + ], |
| 109 | +) |
| 110 | +async def test_pool_target_session_attrs( |
| 111 | + target_session_attrs: ConnTargetSessionAttrs, |
| 112 | +) -> None: |
| 113 | + pg_pool = ConnectionPool( |
| 114 | + db_name="psqlpy_test", |
| 115 | + host="localhost", |
| 116 | + username="postgres", |
| 117 | + password="postgres", # noqa: S106 |
| 118 | + target_session_attrs=target_session_attrs, |
| 119 | + ) |
| 120 | + |
| 121 | + if target_session_attrs == ConnTargetSessionAttrs.ReadOnly: |
| 122 | + with pytest.raises(expected_exception=RustPSQLDriverPyBaseError): |
| 123 | + await pg_pool.execute("SELECT 1") |
| 124 | + else: |
| 125 | + await pg_pool.execute("SELECT 1") |
| 126 | + |
| 127 | + |
| 128 | +@pytest.mark.parametrize( |
| 129 | + "load_balance_hosts", |
| 130 | + [ |
| 131 | + ConnLoadBalanceHosts.Disable, |
| 132 | + ConnLoadBalanceHosts.Random, |
| 133 | + ], |
| 134 | +) |
| 135 | +async def test_pool_load_balance_hosts( |
| 136 | + load_balance_hosts: ConnLoadBalanceHosts, |
| 137 | +) -> None: |
| 138 | + pg_pool = ConnectionPool( |
| 139 | + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", |
| 140 | + load_balance_hosts=load_balance_hosts, |
| 141 | + ) |
| 142 | + |
| 143 | + await pg_pool.execute("SELECT 1") |
| 144 | + |
| 145 | + |
71 | 146 | async def test_close_connection_pool() -> None: |
72 | 147 | """Test that `close` method closes connection pool.""" |
73 | 148 | pg_pool = ConnectionPool( |
|
0 commit comments