diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index bf68001..3d19e52 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -2,12 +2,13 @@ name: Build/run tgov on: push: - branches: [ "main", "deploy-lambda", "test-flows" ] + branches: [ "main", "diarize-flow" ] pull_request: branches: [ "main" ] permissions: - contents: read + id-token: write + contents: read jobs: build: @@ -16,13 +17,15 @@ jobs: env: POETRY_VERSION: "1.3.2" POETRY_VENV: "/opt/poetry-venv" - + S3_BUCKET: ${{ secrets.S3_BUCKET }} + AWS_DEFAULT_REGION: us-east-2 steps: - uses: actions/checkout@v4 - name: Set up Python 3.11 uses: actions/setup-python@v3 with: python-version: "3.11" + - name: Install dependencies run: | set -ex @@ -36,14 +39,14 @@ jobs: /opt/poetry-venv/bin/poetry config virtualenvs.create false /opt/poetry-venv/bin/poetry install /opt/poetry-venv/bin/poetry env info + - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-session-token: ${{ secrets.AWS_SESSION_TOKEN }} + role-to-assume: arn:aws:iam::480103772849:role/GithubCFTRole aws-region: us-east-2 + audience: sts.amazonaws.com - name: Run Diarization run: | - /opt/poetry-venv/bin/python -m flows.translate_meetings + /opt/poetry-venv/bin/python -m flows.transcribe_meetings diff --git a/db/migrate_s3_path.py b/db/migrate_s3_path.py new file mode 100644 index 0000000..243a89c --- /dev/null +++ b/db/migrate_s3_path.py @@ -0,0 +1,31 @@ +""" +Migration script to add s3_path field to existing meeting records. +""" + +from dyntastic import A +from src.models.meeting import Meeting + + +def migrate_s3_path(): + """ + Add s3_path field to all existing meeting records that don't have it. + """ + print("Starting migration to add s3_path field to existing meetings...") + + # Get all meetings + meetings = Meeting.scan() + updated_count = 0 + + for meeting in meetings: + # Check if s3_path field exists and is None + if not hasattr(meeting, "s3_path") or meeting.s3_path is None: + print(f"Updating meeting: {meeting.meeting} ({meeting.date})") + meeting.s3_path = None + meeting.save() + updated_count += 1 + + print(f"Migration complete. Updated {updated_count} meetings.") + + +if __name__ == "__main__": + migrate_s3_path() diff --git a/db/queries.py b/db/queries.py index 98fa026..027ccec 100644 --- a/db/queries.py +++ b/db/queries.py @@ -4,7 +4,9 @@ from src.models.meeting import Meeting -def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]: +def get_meetings( + days: int = 7, video: Optional[bool] = None, s3_path: Optional[bool] = None +) -> List[Meeting]: """ Get meetings that occurred in the past number of days from now. """ @@ -13,6 +15,11 @@ def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]: meetings = Meeting.scan( A.date >= target_date, ) - meetings_list = [m for m in meetings if (video is None or bool(m.video) == video)] + meetings_list = [ + m + for m in meetings + if (video is None or bool(m.video) == video) + and (s3_path is None or bool(m.s3_path) == s3_path) + ] return list(meetings_list) diff --git a/flows/translate_meetings.py b/flows/download_meetings.py similarity index 53% rename from flows/translate_meetings.py rename to flows/download_meetings.py index 9916b59..38fa26e 100644 --- a/flows/translate_meetings.py +++ b/flows/download_meetings.py @@ -1,21 +1,21 @@ from prefect import flow from db.queries import get_meetings -from tasks.diarize import diarize_meeting +from tasks.diarize import download_video_and_put_in_s3 from tasks.meetings import register_meetings -@flow(log_prints=True) -def translate_meetings(): +# @flow(log_prints=True) +def download_meetings(): new_meetings = register_meetings() print(f"Registered {len(new_meetings)} new meetings") - meetings_to_diarize = get_meetings(video=True) - print(f"Found {len(meetings_to_diarize)} meetings to diarize") - for meeting in meetings_to_diarize: - diarize_meeting(meeting) + meetings_to_download = get_meetings(days=7, video=True, s3_path=False) + print(f"Found {len(meetings_to_download)} meetings to download") + for meeting in meetings_to_download: + download_video_and_put_in_s3(meeting) # new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings) # new_translated_meetings = await translate_transcriptions(new_transcribed_meetings) if __name__ == "__main__": - translate_meetings() + download_meetings() diff --git a/flows/transcribe_meetings.py b/flows/transcribe_meetings.py new file mode 100644 index 0000000..6ac9536 --- /dev/null +++ b/flows/transcribe_meetings.py @@ -0,0 +1,21 @@ +from prefect import flow +import os + +from db.queries import get_meetings +from tasks.diarize import diarize_meeting, BUCKET_NAME + + +@flow(log_prints=True) +def transcribe_meetings(): + print(f"S3_BUCKET environment variable: {os.getenv('S3_BUCKET')}") + print(f"BUCKET_NAME from tasks.diarize: {BUCKET_NAME}") + + meetings_to_diarize = get_meetings(video=True, s3_path=True) + print(f"Found {len(meetings_to_diarize)} meetings to diarize") + for meeting in meetings_to_diarize: + print(f"Processing meeting: {meeting.meeting} with s3_path: {meeting.s3_path}") + diarize_meeting(meeting) + + +if __name__ == "__main__": + transcribe_meetings() diff --git a/src/aws.py b/src/aws.py index 29f255f..22b4a33 100644 --- a/src/aws.py +++ b/src/aws.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import boto3 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError @@ -52,3 +53,21 @@ def save_content_to_s3(content, bucket_name, s3_key, content_type): region = s3_client.meta.region_name url = f"https://{bucket_name}.s3.{region}.amazonaws.com/{s3_key}" return HttpUrl(url) + + +def get_video_from_s3(bucket_name, s3_path): + try: + # Create output directory if it doesn't exist + output_dir = Path("data/video") + output_dir.mkdir(parents=True, exist_ok=True) + + # Define output path + output_path = output_dir / Path(s3_path).name + + # Download file from S3 + s3_client.download_file(bucket_name, s3_path, str(output_path)) + print(f"Downloaded {s3_path} from S3 to {output_path}") + return output_path + except ClientError as e: + print(f"Failed to get video from S3: {str(e)}") + return None diff --git a/src/models/meeting.py b/src/models/meeting.py index fee0070..fa6cf2d 100644 --- a/src/models/meeting.py +++ b/src/models/meeting.py @@ -5,7 +5,7 @@ from typing import Optional from dyntastic import Dyntastic -from pydantic import BaseModel, Field, HttpUrl +from pydantic import BaseModel, Field, HttpUrl, ConfigDict from datetime import datetime from typing import List @@ -18,18 +18,23 @@ class Meeting(Dyntastic): __table_name__ = "tgov-meeting" __hash_key__ = "clip_id" + model_config = ConfigDict(extra="ignore") + clip_id: Optional[str] = Field(None, description="Granicus clip ID") meeting: str = Field(description="Name of the meeting") date: datetime = Field(description="Date and time of the meeting") duration: str = Field(description="Duration of the meeting") agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda") video: Optional[HttpUrl] = Field(None, description="URL to the meeting video") - transcripts: Optional[List[HttpUrl]] = Field( + transcripts: Optional[List[str]] = Field( None, description="URLs to the meeting transcripts" ) subtitles: Optional[List[HttpUrl]] = Field( None, description="URLs to the meeting subtitle tracks" ) + s3_path: Optional[str] = Field( + default=None, description="S3 path to the meeting video" + ) def __str__(self) -> str: """String representation of the meeting""" diff --git a/src/run_diarization.py b/src/run_diarization.py index cabf281..13b10e0 100644 --- a/src/run_diarization.py +++ b/src/run_diarization.py @@ -1,10 +1,11 @@ import asyncio import os +import json from pathlib import Path from src.aws import save_content_to_s3 -from src.models.meeting import GranicusPlayerPage +from src.models.meeting import GranicusPlayerPage, Meeting from src.granicus import get_video_player from src.videos import download_file, transcribe_video_with_diarization @@ -38,19 +39,24 @@ def download_video(file_name: str, video_url: str): return video_file -def run_diarization(video_file: Path): +def run_diarization(video_file: Path, meeting: Meeting): transcription_dir = Path("data/transcripts") transcription = asyncio.run( transcribe_video_with_diarization(video_file, transcription_dir) ) # Add transcript to S3 + # Convert dictionary to JSON string before saving + transcription_json = json.dumps(transcription, indent=2, ensure_ascii=False) save_content_to_s3( - transcription, + transcription_json, BUCKET_NAME, f"{FOLDER_NAME}/{video_file.name}.json", "application/json", ) + meeting.transcripts = [f"{FOLDER_NAME}/{video_file.name}.json"] + meeting.save() + print(transcription) diff --git a/src/videos.py b/src/videos.py index 56b9c7f..0186beb 100644 --- a/src/videos.py +++ b/src/videos.py @@ -79,8 +79,6 @@ def download_file(url: str, output_path: Path): ) print(f"Download complete: {url}") - # Add to S3 - upload_to_s3(output_path, BUCKET_NAME, f"{FOLDER_NAME}/{output_path.name}") return output_path diff --git a/tasks/diarize.py b/tasks/diarize.py index 43250f8..5396b38 100644 --- a/tasks/diarize.py +++ b/tasks/diarize.py @@ -1,14 +1,42 @@ +import os +from src.aws import get_video_from_s3, upload_to_s3 from src.run_diarization import download_video, run_diarization from prefect import task from src.models.meeting import Meeting +BUCKET_NAME = os.getenv("S3_BUCKET") +FOLDER_NAME = "videos" + + +# @task +def download_video_and_put_in_s3(meeting: Meeting): + video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video)) + if video_file: + print(f"Uploading video to S3: {video_file}") + s3_path = f"{FOLDER_NAME}/{video_file.name}" + upload_to_s3(video_file, BUCKET_NAME, f"{FOLDER_NAME}/{video_file.name}") + print(f"Uploaded video to S3: {s3_path}") + print("Saving meeting.") + meeting.s3_path = s3_path + meeting.save() + else: + print("Video file not found") + + @task def diarize_meeting(meeting: Meeting): - video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video)) + if BUCKET_NAME is None: + raise ValueError("S3_BUCKET environment variable is not set") + + if meeting.s3_path is None: + print(f"Meeting {meeting.meeting} has no s3_path, skipping") + return + + video_file = get_video_from_s3(BUCKET_NAME, meeting.s3_path) if video_file: - run_diarization(video_file) + run_diarization(video_file, meeting) else: print("Video file not found") # TODO: Update meeting with transcript location diff --git a/tasks/meetings.py b/tasks/meetings.py index 161d40d..7bdfbb2 100644 --- a/tasks/meetings.py +++ b/tasks/meetings.py @@ -10,7 +10,7 @@ from src.models.meeting import Meeting -@task +# @task def register_meetings() -> List[Meeting]: # TODO: accept max_limit parameter tgov_meetings = asyncio.run(get_tgov_meetings())