|
1 | | -import unittest |
| 1 | +import datetime |
2 | 2 | from unittest import mock |
3 | 3 |
|
4 | 4 | from dateutil.relativedelta import relativedelta |
5 | 5 |
|
6 | 6 | from bot.constants import Emojis |
7 | 7 | from bot.exts.moderation.slowmode import Slowmode |
| 8 | +from tests.base import RedisTestCase |
8 | 9 | from tests.helpers import MockBot, MockContext, MockTextChannel |
9 | 10 |
|
10 | 11 |
|
11 | | -class SlowmodeTests(unittest.IsolatedAsyncioTestCase): |
| 12 | +class SlowmodeTests(RedisTestCase): |
12 | 13 |
|
13 | 14 | def setUp(self) -> None: |
14 | 15 | self.bot = MockBot() |
@@ -95,6 +96,113 @@ async def test_reset_slowmode_sets_delay_to_zero(self) -> None: |
95 | 96 | self.ctx, text_channel, relativedelta(seconds=0) |
96 | 97 | ) |
97 | 98 |
|
| 99 | + @mock.patch("bot.exts.moderation.slowmode.datetime") |
| 100 | + async def test_set_slowmode_with_duration(self, mock_datetime) -> None: |
| 101 | + """Set slowmode with a duration""" |
| 102 | + mock_datetime.now.return_value = datetime.datetime(2025, 6, 2, 12, 0, 0, tzinfo=datetime.UTC) |
| 103 | + test_cases = ( |
| 104 | + ("python-general", 6, 6000, f"{Emojis.check_mark} The slowmode delay for #python-general is now 6 seconds" |
| 105 | + " and expires in <t:1748871600:R>."), |
| 106 | + ("mod-spam", 5, 600, f"{Emojis.check_mark} The slowmode delay for #mod-spam is now 5 seconds and expires" |
| 107 | + " in <t:1748866200:R>."), |
| 108 | + ("changelog", 12, 7200, f"{Emojis.check_mark} The slowmode delay for #changelog is now 12 seconds and" |
| 109 | + " expires in <t:1748872800:R>.") |
| 110 | + ) |
| 111 | + for channel_name, seconds, duration, result_msg in test_cases: |
| 112 | + with self.subTest( |
| 113 | + channel_mention=channel_name, |
| 114 | + seconds=seconds, |
| 115 | + duration=duration, |
| 116 | + result_msg=result_msg |
| 117 | + ): |
| 118 | + text_channel = MockTextChannel(name=channel_name, slowmode_delay=0) |
| 119 | + await self.cog.set_slowmode( |
| 120 | + self.cog, |
| 121 | + self.ctx, |
| 122 | + text_channel, |
| 123 | + relativedelta(seconds=seconds), |
| 124 | + duration=relativedelta(seconds=duration) |
| 125 | + ) |
| 126 | + text_channel.edit.assert_awaited_once_with(slowmode_delay=float(seconds)) |
| 127 | + self.ctx.send.assert_called_once_with(result_msg) |
| 128 | + self.ctx.reset_mock() |
| 129 | + |
| 130 | + @mock.patch("bot.exts.moderation.slowmode.datetime", wraps=datetime.datetime) |
| 131 | + async def test_callback_scheduled(self, mock_datetime, ): |
| 132 | + """Schedule slowmode to be reverted""" |
| 133 | + mock_now = datetime.datetime(2025, 6, 2, 12, 0, 0, tzinfo=datetime.UTC) |
| 134 | + mock_datetime.now.return_value = mock_now |
| 135 | + self.cog.scheduler=mock.MagicMock() |
| 136 | + |
| 137 | + text_channel = MockTextChannel(name="python-general", slowmode_delay=2, id=123) |
| 138 | + await self.cog.set_slowmode( |
| 139 | + self.cog, |
| 140 | + self.ctx, |
| 141 | + text_channel, |
| 142 | + relativedelta(seconds=4), |
| 143 | + relativedelta(seconds=10)) |
| 144 | + |
| 145 | + args = (mock_now+relativedelta(seconds=10), text_channel.id, mock.ANY) |
| 146 | + self.cog.scheduler.schedule_at.assert_called_once_with(*args) |
| 147 | + |
| 148 | + async def test_revert_slowmode_callback(self) -> None: |
| 149 | + """Check that the slowmode is reverted""" |
| 150 | + text_channel = MockTextChannel(name="python-general", slowmode_delay=2, id=123) |
| 151 | + self.bot.get_channel = mock.MagicMock(return_value=text_channel) |
| 152 | + await self.cog.set_slowmode( |
| 153 | + self.cog, self.ctx, text_channel, relativedelta(seconds=4), relativedelta(seconds=10) |
| 154 | + ) |
| 155 | + await self.cog._revert_slowmode(text_channel.id) |
| 156 | + text_channel.edit.assert_awaited_with(slowmode_delay=2) |
| 157 | + text_channel.send.assert_called_once_with( |
| 158 | + f"{Emojis.check_mark} A previously applied slowmode has expired and has been reverted to 2 seconds." |
| 159 | + ) |
| 160 | + |
| 161 | + async def test_reschedule_slowmodes(self) -> None: |
| 162 | + """Does not reschedule if cache is empty""" |
| 163 | + self.cog.scheduler.schedule_at = mock.MagicMock() |
| 164 | + self.cog._reschedule = mock.AsyncMock() |
| 165 | + await self.cog.cog_unload() |
| 166 | + await self.cog.cog_load() |
| 167 | + |
| 168 | + self.cog._reschedule.assert_called() |
| 169 | + self.cog.scheduler.schedule_at.assert_not_called() |
| 170 | + |
| 171 | + |
| 172 | + @mock.patch("bot.exts.moderation.slowmode.datetime", wraps=datetime.datetime) |
| 173 | + async def test_reschedules_slowmodes(self, mock_datetime) -> None: |
| 174 | + """Slowmodes are loaded from cache at cog reload and scheduled to be reverted.""" |
| 175 | + mock_datetime.now.return_value = datetime.datetime(2025, 6, 2, 12, 0, 0, tzinfo=datetime.UTC) |
| 176 | + mock_now = datetime.datetime(2025, 6, 2, 12, 0, 0, tzinfo=datetime.UTC) |
| 177 | + self.cog._reschedule = mock.AsyncMock(wraps=self.cog._reschedule) |
| 178 | + |
| 179 | + channels = [] |
| 180 | + slowmodes = ( |
| 181 | + (123, (mock_now - datetime.timedelta(10)).timestamp(), 2), # expiration in the past |
| 182 | + (456, (mock_now + datetime.timedelta(10)).timestamp(), 4), # expiration in the future |
| 183 | + ) |
| 184 | + |
| 185 | + for channel_id, expiration_datetime, delay in slowmodes: |
| 186 | + channel = MockTextChannel(slowmode_delay=delay, id=channel_id) |
| 187 | + channels.append(channel) |
| 188 | + |
| 189 | + await self.cog.slowmode_expiration_cache.set(channel_id, expiration_datetime) |
| 190 | + await self.cog.original_slowmode_cache.set(channel_id, delay) |
| 191 | + |
| 192 | + await self.cog.cog_unload() |
| 193 | + await self.cog.cog_load() |
| 194 | + |
| 195 | + # check that _reschedule function was called upon cog reload. |
| 196 | + self.cog._reschedule.assert_called() |
| 197 | + |
| 198 | + # check that a task was created for every cached slowmode. |
| 199 | + for channel in channels: |
| 200 | + self.assertIn(channel.id, self.cog.scheduler) |
| 201 | + |
| 202 | + # check that one channel with slowmode expiration in the past was edited immediately. |
| 203 | + channels[0].edit.assert_awaited_once_with(slowmode_delay=channels[0].slowmode_delay) |
| 204 | + channels[1].edit.assert_not_called() |
| 205 | + |
98 | 206 | @mock.patch("bot.exts.moderation.slowmode.has_any_role") |
99 | 207 | @mock.patch("bot.exts.moderation.slowmode.MODERATION_ROLES", new=(1, 2, 3)) |
100 | 208 | async def test_cog_check(self, role_check): |
|
0 commit comments