Skip to content

Commit a08e816

Browse files
committed
LISTEN/NOTIFY funcionality
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent df348b1 commit a08e816

File tree

8 files changed

+250
-8
lines changed

8 files changed

+250
-8
lines changed

docs/.vuepress/sidebar.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export default sidebar({
2323
"connection",
2424
"transaction",
2525
"cursor",
26+
"listener",
2627
"results",
2728
"exceptions",
2829
],

docs/components/connection_pool.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,18 @@ This is the preferable way to work with the PostgreSQL.
254254
:::
255255

256256

257+
### Listener
258+
259+
Create a new instance of a listener.
260+
261+
```python
262+
async def main() -> None:
263+
...
264+
listener = db_pool.listener()
265+
```
266+
```
267+
268+
257269
### Close
258270
259271
To close the connection pool at the stop of your application.

docs/components/listener.md

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
---
2+
title: Listener
3+
---
4+
5+
`Listener` object allows users to work with [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)/[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) functionality.
6+
7+
## Usage
8+
9+
There two ways of using `Listener` object:
10+
- Async iterator
11+
- Background task
12+
13+
::: tabs
14+
15+
@tab Background task
16+
```python
17+
from psqlpy import ConnectionPool, Connection, Listener
18+
19+
20+
db_pool = ConnectionPool(
21+
dsn="postgres://postgres:postgres@localhost:5432/postgres",
22+
)
23+
24+
async def test_channel_callback(
25+
connection: Connection,
26+
payload: str,
27+
channel: str,
28+
process_id: int,
29+
) -> None:
30+
# do some important staff
31+
...
32+
33+
async def main() -> None:
34+
# Create listener object
35+
listener: Listener = db_pool.listener()
36+
37+
# Add channel to listen and callback for it.
38+
await listener.add_callback(
39+
channel="test_channel",
40+
callback=test_channel_callback,
41+
)
42+
43+
# Startup the listener
44+
await listener.startup()
45+
46+
# Start listening.
47+
# `listen` method isn't blocking, it returns None and starts background
48+
# task in the Rust event loop.
49+
listener.listen()
50+
51+
# You can stop listening.
52+
listener.abort_listen()
53+
```
54+
55+
@tab Async Iterator
56+
```python
57+
from psqlpy import (
58+
ConnectionPool,
59+
Connection,
60+
Listener,
61+
ListenerNotificationMsg,
62+
)
63+
64+
65+
db_pool = ConnectionPool(
66+
dsn="postgres://postgres:postgres@localhost:5432/postgres",
67+
)
68+
69+
async def main() -> None:
70+
# Create listener object
71+
listener: Listener = db_pool.listener()
72+
73+
# Startup the listener
74+
await listener.startup()
75+
76+
listener_msg: ListenerNotificationMsg
77+
async for listener_msg in listener:
78+
print(listener_msg)
79+
```
80+
81+
:::
82+
83+
## Listener attributes
84+
85+
- `connection`: Instance of `Connection`.
86+
If `startup` wasn't called, raises `ListenerStartError`.
87+
88+
- `is_started`: Flag that shows whether the `Listener` is running or not.
89+
90+
## Listener methods
91+
92+
### Startup
93+
94+
Startup `Listener` instance and can be called once or again only after `shutdown`.
95+
96+
::: important
97+
`Listener` must be started up.
98+
:::
99+
100+
```python
101+
async def main() -> None:
102+
listener: Listener = db_pool.listener()
103+
104+
await listener.startup()
105+
```
106+
107+
### Shutdown
108+
Abort listen (if called) and release underlying connection.
109+
110+
```python
111+
async def main() -> None:
112+
listener: Listener = db_pool.listener()
113+
114+
await listener.startup()
115+
await listener.shutdown()
116+
```
117+
118+
### Add Callback
119+
120+
#### Parameters:
121+
- `channel`: name of the channel to listen.
122+
- `callback`: coroutine callback.
123+
124+
Add new callback to the channel, can be called more than 1 times.
125+
126+
Callback signature is like this:
127+
```python
128+
from psqlpy import Connection
129+
130+
async def callback(
131+
connection: Connection,
132+
payload: str,
133+
channel: str,
134+
process_id: int,
135+
) -> None:
136+
...
137+
```
138+
139+
Parameters for callback are based like `args`, so this signature is correct to:
140+
```python
141+
async def callback(
142+
connection: Connection,
143+
*args,
144+
) -> None:
145+
...
146+
```
147+
148+
**Example:**
149+
```python
150+
async def test_channel_callback(
151+
connection: Connection,
152+
payload: str,
153+
channel: str,
154+
process_id: int,
155+
) -> None:
156+
...
157+
158+
async def main() -> None:
159+
listener = db_pool.listener()
160+
161+
await listener.add_callback(
162+
channel="test_channel",
163+
callback=test_channel_callback,
164+
)
165+
```
166+
167+
### Clear Channel Callbacks
168+
169+
#### Parameters:
170+
- `channel`: name of the channel
171+
172+
Remove all callbacks for the channel
173+
174+
```python
175+
async def main() -> None:
176+
listener = db_pool.listener()
177+
await listener.clear_channel_callbacks()
178+
```
179+
180+
### Clear All Channels
181+
Clear all channels and callbacks.
182+
183+
```python
184+
async def main() -> None:
185+
listener = db_pool.listener()
186+
await listener.clear_all_channels()
187+
```
188+
189+
### Listen
190+
Start listening.
191+
192+
It's a non-blocking operation.
193+
In the background it creates task in Rust event loop.
194+
195+
```python
196+
async def main() -> None:
197+
listener = db_pool.listener()
198+
await listener.startup()
199+
await listener.listen()
200+
```
201+
202+
### Abort Listen
203+
Abort listen.
204+
If `listen()` method was called, stop listening, else don't do anything.

python/psqlpy/_internal/__init__.pyi

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1809,6 +1809,7 @@ class Listener:
18091809
"""
18101810

18111811
connection: Connection
1812+
is_started: bool
18121813

18131814
def __aiter__(self: Self) -> Self: ...
18141815
async def __anext__(self: Self) -> ListenerNotificationMsg: ...
@@ -1824,6 +1825,11 @@ class Listener:
18241825
18251826
Each listener MUST be started up.
18261827
"""
1828+
async def shutdown(self: Self) -> None:
1829+
"""Shutdown the listener.
1830+
1831+
Abort listen and release underlying connection.
1832+
"""
18271833
async def add_callback(
18281834
self: Self,
18291835
channel: str,
@@ -1844,7 +1850,7 @@ class Listener:
18441850
) -> None: ...
18451851
```
18461852
1847-
Callback parameters are passed as args.
1853+
Callback parameters are passed as args on the Rust side.
18481854
"""
18491855

18501856
async def clear_channel_callbacks(self, channel: str) -> None:

src/driver/connection_pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub fn connect(
127127
let mgr: Manager = build_manager(
128128
mgr_config,
129129
pg_config.clone(),
130-
build_tls(&ca_file, ssl_mode)?,
130+
build_tls(&ca_file, &ssl_mode)?,
131131
);
132132

133133
let mut db_pool_builder = Pool::builder(mgr);

src/driver/connection_pool_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl ConnectionPoolBuilder {
5353
let mgr: Manager = build_manager(
5454
mgr_config,
5555
self.config.clone(),
56-
build_tls(&self.ca_file, self.ssl_mode)?,
56+
build_tls(&self.ca_file, &self.ssl_mode)?,
5757
);
5858

5959
let mut db_pool_builder = Pool::builder(mgr);

src/driver/listener/core.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,19 @@ impl Listener {
161161
}
162162

163163
#[getter]
164-
fn connection(&self) -> Connection {
165-
self.connection.clone()
164+
fn is_started(&self) -> bool {
165+
self.is_started
166+
}
167+
168+
#[getter]
169+
fn connection(&self) -> RustPSQLDriverPyResult<Connection> {
170+
if !self.is_started {
171+
return Err(RustPSQLDriverError::ListenerStartError(
172+
"Listener isn't started up".into(),
173+
));
174+
}
175+
176+
Ok(self.connection.clone())
166177
}
167178

168179
async fn startup(&mut self) -> RustPSQLDriverPyResult<()> {
@@ -172,7 +183,7 @@ impl Listener {
172183
));
173184
}
174185

175-
let tls_ = build_tls(&self.ca_file.clone(), self.ssl_mode)?;
186+
let tls_ = build_tls(&self.ca_file, &self.ssl_mode)?;
176187

177188
let mut builder = SslConnector::builder(SslMethod::tls())?;
178189
builder.set_verify(SslVerifyMode::NONE);
@@ -214,6 +225,14 @@ impl Listener {
214225
Ok(())
215226
}
216227

228+
async fn shutdown(&mut self) {
229+
self.abort_listen();
230+
std::mem::take(&mut self.connection);
231+
std::mem::take(&mut self.receiver);
232+
233+
self.is_started = false;
234+
}
235+
217236
#[pyo3(signature = (channel, callback))]
218237
async fn add_callback(
219238
&mut self,

src/driver/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ pub enum ConfiguredTLS {
181181
/// May return Err Result if cannot create builder.
182182
pub fn build_tls(
183183
ca_file: &Option<String>,
184-
ssl_mode: Option<SslMode>,
184+
ssl_mode: &Option<SslMode>,
185185
) -> RustPSQLDriverPyResult<ConfiguredTLS> {
186186
if let Some(ca_file) = ca_file {
187187
let mut builder = SslConnector::builder(SslMethod::tls())?;
@@ -190,7 +190,7 @@ pub fn build_tls(
190190
builder.build(),
191191
)));
192192
} else if let Some(ssl_mode) = ssl_mode {
193-
if ssl_mode == common_options::SslMode::Require {
193+
if *ssl_mode == common_options::SslMode::Require {
194194
let mut builder = SslConnector::builder(SslMethod::tls())?;
195195
builder.set_verify(SslVerifyMode::NONE);
196196
return Ok(ConfiguredTLS::TlsConnector(MakeTlsConnector::new(

0 commit comments

Comments
 (0)