Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en

## TODO

- [ ] Add **time-based** partition option and validation to `create_partitioned_queue` method.
- [ ] Read(single/batch) Archive Table ( `read_archive` method )
- [ ] Detach Archive Table ( `detach_archive` method )
- [ ] Add `set_vt` utils method.
- [x] Add **time-based** partition option and validation to `create_partitioned_queue` method.
- [x] Read(single/batch) Archive Table ( `read_archive` method )
- [x] Detach Archive Table ( `detach_archive` method )
- [x] Add `set_vt` utils method.
300 changes: 291 additions & 9 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from typing import List, Optional
import re
from typing import List, Optional, Union

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
Expand Down Expand Up @@ -239,11 +240,40 @@ async def _create_partitioned_queue_async(
)
await session.commit()

def _validate_partition_interval(self, interval: Union[int, str]) -> str:
"""Validate partition interval format.

Args:
interval: Either an integer for numeric partitioning or a string for time-based partitioning
(e.g., '1 day', '1 hour', '7 days')

Returns:
The validated interval as a string

Raises:
ValueError: If the interval format is invalid
"""
if isinstance(interval, int):
if interval <= 0:
raise ValueError("Numeric partition interval must be positive")
return str(interval)

# Validate time-based interval format
# Valid PostgreSQL interval formats: '1 day', '7 days', '1 hour', '1 month', etc.
time_pattern = r"^\d+\s+(microsecond|millisecond|second|minute|hour|day|week|month|year)s?$"
if not re.match(time_pattern, interval.strip(), re.IGNORECASE):
raise ValueError(
f"Invalid time-based partition interval: '{interval}'. "
"Expected format: '<number> <unit>' where unit is one of: "
"microsecond, millisecond, second, minute, hour, day, week, month, year"
)
return interval.strip()

def create_partitioned_queue(
self,
queue_name: str,
partition_interval: int = 10000,
retention_interval: int = 100000,
partition_interval: Union[int, str] = 10000,
retention_interval: Union[int, str] = 100000,
) -> None:
"""Create a new **partitioned** queue.

Expand All @@ -252,16 +282,23 @@ def create_partitioned_queue(

.. code-block:: python

# Numeric partitioning (by msg_id)
pgmq_client.create_partitioned_queue('my_partitioned_queue', partition_interval=10000, retention_interval=100000)

# Time-based partitioning (by enqueued_at)
pgmq_client.create_partitioned_queue('my_time_queue', partition_interval='1 day', retention_interval='7 days')

Args:
queue_name (str): The name of the queue, should be less than 48 characters.
partition_interval (int): Will create a new partition every ``partition_interval`` messages.
retention_interval (int): The interval for retaining partitions. Any messages that have a `msg_id` less than ``max(msg_id)`` - ``retention_interval`` will be dropped.
partition_interval (Union[int, str]): For numeric partitioning, the number of messages per partition.
For time-based partitioning, a PostgreSQL interval string (e.g., '1 day', '1 hour').
retention_interval (Union[int, str]): For numeric partitioning, messages with msg_id less than max(msg_id) - retention_interval will be dropped.
For time-based partitioning, a PostgreSQL interval string (e.g., '7 days').

.. note::
| Currently, only support for partitioning by **msg_id**.
| Will add **time-based partitioning** in the future ``pgmq-sqlalchemy`` release.
| Supports both **numeric** (by ``msg_id``) and **time-based** (by ``enqueued_at``) partitioning.
| For time-based partitioning, use interval strings like '1 day', '1 hour', '7 days', etc.
| For numeric partitioning, use integer values.

.. important::
| You must make sure that the ``pg_partman`` extension already **installed** in the Postgres.
Expand All @@ -273,14 +310,24 @@ def create_partitioned_queue(
# check if the pg_partman extension exists before creating a partitioned queue at runtime
self._check_pg_partman_ext()

# Validate partition intervals
validated_partition_interval = self._validate_partition_interval(
partition_interval
)
validated_retention_interval = self._validate_partition_interval(
retention_interval
)

if self.is_async:
return self.loop.run_until_complete(
self._create_partitioned_queue_async(
queue_name, str(partition_interval), str(retention_interval)
queue_name,
validated_partition_interval,
validated_retention_interval,
)
)
return self._create_partitioned_queue_sync(
queue_name, str(partition_interval), str(retention_interval)
queue_name, validated_partition_interval, validated_retention_interval
)

def _validate_queue_name_sync(self, queue_name: str) -> None:
Expand Down Expand Up @@ -1214,3 +1261,238 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
if self.is_async:
return self.loop.run_until_complete(self._metrics_all_async())
return self._metrics_all_sync()

def _set_vt_sync(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
    """Call ``pgmq.set_vt`` in a synchronous session and wrap the result.

    Returns a ``Message`` built from the first five columns of the returned
    row, or ``None`` when the call yields no row.
    """
    stmt = text("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);")
    params = {"queue_name": queue_name, "msg_id": msg_id, "vt": vt}
    with self.session_maker() as session:
        record = session.execute(stmt, params).fetchone()
        session.commit()
    if record is not None:
        # Columns are read by position: msg_id, read_ct, enqueued_at, vt, message.
        return Message(
            msg_id=record[0],
            read_ct=record[1],
            enqueued_at=record[2],
            vt=record[3],
            message=record[4],
        )
    return None

async def _set_vt_async(
    self, queue_name: str, msg_id: int, vt: int
) -> Optional[Message]:
    """Call ``pgmq.set_vt`` in an asynchronous session and wrap the result.

    Returns a ``Message`` built from the first five columns of the returned
    row, or ``None`` when the call yields no row.
    """
    stmt = text("select * from pgmq.set_vt(:queue_name, :msg_id, :vt);")
    params = {"queue_name": queue_name, "msg_id": msg_id, "vt": vt}
    async with self.session_maker() as session:
        result = await session.execute(stmt, params)
        record = result.fetchone()
        await session.commit()
    if record is not None:
        # Columns are read by position: msg_id, read_ct, enqueued_at, vt, message.
        return Message(
            msg_id=record[0],
            read_ct=record[1],
            enqueued_at=record[2],
            vt=record[3],
            message=record[4],
        )
    return None

def set_vt(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]:
    """
    Change the visibility timeout of a single message.

    Dispatches to the async implementation (driven to completion on
    ``self.loop``) when ``self.is_async`` is set, otherwise to the sync one.

    Args:
        queue_name (str): The name of the queue.
        msg_id (int): The message ID.
        vt (int): The new visibility timeout in seconds.

    Returns:
        |schema_message_class|_ or ``None`` if the message does not exist.

    Usage:

    .. code-block:: python

        msg_id = pgmq_client.send('my_queue', {'key': 'value'})
        msg = pgmq_client.read('my_queue', vt=10)
        # extend the visibility timeout
        msg = pgmq_client.set_vt('my_queue', msg_id, 20)
        assert msg is not None

    """
    if not self.is_async:
        return self._set_vt_sync(queue_name, msg_id, vt)
    pending = self._set_vt_async(queue_name, msg_id, vt)
    return self.loop.run_until_complete(pending)

def _detach_archive_sync(self, queue_name: str) -> None:
    """Run ``pgmq.detach_archive`` for ``queue_name`` in a synchronous session and commit."""
    stmt = text("select pgmq.detach_archive(:queue_name);")
    params = {"queue_name": queue_name}
    with self.session_maker() as session:
        session.execute(stmt, params)
        session.commit()

async def _detach_archive_async(self, queue_name: str) -> None:
    """Run ``pgmq.detach_archive`` for ``queue_name`` in an asynchronous session and commit."""
    stmt = text("select pgmq.detach_archive(:queue_name);")
    params = {"queue_name": queue_name}
    async with self.session_maker() as session:
        await session.execute(stmt, params)
        await session.commit()

def detach_archive(self, queue_name: str) -> None:
    """
    Detach the archive table of a queue by calling ``pgmq.detach_archive``.

    * The archive table (``pgmq.a_<queue_name>``) remains in the database.
    * Per the PGMQ function reference, detaching removes the table as a member of the
      ``pgmq`` extension, so it is not dropped together with the extension
      (``DROP EXTENSION pgmq``).
    * Detaching does **not** stop archiving: per the same reference, later ``archive``
      calls still append to the archive table.

    Args:
        queue_name (str): The name of the queue.

    .. code-block:: python

        pgmq_client.detach_archive('my_queue')

    """
    if self.is_async:
        return self.loop.run_until_complete(self._detach_archive_async(queue_name))
    return self._detach_archive_sync(queue_name)

def _read_archive_sync(self, queue_name: str) -> Optional[Message]:
    """Read a single message from the archive table synchronously.

    Args:
        queue_name: Name of the queue whose archive table
            (``pgmq.a_<queue_name>``) is queried.

    Returns:
        A ``Message`` built from one archived row, or ``None`` when the
        query returns no row. The query has no ``ORDER BY``, so which row
        is returned is left to the database.
    """
    # A table name cannot be passed as a bind parameter, so it is embedded in
    # the statement as a double-quoted identifier (inner quotes doubled)
    # rather than as raw text. It is lower-cased first so it keeps resolving
    # to the table PostgreSQL selected for the previously unquoted,
    # case-folded name.
    archive_table = '"a_' + queue_name.lower().replace('"', '""') + '"'
    with self.session_maker() as session:
        row = session.execute(
            text(
                "select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.{archive_table} limit 1;"
            )
        ).fetchone()
        session.commit()
    if row is None:
        return None
    return Message(
        msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
    )

async def _read_archive_async(self, queue_name: str) -> Optional[Message]:
    """Read a single message from the archive table asynchronously.

    Args:
        queue_name: Name of the queue whose archive table
            (``pgmq.a_<queue_name>``) is queried.

    Returns:
        A ``Message`` built from one archived row, or ``None`` when the
        query returns no row. The query has no ``ORDER BY``, so which row
        is returned is left to the database.
    """
    # A table name cannot be passed as a bind parameter, so it is embedded in
    # the statement as a double-quoted identifier (inner quotes doubled)
    # rather than as raw text. It is lower-cased first so it keeps resolving
    # to the table PostgreSQL selected for the previously unquoted,
    # case-folded name.
    archive_table = '"a_' + queue_name.lower().replace('"', '""') + '"'
    async with self.session_maker() as session:
        row = (
            await session.execute(
                text(
                    "select msg_id, read_ct, enqueued_at, vt, message "
                    f"from pgmq.{archive_table} limit 1;"
                )
            )
        ).fetchone()
        await session.commit()
    if row is None:
        return None
    return Message(
        msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
    )

def read_archive(self, queue_name: str) -> Optional[Message]:
    """
    Fetch one message from the archive table of a queue.

    The queue name is passed through ``validate_queue_name`` before any
    query is issued; the call is then routed to the async or sync reader
    depending on ``self.is_async``.

    Args:
        queue_name (str): The name of the queue.

    Returns:
        |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_id = pgmq_client.send('my_queue', {'key': 'value'})
        pgmq_client.archive('my_queue', msg_id)
        archived_msg = pgmq_client.read_archive('my_queue')
        print(archived_msg.message)

    """
    # The readers place the queue name inside the SQL text, so it is
    # validated before being handed to them.
    self.validate_queue_name(queue_name)
    if not self.is_async:
        return self._read_archive_sync(queue_name)
    pending = self._read_archive_async(queue_name)
    return self.loop.run_until_complete(pending)

def _read_archive_batch_sync(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Read multiple messages from the archive table synchronously.

    Args:
        queue_name: Name of the queue whose archive table
            (``pgmq.a_<queue_name>``) is queried.
        batch_size: Maximum number of rows to fetch (bound to ``LIMIT``).

    Returns:
        A list of ``Message`` objects, or ``None`` when the query returns
        no rows. The query has no ``ORDER BY``, so row order is left to
        the database.
    """
    # A table name cannot be passed as a bind parameter, so it is embedded in
    # the statement as a double-quoted identifier (inner quotes doubled)
    # rather than as raw text. It is lower-cased first so it keeps resolving
    # to the table PostgreSQL selected for the previously unquoted,
    # case-folded name.
    archive_table = '"a_' + queue_name.lower().replace('"', '""') + '"'
    with self.session_maker() as session:
        rows = session.execute(
            text(
                "select msg_id, read_ct, enqueued_at, vt, message "
                f"from pgmq.{archive_table} limit :batch_size;"
            ),
            {"batch_size": batch_size},
        ).fetchall()
        session.commit()
    if not rows:
        return None
    return [
        Message(
            msg_id=row[0],
            read_ct=row[1],
            enqueued_at=row[2],
            vt=row[3],
            message=row[4],
        )
        for row in rows
    ]

async def _read_archive_batch_async(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """Read multiple messages from the archive table asynchronously.

    Args:
        queue_name: Name of the queue whose archive table
            (``pgmq.a_<queue_name>``) is queried.
        batch_size: Maximum number of rows to fetch (bound to ``LIMIT``).

    Returns:
        A list of ``Message`` objects, or ``None`` when the query returns
        no rows. The query has no ``ORDER BY``, so row order is left to
        the database.
    """
    # A table name cannot be passed as a bind parameter, so it is embedded in
    # the statement as a double-quoted identifier (inner quotes doubled)
    # rather than as raw text. It is lower-cased first so it keeps resolving
    # to the table PostgreSQL selected for the previously unquoted,
    # case-folded name.
    archive_table = '"a_' + queue_name.lower().replace('"', '""') + '"'
    async with self.session_maker() as session:
        rows = (
            await session.execute(
                text(
                    "select msg_id, read_ct, enqueued_at, vt, message "
                    f"from pgmq.{archive_table} limit :batch_size;"
                ),
                {"batch_size": batch_size},
            )
        ).fetchall()
        await session.commit()
    if not rows:
        return None
    return [
        Message(
            msg_id=row[0],
            read_ct=row[1],
            enqueued_at=row[2],
            vt=row[3],
            message=row[4],
        )
        for row in rows
    ]

def read_archive_batch(
    self, queue_name: str, batch_size: int = 1
) -> Optional[List[Message]]:
    """
    Fetch up to ``batch_size`` messages from the archive table of a queue.

    The queue name is passed through ``validate_queue_name`` before any
    query is issued; the call is then routed to the async or sync reader
    depending on ``self.is_async``.

    Args:
        queue_name (str): The name of the queue.
        batch_size (int): The number of messages to read.

    Returns:
        List of |schema_message_class|_ or ``None`` if the archive is empty.

    Usage:

    .. code-block:: python

        msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
        pgmq_client.archive_batch('my_queue', msg_ids)
        archived_msgs = pgmq_client.read_archive_batch('my_queue', batch_size=10)
        for msg in archived_msgs:
            print(msg.message)

    """
    # The readers place the queue name inside the SQL text, so it is
    # validated before being handed to them.
    self.validate_queue_name(queue_name)
    if not self.is_async:
        return self._read_archive_batch_sync(queue_name, batch_size)
    pending = self._read_archive_batch_async(queue_name, batch_size)
    return self.loop.run_until_complete(pending)
Loading