Skip to content

Commit c1eaee9

Browse files
prabhath004
and authored
feat: add max_file_size support to AmazonS3 source (#1257)
Add optional max_file_size parameter to filter files by size in both list() and get_value() APIs. Files exceeding the limit are treated as non-existent. Closes #1252 Co-authored-by: prabhath004 <ppalakur@gmu.edu>
1 parent d204ed9 commit c1eaee9

File tree

3 files changed

+34
-0
lines changed

3 files changed

+34
-0
lines changed

docs/docs/sources/amazons3.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ The spec takes the following fields:
131131

132132
:::
133133

134+
* `max_file_size` (`int`, optional): if provided, files exceeding this size in bytes will be treated as non-existent and skipped during processing.
135+
This is useful to avoid processing large files that are not relevant to your use case, such as videos or backups.
136+
If not specified, no size limit is applied.
134137
* `sqs_queue_url` (`str`, optional): if provided, the source will receive change event notifications from Amazon S3 via this SQS queue.
135138

136139
:::info

python/cocoindex/sources/_engine_builtin_specs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class AmazonS3(op.SourceSpec):
5555
binary: bool = False
5656
included_patterns: list[str] | None = None
5757
excluded_patterns: list[str] | None = None
58+
max_file_size: int | None = None
5859
sqs_queue_url: str | None = None
5960
redis: RedisNotification | None = None
6061

src/ops/sources/amazon_s3.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub struct Spec {
3434
binary: bool,
3535
included_patterns: Option<Vec<String>>,
3636
excluded_patterns: Option<Vec<String>>,
37+
max_file_size: Option<i64>,
3738
sqs_queue_url: Option<String>,
3839
redis: Option<RedisConfig>,
3940
}
@@ -82,6 +83,7 @@ struct Executor {
8283
prefix: Option<String>,
8384
binary: bool,
8485
pattern_matcher: PatternMatcher,
86+
max_file_size: Option<i64>,
8587
sqs_context: Option<Arc<SqsContext>>,
8688
redis_context: Option<Arc<RedisContext>>,
8789
}
@@ -115,6 +117,14 @@ impl SourceExecutor for Executor {
115117
if let Some(key) = obj.key() {
116118
// Only include files (not folders)
117119
if key.ends_with('/') { continue; }
120+
// Check file size limit
121+
if let Some(max_size) = self.max_file_size {
122+
if let Some(size) = obj.size() {
123+
if size > max_size {
124+
continue;
125+
}
126+
}
127+
}
118128
if self.pattern_matcher.is_file_included(key) {
119129
batch.push(PartialSourceRow {
120130
key: KeyValue::from_single_part(key.to_string()),
@@ -156,6 +166,25 @@ impl SourceExecutor for Executor {
156166
content_version_fp: None,
157167
});
158168
}
169+
// Check file size limit
170+
if let Some(max_size) = self.max_file_size {
171+
let head_result = self
172+
.client
173+
.head_object()
174+
.bucket(&self.bucket_name)
175+
.key(key_str.as_ref())
176+
.send()
177+
.await?;
178+
if let Some(size) = head_result.content_length() {
179+
if size > max_size {
180+
return Ok(PartialSourceRowData {
181+
value: Some(SourceValue::NonExistence),
182+
ordinal: Some(Ordinal::unavailable()),
183+
content_version_fp: None,
184+
});
185+
}
186+
}
187+
}
159188
let resp = self
160189
.client
161190
.get_object()
@@ -457,6 +486,7 @@ impl SourceFactoryBase for Factory {
457486
prefix: spec.prefix,
458487
binary: spec.binary,
459488
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
489+
max_file_size: spec.max_file_size,
460490
sqs_context: spec.sqs_queue_url.map(|url| {
461491
Arc::new(SqsContext {
462492
client: aws_sdk_sqs::Client::new(&config),

0 commit comments

Comments (0)