diff --git a/fred/cogs/crashes.py b/fred/cogs/crashes.py
index 6dc295b..e968fe2 100644
--- a/fred/cogs/crashes.py
+++ b/fred/cogs/crashes.py
@@ -12,6 +12,8 @@
 from zipfile import ZipFile
 
 import re2
+import regex as regex_fallback
+import re as std_re
 
 re2.set_fallback_notification(re2.FALLBACK_WARNING)
 
@@ -47,6 +49,60 @@ async def regex_with_timeout(*args, **kwargs):
         raise ValueError(args[0]) from e
 
 
+def pattern_uses_lookaround(pattern: str) -> bool:
+    return bool(std_re.search(r"\(\?=|\(\?!|\(\?<=|\(\?<!", pattern))
+
@@ ... @@ def ...(...) -> Generator[str, None, None]:
     async def mass_regex(self, text: str) -> AsyncIterator[CrashResponse]:
         for crash in config.Crashes.fetch_all():
-            if match := await regex_with_timeout(crash["crash"], text, flags=re2.IGNORECASE | re2.S):
+            # Use safe_search so patterns with lookaround fall back to the full `regex` package
+            if match := await safe_search(crash["crash"], text, flags=re2.IGNORECASE | re2.S):
                 if str(crash["response"]).startswith(self.bot.command_prefix):
                     if command := config.Commands.fetch(crash["response"].strip(self.bot.command_prefix)):
                         command_response = command["content"]
@@ -188,19 +245,19 @@ async def mass_regex(self, text: str) -> AsyncIterator[CrashResponse]:
                         )
                 else:
 
-                    def replace_response_value_with_captured(m: re2.Match) -> str:
+                    def replace_response_value_with_captured(m) -> str:
                         group = int(m.group(1))
                         if group > len(match.groups()):
                             return f"{{Group {group} not captured in crash regex!}}"
                         return match.group(group)
 
-                    response = re2.sub(r"{(\d+)}", replace_response_value_with_captured, str(crash["response"]))
+                    response = safe_sub(r"{(\d+)}", replace_response_value_with_captured, str(crash["response"]))
 
                     yield CrashResponse(name=crash["name"], value=response, inline=True)
 
     async def detect_and_fetch_pastebin_content(self, text: str) -> str:
-        if match := re2.search(r"(https://pastebin.com/\S+)", text):
+        if match := await safe_search(r"(https://pastebin.com/\S+)", text):
             self.logger.info("Found a pastebin link! Fetching text.")
-            url = re2.sub(r"(?<=bin.com)/", "/raw/", match.group(1))
+            url = safe_sub(r"(?<=bin.com)/", "/raw/", match.group(1))
             async with self.bot.web_session.get(url) as response:
                 return await response.text()
         else:
@@ -215,7 +272,9 @@ async def process_text(self, text: str, filename="") -> list[CrashResponse]:
             responses.extend(await self.process_text(await self.detect_and_fetch_pastebin_content(text)))
 
-        if match := re2.search(r"([^\n]*Critical error:.*Engine exit[^\n]*\))", text, flags=re2.I | re2.M | re2.S):
+        if match := await safe_search(
+            r"([^\n]*Critical error:.*Engine exit[^\n]*\))", text, flags=re2.I | re2.M | re2.S
+        ):
             filename = os.path.basename(filename)
             crash = match.group(1)
             responses.append(
@@ -278,7 +337,7 @@ def _ext_filter(ext: str) -> bool:
         return ext in ("png", "log", "txt", "zip", "json")
 
     async def _obtain_attachments(self, message: Message) -> AsyncGenerator[tuple[str, IO | Exception], None, None]:
-        cdn_links = re2.findall(
+        cdn_links = safe_findall(
             r"(https://(?:cdn.discordapp.com|media.discordapp.net)/attachments/\S+)", message.content
         )
@@ -553,14 +612,14 @@ def _get_fg_log_details(log_file: IO[bytes]):
     # It used to matter more when we were using slower regex libraries. - Borketh
     lines: list[bytes] = log_file.readlines()
-    vanilla_info_search_area = filter(lambda l: re2.match("^LogInit", l), map(lambda b: b.decode(), lines))
+    vanilla_info_search_area = filter(lambda l: safe_search_sync("^LogInit", l), map(lambda b: b.decode(), lines))
 
     info = {}
     patterns = [
-        re2.compile(r"Net CL: (?P<game_version>\d+)"),
-        re2.compile(r"Command Line:(?P<cli>.*)"),
-        re2.compile(r"Base Directory:(?P<path>.+)"),
-        re2.compile(r"Launcher ID: (?P<launcher>\w+)"),
+        r"Net CL: (?P<game_version>\d+)",
+        r"Command Line:(?P<cli>.*)",
+        r"Base Directory:(?P<path>.+)",
+        r"Launcher ID: (?P<launcher>\w+)",
     ]
 
     # This loop sequentially finds information,
@@ -571,17 +630,19 @@
     for line in vanilla_info_search_area:
         if not patterns:
             break
-        elif match := re2.search(patterns[0], line):
+        elif match := safe_search_sync(patterns[0], line):
             info |= match.groupdict()
             patterns.pop(0)
     else:
         logger.info("Didn't find all four pieces of information normally found in a log!")
         logger.debug(json.dumps(info, indent=2))
 
-    mod_loader_logs = filter(lambda l: re2.match("LogSatisfactoryModLoader", l), map(lambda b: b.decode(), lines))
+    mod_loader_logs = filter(
+        lambda l: safe_search_sync("LogSatisfactoryModLoader", l), map(lambda b: b.decode(), lines)
+    )
 
     for line in mod_loader_logs:
-        if match := re2.search(r"(?<=v\.)(?P<sml>[\d.]+)", line):
+        if match := safe_search_sync(r"(?<=v\.)(?P<sml>[\d.]+)", line):
             info |= match.groupdict()
             break
 
diff --git a/fred/fred_commands/_command_utils.py b/fred/fred_commands/_command_utils.py
index 95f883b..77df6e0 100644
--- a/fred/fred_commands/_command_utils.py
+++ b/fred/fred_commands/_command_utils.py
@@ -8,7 +8,9 @@
 logger = new_logger("[Command/Crash Search]")
 
 
-def search(table: Type[Commands | Crashes], pattern: str, column: str, force_fuzzy: bool) -> (str | list[str], bool):
+def search(
+    table: Type[Commands | Crashes], pattern: str, column: str, force_fuzzy: bool
+) -> tuple[str | list[str], bool]:
     """Returns the top three results based on the result"""
     if column not in dir(table):
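
The `@@ -47,6 +49,60 @@` hunk also adds the `safe_search`, `safe_search_sync`, `safe_sub`, and `safe_findall` wrappers that the rest of the diff calls; their bodies are elided above. A minimal sketch of the dispatch they would need, assuming the async variant runs the search in an executor under a timeout in the spirit of the existing `regex_with_timeout` (the five-second budget, the executor strategy, and the exact signatures are assumptions, not the PR's code):

```python
import asyncio
import re as std_re

import re2
import regex as regex_fallback


def pattern_uses_lookaround(pattern: str) -> bool:
    # re2 cannot compile lookahead/lookbehind: (?=, (?!, (?<=, (?<!
    return bool(std_re.search(r"\(\?=|\(\?!|\(\?<=|\(\?<!", pattern))


def safe_search_sync(pattern: str, text: str, flags: int = 0):
    # Dispatch: lookaround patterns go to the `regex` package, everything
    # else stays on re2. Assumes both expose re-compatible flag constants.
    if pattern_uses_lookaround(pattern):
        return regex_fallback.search(pattern, text, flags=flags)
    return re2.search(pattern, text, flags=flags)


async def safe_search(pattern: str, text: str, flags: int = 0):
    # Run the search off the event loop and bound it, much like the existing
    # regex_with_timeout does; the 5-second budget is an assumed value.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, lambda: safe_search_sync(pattern, text, flags)),
        timeout=5,
    )


def safe_sub(pattern: str, repl, text: str, flags: int = 0) -> str:
    engine = regex_fallback if pattern_uses_lookaround(pattern) else re2
    return engine.sub(pattern, repl, text, flags=flags)


def safe_findall(pattern: str, text: str, flags: int = 0) -> list:
    engine = regex_fallback if pattern_uses_lookaround(pattern) else re2
    return engine.findall(pattern, text, flags=flags)
```

The point of the split is that re2 guarantees linear-time matching but rejects lookaround, while the `regex` package accepts lookaround but can backtrack; routing only the lookaround patterns (such as the `(?<=bin.com)/` pastebin rewrite) to the fallback keeps the fast path for everything else.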
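
As a sanity check on the detection heuristic, here is how `pattern_uses_lookaround` classifies patterns taken from this diff (a standalone snippet; only the function body comes from the hunk above):

```python
import re as std_re


def pattern_uses_lookaround(pattern: str) -> bool:
    return bool(std_re.search(r"\(\?=|\(\?!|\(\?<=|\(\?<!", pattern))


# Lookbehind from detect_and_fetch_pastebin_content: takes the `regex` fallback.
assert pattern_uses_lookaround(r"(?<=bin.com)/")
# Lookbehind in the mod loader version scan: fallback as well.
assert pattern_uses_lookaround(r"(?<=v\.)(?P<sml>[\d.]+)")
# Plain capture groups stay on re2's linear-time engine.
assert not pattern_uses_lookaround(r"{(\d+)}")
assert not pattern_uses_lookaround(r"(https://pastebin.com/\S+)")
```

Because the heuristic only inspects the pattern text, every call site can remain a one-line substitution, which is exactly what the rest of the diff does.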