Skip to content

Commit c179a15

Browse files
authored
Merge pull request #19 from codefortulsa/refactor-meetings-task
Refactor meeting ingestion pipeline and add registry support
2 parents b91af41 + 54ff29d commit c179a15

File tree

6 files changed

+217
-135
lines changed

6 files changed

+217
-135
lines changed

flows/translate_meetings.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
from prefect import flow
22

3-
from tasks.meetings import create_meetings_csv
3+
from tasks.meetings import get_new_meetings
44

55

66
@flow(log_prints=True)
77
async def translate_meetings():
8-
await create_meetings_csv()
9-
# TODO: await download_videos()
10-
# TODO: await transcribe_videos()
11-
# TODO: await diarize_transcriptions()
12-
# TODO: await translate_transcriptions()
13-
# TODO: await create_subtitled_video_pages()
8+
new_meetings = await get_new_meetings()
9+
# new_transcribed_meetings = await transcribe_videos(new_meetings)
10+
# new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings)
11+
# new_translated_meetings = await translate_transcriptions(new_transcribed_meetings)
1412

1513
if __name__ == "__main__":
1614
import asyncio

src/meetings.py

Lines changed: 98 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,22 @@
66
Television websites.
77
"""
88

9-
from typing import Dict, List
9+
import re
10+
from typing import Dict, List, Sequence
1011
from urllib.parse import urljoin
1112

1213
import aiohttp
1314
import pandas as pd
1415
from selectolax.parser import HTMLParser
1516

17+
from src.aws import is_aws_configured
18+
from src.models.utils import from_jsonl, to_jsonl
19+
1620
from .models.meeting import Meeting
1721

1822
BASE_URL = "https://tulsa-ok.granicus.com/ViewPublisher.php?view_id=4"
23+
TGOV_BUCKET_NAME = "tgov-meetings"
24+
MEETINGS_REGISTRY_PATH = "data/meetings.jsonl"
1925

2026

2127
async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
@@ -35,6 +41,10 @@ async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
3541
return await response.text()
3642

3743

44+
def clean_date(date: str) -> str:
45+
return re.sub(r"\s+", " ", date).strip()
46+
47+
3848
async def parse_meetings(html: str) -> List[Dict[str, str]]:
3949
"""
4050
Parse the meeting data from the HTML content.
@@ -56,76 +66,73 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
5666

5767
# Process each table
5868
for table in tables:
59-
# Find the tbody section which contains the actual meeting rows
60-
tbody = table.css_first("tbody")
61-
if not tbody:
62-
continue
63-
64-
# Process each row in the tbody
65-
for row in tbody.css("tr"):
69+
for row in table.css("tr.listingRow"):
6670
cells = row.css("td")
67-
if len(cells) < 5:
68-
continue
71+
name_cells = row.css('td.listItem[headers^="Name"]')
72+
meeting_name = name_cells[0].text().strip() if name_cells else "Unknown"
73+
74+
date_cells = row.css('td.listItem[headers^="Date"]')
75+
raw_date = clean_date(date_cells[0].text().strip()) if date_cells else "Unknown"
76+
meeting_date = raw_date.split("-")[0].strip() if "-" in raw_date else raw_date
77+
78+
79+
duration_cells = row.css('td.listItem[headers^="Duration"]')
80+
duration_str = duration_cells[0].text().strip() if duration_cells else "Unknown"
81+
minutes = duration_to_minutes(duration_str)
82+
meeting_duration = f"{minutes // 60}:{minutes % 60:02d}" if minutes is not None else "Unknown"
83+
6984

7085
meeting_data = {
71-
"meeting": cells[0].text().strip(),
72-
"date": cells[1].text().strip(),
73-
"duration": cells[2].text().strip(),
86+
"meeting": meeting_name,
87+
"date": meeting_date,
88+
"duration": meeting_duration,
7489
"agenda": None,
90+
"clip_id": None,
7591
"video": None,
7692
}
7793

7894
# Extract agenda link if available
79-
agenda_cell = cells[3]
80-
agenda_link = agenda_cell.css_first("a")
81-
if agenda_link and agenda_link.attributes.get("href"):
95+
agenda_cells = row.css('td.listItem:has(a[href*="AgendaViewer.php"])')
96+
agenda_link = agenda_cells[0].css_first("a") if agenda_cells else None
97+
if agenda_link is not None:
8298
meeting_data["agenda"] = urljoin(
8399
BASE_URL, agenda_link.attributes.get("href")
84100
)
85101

86102
# Extract video link if available
87-
video_cell = cells[4]
88-
video_link = video_cell.css_first("a")
89-
if video_link:
90-
# First try to extract from onclick attribute
103+
video_cells = row.css('td.listItem[headers^="VideoLink"]')
104+
video_cell = video_cells[0] if video_cells else None
105+
if video_cell is not None:
106+
video_link = video_cell.css_first("a")
107+
91108
onclick = video_link.attributes.get("onclick", "")
92-
if onclick:
93-
# Look for window.open pattern
94-
if "window.open(" in onclick:
95-
# Extract URL from window.open('URL', ...)
96-
start_quote = onclick.find("'", onclick.find("window.open("))
97-
end_quote = onclick.find("'", start_quote + 1)
98-
if start_quote > 0 and end_quote > start_quote:
99-
video_url = onclick[start_quote + 1 : end_quote]
100-
# Handle protocol-relative URLs (starting with //)
101-
if video_url.startswith("//"):
102-
video_url = f"https:{video_url}"
103-
meeting_data["video"] = video_url
104-
105-
# If onclick extraction failed, try href
106-
if meeting_data["video"] is None and video_link.attributes.get("href"):
107-
href = video_link.attributes.get("href")
108-
# Handle javascript: hrefs
109-
if href.startswith("javascript:"):
110-
# Try to extract clip_id from the onclick attribute again
111-
# This handles cases where href is javascript:void(0) but onclick has the real URL
112-
if meeting_data["video"] is None and "clip_id=" in onclick:
113-
start_idx = onclick.find("clip_id=")
114-
end_idx = onclick.find("'", start_idx)
115-
if start_idx > 0 and end_idx > start_idx:
116-
clip_id = onclick[start_idx + 8 : end_idx]
117-
meeting_data["video"] = (
118-
f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
119-
)
109+
onclick_match = re.search(r"window\.open\(['\"](//[^'\"]+)['\"]", onclick)
110+
clip_id_exp = r"clip_id=(\d+)"
111+
112+
if onclick_match:
113+
meeting_data["video"] = f"https:{onclick_match.group(1)}"
114+
clip_id_match = re.search(clip_id_exp, onclick)
115+
if clip_id_match:
116+
meeting_data["clip_id"] = clip_id_match.group(1)
120117
else:
121-
meeting_data["video"] = urljoin(BASE_URL, href)
118+
meeting_data["clip_id"] = None
119+
if not meeting_data["video"]:
120+
href = video_link.attributes.get("href", "")
121+
if href.startswith("javascript:"):
122+
clip_id_match = re.search(clip_id_exp, href)
123+
if clip_id_match:
124+
clip_id = clip_id_match.group(1)
125+
meeting_data["clip_id"] = clip_id
126+
meeting_data["video"] = f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
127+
else:
128+
meeting_data["video"] = urljoin(BASE_URL, href)
122129

123130
meetings.append(meeting_data)
124131

125132
return meetings
126133

127134

128-
async def get_meetings() -> List[Meeting]:
135+
async def get_tgov_meetings() -> Sequence[Meeting]:
129136
"""
130137
Fetch and parse meeting data from the Government Access Television website.
131138
@@ -164,3 +171,44 @@ def duration_to_minutes(duration):
164171
return hours * 60 + minutes
165172
except:
166173
return None
174+
175+
176+
def get_registry_meetings() -> Sequence[Meeting]:
177+
if is_aws_configured():
178+
print(f'Getting registry from AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
179+
import boto3
180+
from botocore.exceptions import ClientError
181+
s3 = boto3.client('s3')
182+
try:
183+
registry_response = s3.get_object(Bucket=TGOV_BUCKET_NAME, Key=MEETINGS_REGISTRY_PATH)
184+
registry_body = registry_response['Body'].read().decode('utf-8')
185+
return from_jsonl(registry_body, Meeting)
186+
except ClientError as e:
187+
if e.response['Error']['Code'] == 'NoSuchKey':
188+
print('No registry file found on S3. Returning empty list.')
189+
190+
return []
191+
192+
193+
def write_registry_meetings(meetings: Sequence[Meeting]) -> Sequence[Meeting]:
194+
jsonl_str = to_jsonl(meetings)
195+
196+
if is_aws_configured():
197+
print(f'Writing registry to AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
198+
import boto3
199+
from botocore.exceptions import ClientError
200+
s3 = boto3.client('s3')
201+
202+
try:
203+
s3.put_object(
204+
Bucket=TGOV_BUCKET_NAME,
205+
Key=MEETINGS_REGISTRY_PATH,
206+
Body=jsonl_str,
207+
ContentType='application/x-ndjson'
208+
)
209+
print(f'Wrote {len(meetings)} meetings to S3.')
210+
except ClientError as e:
211+
print(f"Failed to write to S3: {e}")
212+
raise
213+
214+
return meetings

src/models/meeting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
Pydantic models for meeting data
33
"""
44

5-
from datetime import datetime
65
from typing import Optional
76

87
from pydantic import BaseModel, Field, HttpUrl
@@ -18,6 +17,7 @@ class Meeting(BaseModel):
1817
duration: str = Field(description="Duration of the meeting")
1918
agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda")
2019
video: Optional[HttpUrl] = Field(None, description="URL to the meeting video")
20+
clip_id: Optional[str] = Field(None, description="Granicus clip ID")
2121

2222
def __str__(self) -> str:
2323
"""String representation of the meeting"""

src/models/utils.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import json
2+
from typing import Sequence, Type, TypeVar
3+
4+
5+
from pydantic import BaseModel
6+
7+
8+
T = TypeVar("T", bound=BaseModel)
9+
10+
11+
def to_jsonl(models: Sequence[T]) -> str:
12+
"""
13+
Serialize a list of Pydantic models to a JSONL (JSON Lines) formatted string.
14+
15+
Each model is serialized to a single line of JSON using `model_dump_json()`.
16+
17+
Args:
18+
models: A list of Pydantic BaseModel instances.
19+
20+
Returns:
21+
A JSONL-formatted string with one model per line.
22+
"""
23+
return "\n".join(model.model_dump_json() for model in models)
24+
25+
26+
def from_jsonl(jsonl_str: str, model_class: Type[T]) -> Sequence[T]:
27+
"""
28+
Deserialize a JSONL string into a list of Pydantic model instances.
29+
30+
Args:
31+
jsonl_str: The JSON Lines string to parse.
32+
model_class: The Pydantic model class to use for validation.
33+
34+
Returns:
35+
A list of instances of the specified Pydantic model class.
36+
"""
37+
return [model_class.model_validate(json.loads(line)) for line in jsonl_str.strip().splitlines()]

tasks/meetings.py

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
1-
import os
2-
3-
import pandas as pd
1+
from typing import Sequence
42
from prefect import task
53

6-
from src.aws import create_bucket_if_not_exists, is_aws_configured, upload_to_s3
7-
from src.meetings import duration_to_minutes, get_meetings
8-
4+
from src.meetings import get_tgov_meetings, get_registry_meetings, write_registry_meetings
5+
from src.models.meeting import Meeting
96

10-
file_path = 'data/meetings.csv' # Path where the file will be saved locally temporarily
11-
meetings_bucket_name = 'tgov-meetings'
127

138
@task
14-
async def create_meetings_csv():
15-
meetings = await get_meetings()
16-
print(f"Got meetings: {meetings}")
17-
meeting_dicts = [meeting.model_dump() for meeting in meetings]
18-
print(f"meeting_dicts: {meeting_dicts}")
19-
df = pd.DataFrame(meeting_dicts)
20-
df['duration_minutes'] = df['duration'].apply(duration_to_minutes)
21-
df.to_csv(file_path, index=False)
22-
23-
if is_aws_configured():
24-
print(f"file_path: {file_path}")
25-
create_bucket_if_not_exists(meetings_bucket_name)
26-
if not upload_to_s3(file_path, meetings_bucket_name, file_path):
27-
raise RuntimeError("Failed to upload to S3")
28-
os.remove(file_path) # Remove local file after successful upload
29-
else:
30-
output_path = 'meetings.csv' # Local path if AWS is not configured
31-
df.to_csv(output_path, index=False)
9+
async def get_new_meetings():
10+
# TODO: accept max_limit parameter
11+
tgov_meetings: Sequence[Meeting] = await get_tgov_meetings()
12+
print(f"Got {len(tgov_meetings)} tgov meetings.")
13+
tgov_clip_ids = [tm.clip_id for tm in tgov_meetings]
14+
# print(f"tgov_clip_ids: {tgov_clip_ids}")
15+
16+
registry_meetings: Sequence[Meeting] = get_registry_meetings()
17+
print(f"Got {len(registry_meetings)} registry meetings.")
18+
19+
registry_clip_ids = [rm.clip_id for rm in registry_meetings]
20+
21+
new_meetings: Sequence[Meeting] = [tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids]
22+
23+
if new_meetings:
24+
registry_meetings += new_meetings
25+
write_registry_meetings(registry_meetings)
26+
return new_meetings
27+
28+
print(f"No new meetings. {len(registry_meetings)} in registry.")
29+
return []

0 commit comments

Comments
 (0)