|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import argparse |
| 4 | +import json |
| 5 | +import re |
| 6 | +from io import StringIO |
| 7 | +from pathlib import Path |
| 8 | +from typing import IO, Callable |
| 9 | + |
| 10 | +from dataclasses import dataclass |
| 11 | + |
| 12 | +COMMUNITY_HEADING_PATTERN = r"####\s*Community image" |
| 13 | +PRO_HEADING_PATTERN = r"\n####\s*Pro image" |
| 14 | +API_COVERAGE_HEADING_PATTERN = r"^\s*##\s|$\Z" |
| 15 | +DEFAULT_PAGE_PATH = "src/content/docs/aws/services/cloudformation.mdx" |
| 16 | + |
| 17 | + |
| 18 | +@dataclass |
| 19 | +class ColumnConfig: |
| 20 | + header: str |
| 21 | + key: str |
| 22 | + alignment: str = "left" |
| 23 | + formatter: Callable[[any], str] | None = None |
| 24 | + |
| 25 | + def format_value(self, value: any) -> str: |
| 26 | + if self.formatter: |
| 27 | + return self.formatter(value) |
| 28 | + return str(value) if value is not None else "-" |
| 29 | + |
| 30 | + |
| 31 | +def bool_formatter(value: bool): |
| 32 | + return "✅" if value else "-" |
| 33 | + |
| 34 | + |
| 35 | +@dataclass |
| 36 | +class TableConfig: |
| 37 | + columns: list[ColumnConfig] |
| 38 | + sort_by: str | None = None |
| 39 | + |
| 40 | + def get_headers(self) -> list[str]: |
| 41 | + return [col.header for col in self.columns] |
| 42 | + |
| 43 | + |
| 44 | +class MarkdownTableGenerator: |
| 45 | + def __init__(self, config: TableConfig): |
| 46 | + self.config = config |
| 47 | + |
| 48 | + def _calculate_column_widths(self, data: list[dict[str, any]]) -> list[int]: |
| 49 | + widths = [] |
| 50 | + |
| 51 | + for col in self.config.columns: |
| 52 | + max_width = len(col.header) |
| 53 | + |
| 54 | + for row in data: |
| 55 | + value = row.get(col.key, "") |
| 56 | + formatted_value = col.format_value(value) |
| 57 | + max_width = max(max_width, len(formatted_value)) |
| 58 | + |
| 59 | + widths.append(max_width) |
| 60 | + |
| 61 | + return widths |
| 62 | + |
| 63 | + def _get_alignment_separator(self, alignment: str, width: int) -> str: |
| 64 | + if alignment == "right": |
| 65 | + return f"{'-' * (width + 1)}:" |
| 66 | + elif alignment == "center": |
| 67 | + return f":{'-' * width}:" |
| 68 | + else: |
| 69 | + return f"{'-' * width}" |
| 70 | + |
| 71 | + def _format_cell(self, value: str, width: int, alignment: str) -> str: |
| 72 | + if alignment == "right": |
| 73 | + return value.rjust(width) |
| 74 | + elif alignment == "center": |
| 75 | + return value.center(width) |
| 76 | + else: |
| 77 | + return value.ljust(width) |
| 78 | + |
| 79 | + def _write_header_row(self, writer: IO[str], widths: list[int]) -> None: |
| 80 | + headers = self.config.get_headers() |
| 81 | + formatted_headers = [] |
| 82 | + |
| 83 | + for i, header in enumerate(headers): |
| 84 | + alignment = self.config.columns[i].alignment |
| 85 | + formatted_header = self._format_cell(header, widths[i], alignment) |
| 86 | + formatted_headers.append(formatted_header) |
| 87 | + |
| 88 | + writer.write(f"| {' | '.join(formatted_headers)} |\n") |
| 89 | + |
| 90 | + def _write_separator_row(self, writer: IO[str], widths: list[int]) -> None: |
| 91 | + separators = [] |
| 92 | + for i, col in enumerate(self.config.columns): |
| 93 | + separator = self._get_alignment_separator(col.alignment, widths[i]) |
| 94 | + separators.append(separator) |
| 95 | + |
| 96 | + writer.write(f"|{'|'.join(separators)}|\n") |
| 97 | + |
| 98 | + def _write_data_rows( |
| 99 | + self, writer: IO[str], data: list[Dict[str, Any]], widths: list[int] |
| 100 | + ) -> None: |
| 101 | + if self.config.sort_by: |
| 102 | + data = sorted(data, key=lambda x: x.get(self.config.sort_by, "")) |
| 103 | + |
| 104 | + for row in data: |
| 105 | + formatted_cells = [] |
| 106 | + |
| 107 | + for i, col in enumerate(self.config.columns): |
| 108 | + value = row.get(col.key, "") |
| 109 | + formatted_value = col.format_value(value) |
| 110 | + formatted_cell = self._format_cell( |
| 111 | + formatted_value, widths[i], col.alignment |
| 112 | + ) |
| 113 | + formatted_cells.append(formatted_cell) |
| 114 | + |
| 115 | + writer.write(f"| {' | '.join(formatted_cells)} |\n") |
| 116 | + |
| 117 | + def generate_table(self, data: list[dict[str, any]] | None) -> str: |
| 118 | + if not data: |
| 119 | + return "" |
| 120 | + |
| 121 | + buffer = StringIO() |
| 122 | + widths = self._calculate_column_widths(data) |
| 123 | + |
| 124 | + self._write_header_row(buffer, widths) |
| 125 | + self._write_separator_row(buffer, widths) |
| 126 | + self._write_data_rows(buffer, data, widths) |
| 127 | + |
| 128 | + table = buffer.getvalue().rstrip("\n") + "\n" |
| 129 | + return table |
| 130 | + |
| 131 | +class CloudFormationDataTransformer(): |
| 132 | + def transform(self, section_data: dict[str, any] | None) -> list[dict[str, any]]: |
| 133 | + if not section_data: |
| 134 | + return [] |
| 135 | + |
| 136 | + rows = [] |
| 137 | + |
| 138 | + for resource_type, metadata in section_data.items(): |
| 139 | + methods = set(metadata.get("methods", [])) |
| 140 | + |
| 141 | + row = { |
| 142 | + "resource": resource_type, |
| 143 | + "create": "Create" in methods, |
| 144 | + "delete": "Delete" in methods, |
| 145 | + "update": "Update" in methods, |
| 146 | + } |
| 147 | + rows.append(row) |
| 148 | + return rows |
| 149 | + |
| 150 | + |
| 151 | +def create_argument_parser() -> argparse.ArgumentParser: |
| 152 | + parser = argparse.ArgumentParser( |
| 153 | + description="Update CloudFormation Resources tables in docs" |
| 154 | + ) |
| 155 | + parser.add_argument( |
| 156 | + "--cfn-json", |
| 157 | + required=True, |
| 158 | + type=Path, |
| 159 | + help="Path to iac-catalog-assets/cfn_resources.json in downloaded artifacts", |
| 160 | + ) |
| 161 | + parser.add_argument( |
| 162 | + "--md-file", |
| 163 | + required=False, |
| 164 | + type=Path, |
| 165 | + default=str(DEFAULT_PAGE_PATH), |
| 166 | + help="Markdown file which needs to be updated", |
| 167 | + ) |
| 168 | + |
| 169 | + return parser |
| 170 | + |
| 171 | + |
| 172 | +def _load_cfn_file(cfn_file_path: Path) -> dict[str, any]: |
| 173 | + try: |
| 174 | + with cfn_file_path.open("r", encoding="utf-8") as f: |
| 175 | + return json.load(f) |
| 176 | + except json.JSONDecodeError as e: |
| 177 | + raise ValueError(f"Invalid JSON in cfn json file: {e}") |
| 178 | + |
| 179 | + |
| 180 | +def replace_content_between( |
| 181 | + content: str, |
| 182 | + starting_rx: str, |
| 183 | + ending_rx: str, |
| 184 | + replacement_block: str, |
| 185 | +) -> str: |
| 186 | + # Build a regex that replaces the content between two headings starting_rx and ending_rx. |
| 187 | + # Group1 - start heading |
| 188 | + # Group2 - content; lookahead preserves end boundary. |
| 189 | + pattern = re.compile( |
| 190 | + rf"(^{starting_rx}\s*\n)(.*?)(?={ending_rx})", |
| 191 | + re.DOTALL | re.MULTILINE, |
| 192 | + ) |
| 193 | + |
| 194 | + match = pattern.search(content) |
| 195 | + if not match: |
| 196 | + raise ValueError( |
| 197 | + f"Could not find section with heading pattern: {starting_rx!r}" |
| 198 | + ) |
| 199 | + |
| 200 | + heading = match.group(1) |
| 201 | + replacement = f"{heading}{replacement_block}" if replacement_block else heading |
| 202 | + |
| 203 | + return pattern.sub(replacement, content, count=1) |
| 204 | + |
| 205 | + |
| 206 | +def main(): |
| 207 | + parser = create_argument_parser() |
| 208 | + args = parser.parse_args() |
| 209 | + |
| 210 | + table_config = TableConfig( |
| 211 | + columns=[ |
| 212 | + ColumnConfig("Resource", "resource", "left"), |
| 213 | + ColumnConfig("Create", "create", "right", bool_formatter), |
| 214 | + ColumnConfig("Delete", "delete", "right", bool_formatter), |
| 215 | + ColumnConfig("Update", "update", "right", bool_formatter), |
| 216 | + ] |
| 217 | + ) |
| 218 | + |
| 219 | + table_generator = MarkdownTableGenerator(table_config) |
| 220 | + data_transformer = CloudFormationDataTransformer() |
| 221 | + |
| 222 | + cfn_catalog = _load_cfn_file(args.cfn_json) |
| 223 | + |
| 224 | + community_data = data_transformer.transform(cfn_catalog.get("community")) |
| 225 | + pro_data = data_transformer.transform(cfn_catalog.get("pro")) |
| 226 | + |
| 227 | + community_table = table_generator.generate_table(community_data) |
| 228 | + pro_table = table_generator.generate_table(pro_data) |
| 229 | + |
| 230 | + original_doc = args.md_file.read_text(encoding="utf-8") |
| 231 | + updated_doc = original_doc |
| 232 | + |
| 233 | + updated_doc = replace_content_between( |
| 234 | + content=original_doc, |
| 235 | + starting_rx=COMMUNITY_HEADING_PATTERN, |
| 236 | + ending_rx=PRO_HEADING_PATTERN, |
| 237 | + replacement_block=community_table, |
| 238 | + ) |
| 239 | + |
| 240 | + updated_doc = replace_content_between( |
| 241 | + content=updated_doc, |
| 242 | + starting_rx=PRO_HEADING_PATTERN, |
| 243 | + ending_rx=API_COVERAGE_HEADING_PATTERN, |
| 244 | + replacement_block=pro_table, |
| 245 | + ) |
| 246 | + |
| 247 | + if updated_doc != original_doc: |
| 248 | + args.md_file.write_text(updated_doc) |
| 249 | + |
| 250 | + |
| 251 | +if __name__ == "__main__": |
| 252 | + main() |
0 commit comments