Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:


"""
if vt is None:
vt = self.vt
if self.is_async:
return self.loop.run_until_complete(self._read_async(queue_name, vt))
return self._read_sync(queue_name, vt)
Expand Down Expand Up @@ -812,6 +814,108 @@ def read_with_poll(
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
)

def _set_vt_sync(
self, queue_name: str, msg_id: int, vt_offset: int
) -> Optional[Message]:
"""Set the visibility timeout for a message."""
with self.session_maker() as session:
row = session.execute(
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
).fetchone()
session.commit()
if row is None:
return None
return Message(
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
)

async def _set_vt_async(
self, queue_name: str, msg_id: int, vt_offset: int
) -> Optional[Message]:
"""Set the visibility timeout for a message."""
async with self.session_maker() as session:
row = (
await session.execute(
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
{
"queue_name": queue_name,
"msg_id": msg_id,
"vt_offset": vt_offset,
},
)
).fetchone()
await session.commit()
print("row", row)
if row is None:
return None
return Message(
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
)

def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
"""
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`

Set the visibility timeout for a message.

Args:
queue_name (str): The name of the queue.
msg_id (int): The message id.
vt_offset (int): The visibility timeout in seconds.

Returns:
|schema_message_class|_ or ``None`` if the message does not exist.

Usage:

.. code-block:: python

msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10)
msg = pgmq_client.read('my_queue')
assert msg is not None
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
assert msg is not None

.. tip::
| |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism.
| `ref: Exponential Backoff And Jitter <https://aws.amazon.com/tw/blogs/architecture/exponential-backoff-and-jitter/>`_.
| **For example:**

.. code-block:: python

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

def _exp_backoff_retry(msg: Message)->int:
# exponential backoff retry
if msg.read_ct < 5:
return 2 ** msg.read_ct
return 2 ** 5

def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
msg = pgmq_client.read(
queue_name=queue_name,
vt=1000, # set vt to 1000 seconds temporarily
)
if msg is None:
return

# set exponential backoff retry
pgmq_client.set_vt(
queue_name=query_name,
msg_id=msg.msg_id,
vt_offset=_exp_backoff_retry(msg)
)

"""
if self.is_async:
return self.loop.run_until_complete(
self._set_vt_async(queue_name, msg_id, vt_offset)
)
return self._set_vt_sync(queue_name, msg_id, vt_offset)

def _pop_sync(self, queue_name: str) -> Optional[Message]:
with self.session_maker() as session:
row = session.execute(
Expand Down
44 changes: 44 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,21 @@ def test_send_and_read_msg_with_vt_and_delay(pgmq_setup_teardown: PGMQ_WITH_QUEU
assert msg_read.msg_id == msg_id


def test_send_and_read_msg_with_vt_zero(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
"""Test that vt=0 works correctly and message becomes visible immediately."""
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
msg_id: int = pgmq.send(queue_name, msg)
# Read with vt=0 means message should be immediately visible again
msg_read = pgmq.read(queue_name, vt=0)
assert msg_read.message == msg
assert msg_read.msg_id == msg_id
# Message should be visible immediately (no waiting)
msg_read = pgmq.read(queue_name)
assert msg_read.message == msg
assert msg_read.msg_id == msg_id


def test_read_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg_read = pgmq.read(queue_name)
Expand Down Expand Up @@ -242,6 +257,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
assert duration > 1.9


def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
msg_id = pgmq.send(queue_name, msg)
msg_read = pgmq.set_vt(queue_name, msg_id, 2)
assert msg is not None
assert pgmq.read(queue_name) is None
time.sleep(1.5)
assert pgmq.read(queue_name) is None
time.sleep(0.6)
msg_read = pgmq.read(queue_name)
assert msg_read.message == msg


def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
msg_id = pgmq.send(queue_name, msg)
_ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds
assert msg is not None
assert pgmq.read(queue_name) is None
time.sleep(0.5)
assert pgmq.set_vt(queue_name, msg_id, 1) is not None
time.sleep(0.3)
assert pgmq.read(queue_name) is None
time.sleep(0.8)
assert pgmq.read(queue_name) is not None


def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
pgmq, queue_name = pgmq_setup_teardown
msg = MSG
Expand Down