
Commit b599fb1

Merge pull request #12 from qaspen-python/feature/add_pipeline_queries
Added pipeline functionality for transactions
2 parents 474d80d + 02f6efe commit b599fb1

5 files changed: +185 -1 lines changed


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -27,3 +27,4 @@ chrono = "0.4.33"
 chrono-tz = "0.8.5"
 uuid = { version = "1.7.0", features = ["v4"] }
 serde_json = "1.0.113"
+futures-util = "0.3.30"
```
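
The new `futures-util` dependency supplies `future::try_join_all`, which the pipeline implementation in `src/driver/transaction.rs` below uses to drive all queued queries concurrently.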

README.md

Lines changed: 55 additions & 0 deletions
````diff
@@ -355,6 +355,61 @@ async def main() -> None:
     first_row_result = first_row.result()  # This will be a dict.
 ```
 
+### Transaction pipelining
+When you have a lot of independent queries and want to execute them concurrently, you can use `pipeline`.
+Pipelining can improve performance in use cases in which multiple,
+independent queries need to be executed.
+In a traditional workflow,
+each query is sent to the server after the previous query completes.
+In contrast, pipelining allows the client to send all of the queries to the server up front,
+minimizing time spent by one side waiting for the other to finish sending data:
+```
+            Sequential                              Pipelined
+| Client         | Server          |    | Client         | Server          |
+|----------------|-----------------|    |----------------|-----------------|
+| send query 1   |                 |    | send query 1   |                 |
+|                | process query 1 |    | send query 2   | process query 1 |
+| receive rows 1 |                 |    | send query 3   | process query 2 |
+| send query 2   |                 |    | receive rows 1 | process query 3 |
+|                | process query 2 |    | receive rows 2 |                 |
+| receive rows 2 |                 |    | receive rows 3 |                 |
+| send query 3   |                 |
+|                | process query 3 |
+| receive rows 3 |                 |
+```
+Read more: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining
+
+Let's see some code:
+```python
+import asyncio
+
+from psqlpy import PSQLPool, QueryResult
+
+
+async def main() -> None:
+    db_pool = PSQLPool()
+    await db_pool.startup()
+
+    transaction = await db_pool.transaction()
+
+    results: list[QueryResult] = await transaction.pipeline(
+        queries=[
+            (
+                "SELECT username FROM users WHERE id = $1",
+                [100],
+            ),
+            (
+                "SELECT some_data FROM profiles",
+                None,
+            ),
+            (
+                "INSERT INTO users (username, id) VALUES ($1, $2)",
+                ["PSQLPy", 1],
+            ),
+        ]
+    )
+```
+
 ### Transaction ROLLBACK TO SAVEPOINT
 
 You can rollback your transaction to the specified savepoint, but before it you must create it.
````
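
Because the underlying implementation (see `src/driver/transaction.rs` below) joins the per-query futures with `future::try_join_all`, the returned list preserves the order of the submitted queries. A minimal sketch of consuming the results from the example above — the exact shape returned by `QueryResult.result()` is an assumption based on the library's other examples:

```python
from psqlpy import QueryResult


def handle_pipeline_results(results: list[QueryResult]) -> None:
    # Results arrive in submission order: SELECT username, SELECT some_data, INSERT.
    username_result, profile_result, insert_result = results
    # Assumed: QueryResult.result() returns the fetched rows as a list of dicts.
    print(username_result.result())
```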

python/psqlpy/_internal/__init__.pyi

Lines changed: 60 additions & 0 deletions
````diff
@@ -458,6 +458,66 @@ class Transaction:
         # This way transaction begins and commits by itself.
         ```
         """
+    async def pipeline(
+        self,
+        queries: list[tuple[str, list[Any] | None]],
+    ) -> list[QueryResult]:
+        """Execute queries in pipeline.
+
+        Pipelining can improve performance in use cases in which multiple,
+        independent queries need to be executed.
+        In a traditional workflow,
+        each query is sent to the server after the previous query completes.
+        In contrast, pipelining allows the client to send all of the
+        queries to the server up front, minimizing time spent
+        by one side waiting for the other to finish sending data:
+        ```
+                    Sequential                              Pipelined
+        | Client         | Server          |    | Client         | Server          |
+        |----------------|-----------------|    |----------------|-----------------|
+        | send query 1   |                 |    | send query 1   |                 |
+        |                | process query 1 |    | send query 2   | process query 1 |
+        | receive rows 1 |                 |    | send query 3   | process query 2 |
+        | send query 2   |                 |    | receive rows 1 | process query 3 |
+        |                | process query 2 |    | receive rows 2 |                 |
+        | receive rows 2 |                 |    | receive rows 3 |                 |
+        | send query 3   |                 |
+        |                | process query 3 |
+        | receive rows 3 |                 |
+        ```
+        Read more: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining
+        ### Example:
+        ```python
+        import asyncio
+
+        from psqlpy import PSQLPool, QueryResult
+
+
+        async def main() -> None:
+            db_pool = PSQLPool()
+            await db_pool.startup()
+
+            transaction = await db_pool.transaction()
+
+            results: list[QueryResult] = await transaction.pipeline(
+                queries=[
+                    (
+                        "SELECT username FROM users WHERE id = $1",
+                        [100],
+                    ),
+                    (
+                        "SELECT some_data FROM profiles",
+                        None,
+                    ),
+                    (
+                        "INSERT INTO users (username, id) VALUES ($1, $2)",
+                        ["PSQLPy", 1],
+                    ),
+                ]
+            )
+
+        ```
+        """  # noqa: E501
     async def savepoint(self: Self, savepoint_name: str) -> None:
         """Create new savepoint.
````
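
As the `queries` annotation in the stub allows, the second element of each tuple may be `None` for a parameter-free query. A small sketch against that signature, with the pool passed in as in the docstring example:

```python
from psqlpy import PSQLPool, QueryResult


async def run_parameterless(db_pool: PSQLPool) -> list[QueryResult]:
    transaction = await db_pool.transaction()
    # The second tuple element is None because "SELECT 1" takes no
    # parameters, matching list[tuple[str, list[Any] | None]] above.
    return await transaction.pipeline(queries=[("SELECT 1", None)])
```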

src/driver/transaction.rs

Lines changed: 68 additions & 1 deletion
```diff
@@ -9,9 +9,10 @@ use crate::{
     value_converter::{convert_parameters, PythonDTO},
 };
 use deadpool_postgres::Object;
+use futures_util::future;
 use pyo3::{
     pyclass, pymethods,
-    types::{PyList, PyString},
+    types::{PyList, PyString, PyTuple},
     Py, PyAny, PyErr, PyObject, PyRef, PyRefMut, Python,
 };
 use std::{collections::HashSet, sync::Arc, vec};
```

```diff
@@ -163,6 +164,7 @@ impl RustTransaction
 
         Ok(())
     }
+
     /// Fetch single row from query.
     ///
     /// Method doesn't acquire lock on any structure fields.
```

```diff
@@ -213,6 +215,29 @@ impl RustTransaction
 
         Ok(PSQLDriverSinglePyQueryResult::new(result))
     }
+
+    /// Run many queries as pipeline.
+    ///
+    /// It can boost up querying speed.
+    ///
+    /// # Errors
+    ///
+    /// May return Err Result if can't join futures or cannot execute
+    /// any of queries.
+    pub async fn inner_pipeline(
+        &self,
+        queries: Vec<(String, Vec<PythonDTO>)>,
+    ) -> RustPSQLDriverPyResult<Vec<PSQLDriverPyQueryResult>> {
+        let mut futures = vec![];
+        for (querystring, params) in queries {
+            let execute_future = self.inner_execute(querystring, params);
+            futures.push(execute_future);
+        }
+
+        let b = future::try_join_all(futures).await?;
+        Ok(b)
+    }
+
     /// Start transaction
     /// Set up isolation level if specified
     /// Set up deferable if specified
```
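
`future::try_join_all` polls all collected futures concurrently, resolves to their outputs in the order the futures were pushed, and short-circuits on the first error. In Python terms, `inner_pipeline` behaves roughly like `asyncio.gather` over the individual executes — a loose analogy for illustration, not the driver's actual code:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def pipeline_analogy(
    execute: Callable[[str, list[Any]], Awaitable[Any]],  # stand-in for inner_execute
    queries: list[tuple[str, list[Any]]],
) -> list[Any]:
    # Like try_join_all: run everything concurrently, keep input order,
    # and raise the first failure.
    return await asyncio.gather(
        *(execute(querystring, params) for querystring, params in queries)
    )
```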

```diff
@@ -748,6 +773,48 @@ impl Transaction
         })
     }
 
+    /// Execute querystrings with parameters and return all results.
+    ///
+    /// Create pipeline of queries.
+    ///
+    /// # Errors
+    ///
+    /// May return Err Result if:
+    /// 1) Cannot convert python parameters
+    /// 2) Cannot execute any of querystring.
+    pub fn pipeline<'a>(
+        &'a self,
+        py: Python<'a>,
+        queries: Option<&'a PyList>,
+    ) -> RustPSQLDriverPyResult<&'a PyAny> {
+        let mut processed_queries: Vec<(String, Vec<PythonDTO>)> = vec![];
+        if let Some(queries) = queries {
+            for single_query in queries {
+                let query_tuple = single_query.downcast::<PyTuple>().map_err(|err| {
+                    RustPSQLDriverError::PyToRustValueConversionError(format!(
+                        "Cannot cast to tuple: {err}",
+                    ))
+                })?;
+                let querystring = query_tuple.get_item(0)?.extract::<String>()?;
+                match query_tuple.get_item(1) {
+                    Ok(params) => {
+                        processed_queries.push((querystring, convert_parameters(params)?));
+                    }
+                    Err(_) => {
+                        processed_queries.push((querystring, vec![]));
+                    }
+                }
+            }
+        }
+
+        let transaction_arc = self.transaction.clone();
+
+        rustengine_future(py, async move {
+            let transaction_guard = transaction_arc.read().await;
+            transaction_guard.inner_pipeline(processed_queries).await
+        })
+    }
+
     /// Start the transaction.
     ///
     /// # Errors
```
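
Two details of the parsing above are worth noting: a list item that is not a tuple is rejected with `PyToRustValueConversionError` ("Cannot cast to tuple"), while a tuple with no second element takes the `Err(_)` arm of `get_item(1)` and silently falls back to an empty parameter list. Parameter conversion happens eagerly, so a conversion error is raised before any query is sent to the server.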
