
Commit e8f21e7

Added pipeline functionality for transactions

1 parent 474d80d · commit e8f21e7

File tree (5 files changed, +184 −1 lines):

- Cargo.lock
- Cargo.toml
- README.md
- python/psqlpy/_internal/__init__.pyi
- src/driver/transaction.rs

Cargo.lock

Lines changed: 1 addition & 0 deletions (generated file; diff not rendered by default)

Cargo.toml

Lines changed: 1 addition & 0 deletions

```diff
@@ -27,3 +27,4 @@ chrono = "0.4.33"
 chrono-tz = "0.8.5"
 uuid = { version = "1.7.0", features = ["v4"] }
 serde_json = "1.0.113"
+futures-util = "0.3.30"
```
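The new `futures-util` dependency supplies `future::try_join_all`, which the pipeline implementation in `src/driver/transaction.rs` below uses to await all queued queries together.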

README.md

Lines changed: 55 additions & 0 deletions

The following section is added after the `first_row_result = first_row.result()` example (line 355 of the README):

### Transaction pipelining

When you have a lot of independent queries and want to execute them concurrently, you can use `pipeline`. Pipelining can improve performance in use cases where multiple independent queries need to be executed. In a traditional workflow, each query is sent to the server after the previous query completes. In contrast, pipelining allows the client to send all of the queries to the server up front, minimizing the time spent by one side waiting for the other to finish sending data:

```
Sequential                              Pipelined
| Client         | Server          |    | Client         | Server          |
|----------------|-----------------|    |----------------|-----------------|
| send query 1   |                 |    | send query 1   |                 |
|                | process query 1 |    | send query 2   | process query 1 |
| receive rows 1 |                 |    | send query 3   | process query 2 |
| send query 2   |                 |    | receive rows 1 | process query 3 |
|                | process query 2 |    | receive rows 2 |                 |
| receive rows 2 |                 |    | receive rows 3 |                 |
| send query 3   |                 |
|                | process query 3 |
| receive rows 3 |                 |
```

Read more: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining

Let's see some code:

```python
import asyncio

from psqlpy import PSQLPool, QueryResult


async def main() -> None:
    db_pool = PSQLPool()
    await db_pool.startup()

    transaction = await db_pool.transaction()

    results: list[QueryResult] = await transaction.pipeline(
        queries=[
            (
                "SELECT username FROM users WHERE id = $1",
                [100],
            ),
            (
                "SELECT some_data FROM profiles",
                None,
            ),
            (
                "INSERT INTO users (username, id) VALUES ($1, $2)",
                ["PSQLPy", 1],
            ),
        ]
    )


asyncio.run(main())
```
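For contrast with the table above, here is a minimal sketch of the sequential baseline, running the same three queries one at a time. It assumes the `execute` method documented elsewhere in this README; the exact call shape may differ:

```python
import asyncio

from psqlpy import PSQLPool


async def main() -> None:
    db_pool = PSQLPool()
    await db_pool.startup()

    transaction = await db_pool.transaction()

    # One round trip per query: each call only starts
    # after the previous result has been received.
    await transaction.execute(
        "SELECT username FROM users WHERE id = $1",
        [100],
    )
    await transaction.execute("SELECT some_data FROM profiles")
    await transaction.execute(
        "INSERT INTO users (username, id) VALUES ($1, $2)",
        ["PSQLPy", 1],
    )


asyncio.run(main())
```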
The section that follows it is unchanged:

### Transaction ROLLBACK TO SAVEPOINT

You can roll back your transaction to a specified savepoint, but you must create the savepoint first.

python/psqlpy/_internal/__init__.pyi

Lines changed: 60 additions & 0 deletions

The following method is added to the `Transaction` class, just before the existing `savepoint` method:

````python
async def pipeline(
    self,
    queries: list[tuple[str, list[Any] | None]],
) -> list[QueryResult]:
    """Execute queries in a pipeline.

    Pipelining can improve performance in use cases where multiple
    independent queries need to be executed.
    In a traditional workflow, each query is sent to the server
    after the previous query completes.
    In contrast, pipelining allows the client to send all of the
    queries to the server up front, minimizing the time spent
    by one side waiting for the other to finish sending data:
    ```
    Sequential                              Pipelined
    | Client         | Server          |    | Client         | Server          |
    |----------------|-----------------|    |----------------|-----------------|
    | send query 1   |                 |    | send query 1   |                 |
    |                | process query 1 |    | send query 2   | process query 1 |
    | receive rows 1 |                 |    | send query 3   | process query 2 |
    | send query 2   |                 |    | receive rows 1 | process query 3 |
    |                | process query 2 |    | receive rows 2 |                 |
    | receive rows 2 |                 |    | receive rows 3 |                 |
    | send query 3   |                 |
    |                | process query 3 |
    | receive rows 3 |                 |
    ```
    Read more: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining

    ### Example:
    ```python
    import asyncio

    from psqlpy import PSQLPool, QueryResult


    async def main() -> None:
        db_pool = PSQLPool()
        await db_pool.startup()

        transaction = await db_pool.transaction()

        results: list[QueryResult] = await transaction.pipeline(
            queries=[
                (
                    "SELECT username FROM users WHERE id = $1",
                    [100],
                ),
                (
                    "SELECT some_data FROM profiles",
                    None,
                ),
                (
                    "INSERT INTO users (username, id) VALUES ($1, $2)",
                    ["PSQLPy", 1],
                ),
            ]
        )


    asyncio.run(main())
    ```
    """  # noqa: E501
````

src/driver/transaction.rs

Lines changed: 67 additions & 1 deletion

```diff
@@ -9,9 +9,10 @@ use crate::{
     value_converter::{convert_parameters, PythonDTO},
 };
 use deadpool_postgres::Object;
+use futures_util::future;
 use pyo3::{
     pyclass, pymethods,
-    types::{PyList, PyString},
+    types::{PyList, PyString, PyTuple},
     Py, PyAny, PyErr, PyObject, PyRef, PyRefMut, Python,
 };
 use std::{collections::HashSet, sync::Arc, vec};
@@ -163,6 +164,7 @@ impl RustTransaction {
 
         Ok(())
     }
+
     /// Fetch single row from query.
     ///
     /// Method doesn't acquire lock on any structure fields.
```
```diff
@@ -213,6 +215,28 @@ impl RustTransaction {
 
         Ok(PSQLDriverSinglePyQueryResult::new(result))
     }
+
+    /// Run many queries as a pipeline.
+    ///
+    /// This can speed up querying.
+    ///
+    /// # Errors
+    ///
+    /// May return Err Result if any of the queries fails to execute.
+    pub async fn inner_pipeline(
+        &self,
+        queries: Vec<(String, Vec<PythonDTO>)>,
+    ) -> RustPSQLDriverPyResult<Vec<PSQLDriverPyQueryResult>> {
+        let mut futures = vec![];
+        for (querystring, params) in queries {
+            let execute_future = self.inner_execute(querystring, params);
+            futures.push(execute_future);
+        }
+
+        let results = future::try_join_all(futures).await?;
+        Ok(results)
+    }
+
     /// Start transaction
     /// Set up isolation level if specified
     /// Set up deferrable if specified
```
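`future::try_join_all` drives all of the query futures concurrently, resolves to their results in the same order as the input `queries`, and fails fast: the first query error is returned to the caller. A rough Python analogy of those semantics, using plain asyncio rather than any psqlpy API:

```python
import asyncio


async def fake_query(n: int) -> int:
    # Stand-in for one pipelined query.
    if n == 2:
        raise RuntimeError("query 2 failed")
    return n * 10


async def main() -> None:
    # Like try_join_all, gather keeps input order...
    print(await asyncio.gather(fake_query(1), fake_query(3)))  # [10, 30]

    # ...and propagates the first error to the caller.
    try:
        await asyncio.gather(fake_query(1), fake_query(2), fake_query(3))
    except RuntimeError as exc:
        print(exc)  # query 2 failed


asyncio.run(main())
```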
```diff
@@ -748,6 +772,48 @@ impl Transaction {
         })
     }
 
+    /// Execute querystrings with parameters and return all results.
+    ///
+    /// Creates a pipeline of queries.
+    ///
+    /// # Errors
+    ///
+    /// May return Err Result if:
+    /// 1) Python parameters cannot be converted.
+    /// 2) Any of the querystrings cannot be executed.
+    pub fn pipeline<'a>(
+        &'a self,
+        py: Python<'a>,
+        queries: Option<&'a PyList>,
+    ) -> RustPSQLDriverPyResult<&'a PyAny> {
+        let mut processed_queries: Vec<(String, Vec<PythonDTO>)> = vec![];
+        if let Some(queries) = queries {
+            for single_query in queries {
+                let query_tuple = single_query.downcast::<PyTuple>().map_err(|err| {
+                    RustPSQLDriverError::PyToRustValueConversionError(format!(
+                        "Cannot cast to tuple: {err}",
+                    ))
+                })?;
+                let querystring = query_tuple.get_item(0)?.extract::<String>()?;
+                match query_tuple.get_item(1) {
+                    Ok(params) => {
+                        processed_queries.push((querystring, convert_parameters(params)?));
+                    }
+                    Err(_) => {
+                        processed_queries.push((querystring, vec![]));
+                    }
+                }
+            }
+        }
+
+        let transaction_arc = self.transaction.clone();
+
+        rustengine_future(py, async move {
+            let transaction_guard = transaction_arc.read().await;
+            transaction_guard.inner_pipeline(processed_queries).await
+        })
+    }
+
     /// Start the transaction.
     ///
     /// # Errors
```
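On the Python side, each element of `queries` is a tuple: the querystring first, then the parameter list. Per the `Err(_)` branch above, a tuple without a second element falls back to an empty parameter list, and the README example shows that `None` is also accepted for queries without placeholders. Note that `queries` itself is optional here (`Option<&PyList>`); with no queries, the pipeline resolves to an empty list of results.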
