@@ -263,9 +263,6 @@ def create_partitioned_queue(
263263 | Currently, only support for partitioning by **msg_id**.
264264 | Will add **time-based partitioning** in the future ``pgmq-sqlalchemy`` release.
265265
266- Todo:
267- * Add time-based partitioning with validation.
268-
269266 .. important::
270267 | You must make sure that the ``pg_partman`` extension already **installed** in the Postgres.
271268 | ``pgmq-sqlalchemy`` will **auto create** the ``pg_partman`` extension if it does not exist in the Postgres.
@@ -337,11 +334,17 @@ async def _drop_queue_async(self, queue: str, partitioned: bool = False) -> bool
337334 def drop_queue (self , queue : str , partitioned : bool = False ) -> bool :
338335 """Drop a queue.
339336
337+ .. _drop_queue_anchor:
338+
340339 .. code-block:: python
341340
342341 pgmq_client.drop_queue('my_queue')
343342 # for partitioned queue
344343 pgmq_client.drop_queue('my_partitioned_queue', partitioned=True)
344+
345+ .. warning::
346+ | All messages and queue itself will be deleted. (``pgmq.q_<queue_name>`` table)
347+ | **Archived tables** (``pgmq.a_<queue_name>`` table **will be dropped as well. )**
345348 """
346349 # check if the pg_partman extension exists before dropping a partitioned queue at runtime
347350 if partitioned :
@@ -946,7 +949,23 @@ async def _archive_async(self, queue_name: str, msg_id: int) -> bool:
946949 return row [0 ]
947950
948951 def archive (self , queue_name : str , msg_id : int ) -> bool :
949- """Archive a message from a queue."""
952+ """
953+ Archive a message from a queue.
954+
955+
956+ * Message will be deleted from the queue and moved to the archive table.
957+ * Will be deleted from ``pgmq.q_<queue_name>`` and be inserted into the ``pgmq.a_<queue_name>`` table.
958+ * raises an error if the ``queue_name`` does not exist.
959+ * returns ``True`` if the message is archived successfully.
960+
961+ .. code-block:: python
962+
963+ msg_id = pgmq_client.send('my_queue', {'key': 'value'})
964+ assert pgmq_client.archive('my_queue', msg_id)
965+ # since the message is archived, queue will be empty
966+ assert pgmq_client.read('my_queue') is None
967+
968+ """
950969 if self .is_async :
951970 return self .loop .run_until_complete (self ._archive_async (queue_name , msg_id ))
952971 return self ._archive_sync (queue_name , msg_id )
@@ -974,7 +993,19 @@ async def _archive_batch_async(
974993 return [row [0 ] for row in rows ]
975994
976995 def archive_batch (self , queue_name : str , msg_ids : List [int ]) -> List [int ]:
977- """Archive multiple messages from a queue."""
996+ """
997+ Archive multiple messages from a queue.
998+
999+ * Messages will be deleted from the queue and moved to the archive table.
1000+ * Returns a list of ``msg_ids`` that are successfully archived.
1001+
1002+ .. code-block:: python
1003+
1004+ msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
1005+ assert pgmq_client.archive_batch('my_queue', msg_ids) == msg_ids
1006+ assert pgmq_client.read('my_queue') is None
1007+
1008+ """
9781009 if self .is_async :
9791010 return self .loop .run_until_complete (
9801011 self ._archive_batch_async (queue_name , msg_ids )
@@ -1004,7 +1035,17 @@ async def _purge_async(self, queue_name: str) -> int:
10041035 return row [0 ]
10051036
10061037 def purge (self , queue_name : str ) -> int :
1007- """Purge a queue,return deleted_count."""
1038+ """
1039+ * Delete all messages from a queue, return the number of messages deleted.
1040+ * Archive tables will **not** be affected.
1041+
1042+ .. code-block:: python
1043+
1044+ msg_ids = pgmq_client.send_batch('my_queue', [{'key': 'value'}, {'key': 'value'}])
1045+ assert pgmq_client.purge('my_queue') == 2
1046+ assert pgmq_client.read('my_queue') is None
1047+
1048+ """
10081049 if self .is_async :
10091050 return self .loop .run_until_complete (self ._purge_async (queue_name ))
10101051 return self ._purge_sync (queue_name )
@@ -1047,7 +1088,27 @@ async def _metrics_async(self, queue_name: str) -> Optional[QueueMetrics]:
10471088 )
10481089
10491090 def metrics (self , queue_name : str ) -> Optional [QueueMetrics ]:
1050- """Get queue metrics."""
1091+ """
1092+ Get metrics for a queue.
1093+
1094+ .. _schema_message_class: `schema.Message`_
1095+ .. |schema_message_class| replace:: :py:class:`.~pgmq_sqlalchemy.schema.QueueMetrics`
1096+
1097+ Returns:
1098+ |schema_message_class|_
1099+
1100+ Usage:
1101+
1102+ .. code-block:: python
1103+
1104+ from pgmq_sqlalchemy.schema import QueueMetrics
1105+
1106+ metrics:QueueMetrics = pgmq_client.metrics('my_queue')
1107+ print(metrics.queue_name)
1108+ print(metrics.queue_length)
1109+ print(metrics.queue_length)
1110+
1111+ """
10511112 if self .is_async :
10521113 return self .loop .run_until_complete (self ._metrics_async (queue_name ))
10531114 return self ._metrics_sync (queue_name )
@@ -1089,7 +1150,40 @@ async def _metrics_all_async(self) -> Optional[List[QueueMetrics]]:
10891150 ]
10901151
10911152 def metrics_all (self ) -> Optional [List [QueueMetrics ]]:
1092- """Get metrics for all queues."""
1153+ """
1154+
1155+ .. _read_committed_isolation_level: https://www.postgresql.org/docs/current/transaction-iso.html#XACT-READ-COMMITTED
1156+ .. |read_committed_isolation_level| replace:: **READ COMMITTED**
1157+
1158+ .. _drop_queue_method: ref:`pgmq_sqlalchemy.PGMQueue.drop_queue`
1159+ .. |drop_queue_method| replace:: :py:class:`~pgmq_sqlalchemy.PGMQueue.drop_queue`
1160+
1161+ Get metrics for all queues.
1162+
1163+ Returns:
1164+ List of |schema_message_class|_ objects.
1165+
1166+ :py:class:`~pgmq_sqlalchemy.schema.QueueMetrics`
1167+
1168+ Usage:
1169+
1170+ .. code-block:: python
1171+
1172+ from pgmq_sqlalchemy.schema import QueueMetrics
1173+
1174+ metrics:List[QueueMetrics] = pgmq_client.metrics_all()
1175+ for m in metrics:
1176+ print(m.queue_name)
1177+ print(m.queue_length)
1178+ print(m.queue_length)
1179+
1180+ .. warning::
1181+ | You should use a **distributed lock** to avoid **race conditions** when calling :py:meth:`~pgmq_sqlalchemy.metrics_call` in **concurrent** |drop_queue_method|_ **scenarios**.
1182+ |
1183+ | Since the default PostgreSQL isolation level is |read_committed_isolation_level|_, the queue metrics to be fetched **may not exist** if there are **concurrent** |drop_queue_method|_ **operations**.
1184+
1185+
1186+ """
10931187 if self .is_async :
10941188 return self .loop .run_until_complete (self ._metrics_all_async ())
10951189 return self ._metrics_all_sync ()
0 commit comments