
Commit e87fa0c

Khauneesh-AI authored and Keivan Vosoughi committed
Add S3 export functionality with database support
1 parent e4d7aea commit e87fa0c

8 files changed: +282 -86 lines changed

New Alembic migration (revision 1a8fdc23eb6f)

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+"""add_s3_export_path
+
+Revision ID: 1a8fdc23eb6f
+Revises: 9023b46c8d4c
+Create Date: 2025-04-22 20:01:13.247491
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = '1a8fdc23eb6f'
+down_revision: Union[str, None] = '9023b46c8d4c'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # Add s3_export_path column to generation_metadata table
+    with op.batch_alter_table('generation_metadata', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('s3_export_path', sa.Text(), nullable=True))
+
+    # Add s3_export_path column to export_metadata table
+    with op.batch_alter_table('export_metadata', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('s3_export_path', sa.Text(), nullable=True))
+
+
+def downgrade() -> None:
+    # Remove s3_export_path column from generation_metadata table
+    with op.batch_alter_table('generation_metadata', schema=None) as batch_op:
+        batch_op.drop_column('s3_export_path')
+
+    # Remove s3_export_path column from export_metadata table
+    with op.batch_alter_table('export_metadata', schema=None) as batch_op:
+        batch_op.drop_column('s3_export_path')

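Both new columns are nullable TEXT, so the upgrade is backward-compatible with existing rows. As a hedged sketch, the revision could be applied programmatically through Alembic's command API (the alembic.ini path below is an assumption; this page does not show where the repo keeps its Alembic config):

from alembic import command
from alembic.config import Config

# Assumed config location; adjust to wherever this repo keeps alembic.ini.
alembic_cfg = Config("alembic.ini")

# Apply all pending revisions, including 1a8fdc23eb6f above.
command.upgrade(alembic_cfg, "head")

# Reverting just this migration means stepping back to its down_revision:
# command.downgrade(alembic_cfg, "9023b46c8d4c")

The equivalent CLI invocation is alembic upgrade head.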
app/core/database.py

Lines changed: 31 additions & 17 deletions
@@ -60,6 +60,7 @@ def init_db(self):
                display_name TEXT,
                local_export_path TEXT,
                hf_export_path TEXT,
+               s3_export_path TEXT,
                num_questions FLOAT,
                total_count FLOAT,
                topics TEXT,
@@ -107,6 +108,7 @@ def init_db(self):
                display_name TEXT,
                local_export_path TEXT,
                hf_export_path TEXT,
+               s3_export_path TEXT,
                job_id TEXT,
                job_name TEXT UNIQUE,
                job_status TEXT,
@@ -145,29 +147,24 @@ def save_generation_metadata(self, metadata: Dict) -> int:
        try:
            # Prepare data outside transaction
            if metadata.get('generate_file_name'):
-
                output_paths = metadata.get('output_path', {})
            else:
-
                output_paths = {}
-
-

            # Use a single connection with enhanced settings
            with self.get_connection() as conn:
                conn.execute("BEGIN IMMEDIATE")

-
                cursor = conn.cursor()

                query = """
                    INSERT INTO generation_metadata (
-                        timestamp, technique, model_id, inference_type, caii_endpoint, use_case,
-                        custom_prompt, model_parameters, input_key, output_key, output_value, generate_file_name,
-                        display_name, local_export_path, hf_export_path,
-                        num_questions, total_count, topics, examples,
-                        schema, doc_paths, input_path, job_id, job_name, job_status, job_creator_name
-                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                        timestamp, technique, model_id, inference_type, caii_endpoint, use_case,
+                        custom_prompt, model_parameters, input_key, output_key, output_value, generate_file_name,
+                        display_name, local_export_path, hf_export_path, s3_export_path,
+                        num_questions, total_count, topics, examples,
+                        schema, doc_paths, input_path, job_id, job_name, job_status, job_creator_name
+                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """
@@ -186,6 +183,7 @@ def save_generation_metadata(self, metadata: Dict) -> int:
                    metadata.get('display_name', None),
                    output_paths.get('local', None),
                    output_paths.get('huggingface', None),
+                    output_paths.get('s3', None),
                    metadata.get('num_questions', None),
                    metadata.get('total_count', None),
                    metadata.get('topics', None),
@@ -198,20 +196,18 @@ def save_generation_metadata(self, metadata: Dict) -> int:
                    metadata.get('job_status', None),
                    metadata.get('job_creator_name', None)
                )
-                #print(values)

                cursor.execute(query, values)
                conn.commit()
                return cursor.lastrowid

        except sqlite3.OperationalError as e:
-            if conn:
+            if 'conn' in locals():
                conn.rollback()
            print(f"Database operation error in save_generation_metadata: {e}")
-
            raise
        except Exception as e:
-            if conn:
+            if 'conn' in locals():
                conn.rollback()
            print(f"Error saving metadata to database: {str(e)}")
            raise
@@ -359,7 +355,6 @@ def save_evaluation_metadata(self, metadata: Dict) -> int:
    def save_export_metadata(self, metadata: Dict) -> int:
        """Save export metadata to database with prepared transaction"""
        try:
-
            # Use a single connection with enhanced settings
            with self.get_connection() as conn:
                conn.execute("BEGIN IMMEDIATE")
@@ -373,11 +368,12 @@
                        display_name,
                        local_export_path,
                        hf_export_path,
+                        s3_export_path,
                        job_id,
                        job_name,
                        job_status,
                        job_creator_name
-                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """
@@ -386,6 +382,7 @@ def save_export_metadata(self, metadata: Dict) -> int:
                    metadata.get('display_name'),
                    metadata.get('local_export_path', None),
                    metadata.get('hf_export_path', None),
+                    metadata.get('s3_export_path', None),
                    metadata.get('job_id', None),
                    metadata.get('job_name', None),
                    metadata.get('job_status', None),
@@ -1131,4 +1128,21 @@ def backup_and_restore_db(self, force_restore: bool = False) -> bool:
                print(f"Force restore failed: {str(restore_error)}")
                return False

+    def update_s3_path(self, file_name: str, s3_path: str):
+        """Update s3_export_path for a generation"""
+        try:
+            with self.get_connection() as conn:
+                conn.execute("BEGIN IMMEDIATE")
+                cursor = conn.cursor()
+
+                # Update the s3_path
+                cursor.execute(
+                    "UPDATE generation_metadata SET s3_export_path = ? WHERE generate_file_name = ?",
+                    (s3_path, file_name)
+                )
+                conn.commit()
+                print(f"S3 path update successful for file: {file_name}")
+        except Exception as e:
+            print(f"Error updating S3 export path: {str(e)}")
+            raise

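Two write paths now populate the new column: save_generation_metadata stores output_paths.get('s3') at insert time, and update_s3_path backfills the row for an already-generated file when an export happens later. A minimal usage sketch; the no-argument DatabaseManager construction and the file and bucket names are assumptions, not taken from this diff:

from app.core.database import DatabaseManager

db = DatabaseManager()  # assumed no-argument construction

# Backfill the S3 location for a previously generated dataset file.
# file_name must match the row's generate_file_name value.
db.update_s3_path(
    file_name="qa_pairs_claude_20241204_132411_test.json",
    s3_path="s3://my-dataset-bucket/qa_pairs_claude_20241204_132411_test.json",
)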
app/migrations/alembic_schema_models.py

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,7 @@ class GenerationMetadataModel(Base):
    display_name = Column(Text)
    local_export_path = Column(Text)
    hf_export_path = Column(Text)
+    s3_export_path = Column(Text)
    num_questions = Column(Float)
    total_count = Column(Float)
    topics = Column(Text)
@@ -66,6 +67,7 @@ class ExportMetadataModel(Base):
    display_name = Column(Text)
    local_export_path = Column(Text)
    hf_export_path = Column(Text)
+    s3_export_path = Column(Text)
    job_id = Column(Text)
    job_name = Column(Text, unique=True)
    job_status = Column(Text)

app/models/request_models.py

Lines changed: 22 additions & 21 deletions
@@ -45,10 +45,12 @@ class Example_eval(BaseModel):
    )


+# In app/models/request_models.py
class S3Config(BaseModel):
    """S3 export configuration"""
    bucket: str
-    key: str
+    key: str = ""  # Make key optional with default empty string
+    create_if_not_exists: bool = True  # Flag to create bucket if it doesn't exist

class HFConfig(BaseModel):
    """HF export configuration"""
@@ -59,41 +61,40 @@ class HFConfig(BaseModel):
    hf_commit_message: Optional[str] = "Hugging face export"  # Commit message

class Export_synth(BaseModel):
-    # Export configuration
-    export_type: List[str] = Field(default_factory=lambda: ["huggingface"])  # Accept multiple export types (e.g., ["s3", "huggingface"])
-    file_path: str
-    display_name: Optional[str] = None
+    # Existing fields...
+    export_type: List[str] = Field(default_factory=lambda: ["huggingface"])
+    file_path: str
+    display_name: Optional[str] = None
    output_key: Optional[str] = 'Prompt'
    output_value: Optional[str] = 'Completion'

-
    # Hugging Face-specific fields
-    hf_config: HFConfig
+    hf_config: Optional[HFConfig] = None  # Make HF config optional

    # Optional s3 config
    s3_config: Optional[S3Config] = None

-    model_config = ConfigDict(protected_namespaces=(),
+    model_config = ConfigDict(
+        protected_namespaces=(),
        json_schema_extra={
            "example": {
-                "export_type": [
-                    "huggingface"
-                ],
-                "file_path": "qa_pairs_claude_20241204_132411_test.json",
-                "hf_config": {
-                    "hf_token": "your token",
-                    "hf_username": "your_username",
-                    "hf_repo_name": "file_name",
-                    "hf_commit_message": "dataset trial"
-                }
-
+                "export_type": ["huggingface", "s3"],
+                "file_path": "qa_pairs_claude_20241204_132411_test.json",
+                "hf_config": {
+                    "hf_token": "your token",
+                    "hf_username": "your_username",
+                    "hf_repo_name": "file_name",
+                    "hf_commit_message": "dataset trial"
+                },
+                "s3_config": {
+                    "bucket": "my-dataset-bucket",
+                    "create_if_not_exists": True
+                }
            }
        }
    )


-
class ModelParameters(BaseModel):
    """Low-level model parameters"""
    temperature: float = Field(default=0.0, ge=0.0, le=2.0, description="Controls randomness (0.0 to 1.0)")

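Because hf_config is now Optional, an S3-only export request can omit it entirely. A construction sketch under that assumption (the display name and bucket are placeholder values):

from app.models.request_models import Export_synth, S3Config

# key defaults to "", so the export service derives one from display_name or the file name.
request = Export_synth(
    export_type=["s3"],
    file_path="qa_pairs_claude_20241204_132411_test.json",
    display_name="qa_pairs_demo",                    # placeholder
    s3_config=S3Config(bucket="my-dataset-bucket"),  # placeholder bucket
)
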
app/run_export_job.py

Lines changed: 0 additions & 18 deletions
@@ -8,24 +8,6 @@

os.chdir("/home/cdsw/synthetic-data-studio")

-# def check_and_install_requirements():
-#     """Check and install requirements from requirements.txt"""
-#     # Get the current working directory instead of using __file__
-#     current_dir = os.getcwd()
-#     requirements_path = os.path.join(current_dir, 'requirements.txt')
-
-#     if os.path.exists(requirements_path):
-#         try:
-#             print(f"Installing requirements from: {requirements_path}")
-#             subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', requirements_path])
-#         except subprocess.CalledProcessError as e:
-#             print(f"Error installing requirements: {e}")
-#             sys.exit(1)
-#     else:
-#         print("No requirements.txt found, continuing with existing packages")
-
-# # Run installation check at start
-# check_and_install_requirements()

# Get the current notebook's directory
notebook_dir = os.getcwd()

app/services/export_results.py

Lines changed: 53 additions & 21 deletions
@@ -15,6 +15,7 @@
from app.models.request_models import Export_synth

from app.core.database import DatabaseManager
+from app.services.s3_export import export_to_s3

import logging
from logging.handlers import RotatingFileHandler
@@ -101,38 +102,69 @@ def _create_dataset(self, records:List, output_key, output_value, file_path) ->
        return dataset


-    def export(self,request:Export_synth):
+    def export(self, request: Export_synth):
        try:
            export_paths = {}
            file_name = os.path.basename(request.file_path)
-            try:
-                with open(request.file_path, 'r') as f:
-                    output_data = json.load(f)
-            except FileNotFoundError:
-                raise HTTPException(status_code=404, detail=f"File not found: {request.file_path}")
-            except json.JSONDecodeError as e:
-                raise HTTPException(status_code=400, detail=f"Invalid JSON file: {str(e)}")

            for export_type in request.export_type:
-                if export_type == "s3" and request.s3_config:
-                    s3_client = boto3.client("s3")
-                    s3_client.put_object(
-                        Bucket=request.s3_config.bucket,
-                        Key=request.s3_config.key,
-                        Body=json.dumps(output_data, indent=2),
-                    )
-                    export_paths['s3'] = f"s3://{request.s3_config.bucket}/{request.s3_config.key}"
-                    self.logger.info(f"Results saved to S3: {export_paths['s3']}")
-
+                # S3 Export
+                if export_type == "s3":
+                    if not request.s3_config:
+                        raise HTTPException(status_code=400, detail="S3 configuration required for S3 export")
+
+                    try:
+                        # Get bucket and key from request
+                        bucket_name = request.s3_config.bucket
+                        key = request.s3_config.key or file_name
+
+                        # Override with display_name if provided
+                        if request.display_name and not request.s3_config.key:
+                            key = f"{request.display_name}.json"
+
+                        create_bucket = getattr(request.s3_config, 'create_if_not_exists', True)
+
+                        s3_result = export_to_s3(
+                            file_path=request.file_path,
+                            bucket_name=bucket_name,
+                            key=key,
+                            create_bucket=create_bucket
+                        )
+
+                        s3_path = s3_result['s3']
+                        self.logger.info(f"Results saved to S3: {s3_path}")
+
+                        # Update database with S3 path
+                        self.db.update_s3_path(file_name, s3_path)
+                        self.logger.info(f"Generation Metadata updated for s3_path: {s3_path}")
+
+                        export_paths['s3'] = s3_path
+
+                    except Exception as e:
+                        self.logger.error(f"Error exporting to S3: {str(e)}", exc_info=True)
+                        raise APIError(f"S3 export failed: {str(e)}")
+
+                # HuggingFace Export (existing code)
                elif export_type == "huggingface" and request.hf_config:
+                    # We still need to read the file for HuggingFace export
+                    try:
+                        with open(request.file_path, 'r') as f:
+                            output_data = json.load(f)
+                    except FileNotFoundError:
+                        raise HTTPException(status_code=404, detail=f"File not found: {request.file_path}")
+                    except json.JSONDecodeError as e:
+                        raise HTTPException(status_code=400, detail=f"Invalid JSON file: {str(e)}")
+
                    self.logger.info(f"Creating HuggingFace dataset: {request.hf_config.hf_repo_name}")

                    # Set up HuggingFace authentication
                    HfFolder.save_token(request.hf_config.hf_token)

                    # Convert JSON to dataset
                    dataset = self._create_dataset(output_data, request.output_key, request.output_value, request.file_path)
-                    print(dataset)

                    # Push to HuggingFace Hub as a dataset
                    repo_id = f"{request.hf_config.hf_username}/{request.hf_config.hf_repo_name}"
@@ -146,8 +178,8 @@ def export(self,request:Export_synth):
                    self.logger.info(f"Dataset published to HuggingFace: {export_paths['huggingface']}")
                    self.db.update_hf_path(file_name, export_paths['huggingface'])
                    self.logger.info(f"Generation Metadata updated for hf_path: {export_paths['huggingface']}")
-
-            return export_paths
+
+            return export_paths

        except Exception as e:
            self.logger.error(f"Error saving results: {str(e)}", exc_info=True)

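The newly imported export_to_s3 helper lives in app/services/s3_export.py, one of the commit's changed files that this excerpt does not show. Judging only from the call site above (keyword arguments file_path, bucket_name, key, create_bucket, and a returned dict carrying an 's3' URI), a minimal sketch of what it might contain; the bucket-existence check and error handling are assumptions:

import boto3
from botocore.exceptions import ClientError


def export_to_s3(file_path: str, bucket_name: str, key: str, create_bucket: bool = True) -> dict:
    """Upload a local file to S3, optionally creating the bucket first.

    Sketch reconstructed from the call site in export_results.py; the real
    module may differ (region handling, credentials, multipart settings).
    """
    s3_client = boto3.client("s3")

    if create_bucket:
        try:
            # head_bucket raises ClientError if the bucket is missing or inaccessible.
            s3_client.head_bucket(Bucket=bucket_name)
        except ClientError:
            s3_client.create_bucket(Bucket=bucket_name)

    # upload_file streams from disk, unlike the previous put_object(Body=...) call,
    # so large dataset files are never loaded into memory.
    s3_client.upload_file(file_path, bucket_name, key)

    return {"s3": f"s3://{bucket_name}/{key}"}

A design note visible in the diff itself: the JSON payload is now read into memory only for the HuggingFace branch, since the S3 branch hands the file path straight to the helper.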