@@ -39,9 +39,10 @@ const (
 
 // Client is a wrapper around AWS S3 client
 type Client struct {
-    client     *s3.Client
-    bucketName string
-    logger     *slog.Logger
+    client          *s3.Client
+    bucketName      string
+    bucketNamespace string
+    logger          *slog.Logger
 }
 
 // ProgressReader is a wrapper around an io.Reader that reports progress
@@ -145,7 +146,7 @@ func (l *filteringLogger) Logf(classification logging.Classification, format str
 }
 
 // NewClient creates a new S3 client
-func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, error) {
+func NewClient(endpoint, bucketName, bucketNamespace, accessKey, secretKey string) (*Client, error) {
     // Create exactly the same config as the faster implementation
     cfg := aws.Config{
         Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
@@ -162,9 +163,10 @@ func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, erro
     client := s3.NewFromConfig(cfg)
 
     return &Client{
-        client:     client,
-        bucketName: bucketName,
-        logger:     slog.New(slog.NewTextHandler(os.Stdout, nil)),
+        client:          client,
+        bucketName:      bucketName,
+        bucketNamespace: bucketNamespace,
+        logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
     }, nil
 }
 
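Call sites now have to thread the namespace through the constructor. A minimal usage sketch, assuming hypothetical endpoint, bucket, and namespace values (none of these come from this commit):

    // Hypothetical values for illustration only.
    client, err := NewClient(
        "https://s3.example.com", // endpoint (assumed)
        "backups",                // bucketName (assumed)
        "cluster-a",              // bucketNamespace; pass "" to keep keys un-prefixed
        accessKey, secretKey,
    )
    if err != nil {
        return fmt.Errorf("create S3 client: %w", err)
    }

Passing an empty namespace preserves the old flat-key behavior.
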
@@ -175,13 +177,20 @@ func (c *Client) SetLogger(logger *slog.Logger) {
     }
 }
 
+// getRemotePath returns the path with namespace prefix if namespace is set
+func (c *Client) getRemotePath(path string) string {
+    if c.bucketNamespace == "" {
+        return path
+    }
+    return c.bucketNamespace + "/" + path
+}
+
 // PartReader is a reader that handles part uploads with proper checksum handling
 type PartReader struct {
     reader     io.Reader
     size       int64
     read       int64
     progressFn func(n int64)
-    buf        []byte
 }
 
 func NewPartReader(reader io.Reader, size int64, progressFn func(n int64)) *PartReader {
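getRemotePath is the single point where the namespace is applied, so every key the client touches is either prefixed or passed through unchanged. A quick sketch of the two cases, with illustrative values that are assumptions, not taken from this commit:

    // Illustrative only; values are assumptions.
    c := &Client{bucketNamespace: "cluster-a"}
    _ = c.getRemotePath("snapshots/db.snap") // -> "cluster-a/snapshots/db.snap"

    c.bucketNamespace = ""
    _ = c.getRemotePath("snapshots/db.snap") // -> "snapshots/db.snap" (pass-through)
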
@@ -207,8 +216,10 @@ func (pr *PartReader) Size() int64 {
     return pr.size
 }
 
-// CleanupMultipartUploads cleans up any existing multipart uploads for the given key
-func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error {
+// CleanupMultipartUploads cleans up any existing multipart uploads for the given path
+func (c *Client) CleanupMultipartUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     c.logger.Info("Starting cleanup of existing multipart uploads", "key", key)
 
     // List all multipart uploads
@@ -260,7 +271,7 @@ func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error
 }
 
 // UploadFile uploads a file to S3 using the fast implementation approach
-func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error {
+func (c *Client) UploadFile(ctx context.Context, localPath, path string) error {
     // Open file exactly as in the faster implementation
     file, err := os.Open(localPath)
     if err != nil {
@@ -275,6 +286,9 @@ func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error
     }
     fileSize := info.Size()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
+
     c.logger.Info("Starting upload", "file", localPath, "key", s3Path, "size", fileSize)
 
     // Create multipart upload exactly as in the faster implementation
@@ -388,8 +402,10 @@ func (c *Client) UploadFileMultipart(ctx context.Context, localPath, s3Path stri
     return c.UploadFile(ctx, localPath, s3Path)
 }
 
-// cleanupExistingUploads aborts any existing multipart uploads for the given key
-func (c *Client) cleanupExistingUploads(ctx context.Context, key string) error {
+// cleanupExistingUploads aborts any existing multipart uploads for the given path
+func (c *Client) cleanupExistingUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     listResp, err := c.client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
         Bucket: aws.String(c.bucketName),
         Prefix: aws.String(key),
@@ -478,11 +494,13 @@ func formatFileSize(size int64) string {
 }
 
 // FileExists checks if a file exists in S3
-func (c *Client) FileExists(s3Path string) (bool, error) {
+func (c *Client) FileExists(path string) (bool, error) {
     // Create a context with timeout for the operation
     ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
     defer cancel()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     startTime := time.Now()
     _, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{
         Bucket: aws.String(c.bucketName),
@@ -513,7 +531,7 @@ func (c *Client) FileExists(s3Path string) (bool, error) {
 }
 
 // DownloadFile downloads a file from S3
-func (c *Client) DownloadFile(s3Path, localPath string) error {
+func (c *Client) DownloadFile(path, localPath string) error {
     // Create the directory if it doesn't exist
     dir := filepath.Dir(localPath)
     if err := os.MkdirAll(dir, 0755); err != nil {
@@ -527,6 +545,8 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
     }
     defer file.Close()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     c.logger.Info("Downloading file", "s3_path", s3Path, "local_path", localPath)
 
     // Download the file
@@ -550,7 +570,9 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
 }
 
 // DeleteObject deletes an object from S3, ensuring metadata files for snapshots are preserved
-func (c *Client) DeleteObject(s3Path string) error {
+func (c *Client) DeleteObject(path string) error {
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     // If we're deleting a metadata file, first check if the snapshot exists
     if IsMetadataKey(s3Path) {
         hasSnapshot, err := c.HasCorrespondingFile(context.Background(), s3Path)
@@ -603,7 +625,8 @@ func (c *Client) DeleteObjects(s3Paths []string) error {
                 continue
             }
         }
-        filteredPaths = append(filteredPaths, path)
+        // Apply namespace to S3 path
+        filteredPaths = append(filteredPaths, c.getRemotePath(path))
     }
     c.logger.Info("Filtered paths for deletion",
         "original_count", len(s3Paths),
@@ -693,7 +716,7 @@ func (c *Client) ListObjects(prefix string) ([]S3Object, error) {
     for {
         result, err := c.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
             Bucket:            aws.String(c.bucketName),
-            Prefix:            aws.String(prefix),
+            Prefix:            aws.String(c.getRemotePath(prefix)),
             ContinuationToken: continuationToken,
         })
         if err != nil {
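Because the listing prefix is namespaced here, the object keys returned by ListObjects include the namespace as well. If callers expect paths relative to the namespace, they would need to strip it themselves; a sketch of what that could look like, where the variable names are assumptions and this is not part of the change:

    // Hypothetical caller-side normalization; not in this commit.
    // Requires the standard "strings" package.
    relativeKey := strings.TrimPrefix(fullKey, c.bucketNamespace+"/")
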
@@ -786,7 +809,7 @@ func GetUploadInfoFilePath(filePath string) string {
 func (c *Client) AbortMultipartUpload(s3Path, uploadID string) error {
     _, err := c.client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
         Bucket:   aws.String(c.bucketName),
-        Key:      aws.String(s3Path),
+        Key:      aws.String(c.getRemotePath(s3Path)),
         UploadId: aws.String(uploadID),
     })
     if err != nil {
@@ -863,10 +886,10 @@ func (c *Client) HasCorrespondingFile(ctx context.Context, key string) (bool, er
     var checkKey string
     if IsMetadataKey(key) {
         // For metadata file, check if snapshot exists
-        checkKey = GetSnapshotKeyFromMetadata(key)
+        checkKey = c.getRemotePath(GetSnapshotKeyFromMetadata(key))
     } else if IsSnapshotKey(key) {
         // For snapshot file, check if metadata exists
-        checkKey = GetMetadataKey(key)
+        checkKey = c.getRemotePath(GetMetadataKey(key))
     } else {
         // Not a snapshot or metadata file
         return false, nil
@@ -914,7 +937,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
         return fmt.Errorf("not a metadata file: %s", metadataKey)
     }
 
-    c.logger.Info("Updating latest.json with metadata", "source", metadataKey)
+    c.logger.Info("Updating latest.json with metadata", "source", c.getRemotePath(metadataKey))
 
     // Create a temporary file to download the metadata
     tmpFile, err := os.CreateTemp("", "metadata-*.json")
@@ -1157,7 +1180,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
     // Upload consolidated metadata as latest.json with proper content type
     _, err = c.client.PutObject(ctx, &s3.PutObjectInput{
         Bucket:      aws.String(c.bucketName),
-        Key:         aws.String("latest.json"),
+        Key:         aws.String(c.getRemotePath("latest.json")),
         Body:        consolidatedFile,
         ContentType: aws.String("application/json"),
     })