Skip to content

Commit cc3ca22

Browse files
authored
to 3.0: supporting data branch diff as file with stage. (#23189)
### **User description** ## What type of PR is this? - [ ] API-change - [ ] BUG - [x] Improvement - [ ] Documentation - [ ] Feature - [ ] Test and CI - [ ] Code Refactoring ## Which issue(s) this PR fixes: issue ##21979 ## What this PR does / why we need it: ``` create stage stage02 url = 's3://.../' credentials = {"aws_key_id"='...',"aws_secret_key"='...',"AWS_REGION"='...','PROVIDER'='...', 'ENDPOINT'='...'}; data branch diff t2 against t1 output 'stage://stage02'; ``` ___ ### **PR Type** Enhancement ___ ### **Description** - Add stage path decoding support for data branch diff output - Refactor file path handling to support both local and stage paths - Implement pipe-based file writing for improved performance - Add context cancellation checks in diff operations ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Data Branch Diff"] -->|"Output to stage or local"| B["tryDecodeStagePath"] B -->|"Stage path detected"| C["prepareFSForDiffAsFile"] B -->|"Local path"| D["Standard file handling"] C -->|"Mutable FS"| E["Direct mutator write"] C -->|"Immutable FS"| F["newSingleWriteAppender"] F -->|"Pipe-based writing"| G["File output"] ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Refactoring</strong></td><td><table> <tr> <td> <details> <summary><strong>authenticate.go</strong><dd><code>Extract stage path decoding into helper function</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> pkg/frontend/authenticate.go <ul><li>Extract stage path decoding logic into new <code>tryDecodeStagePath</code> function<br> <li> Refactor <code>doCheckFilePath</code> to use the new helper function<br> <li> Improve error handling and return value clarity</ul> </details> </td> <td><a href="https://github.com/matrixorigin/matrixone/pull/23189/files#diff-849f201c351210bd95807e99d1538e2602a5244b256c35e58467afe304c509e6">+38/-15</a>&nbsp; </td> </tr> </table></td></tr><tr><td><strong>Enhancement</strong></td><td><table> <tr> <td> <details> <summary><strong>data_branch.go</strong><dd><code>Add stage path support and pipe-based file writing</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> pkg/frontend/data_branch.go <ul><li>Add <code>io</code> package import for pipe-based file writing<br> <li> Disable validation function to allow stage path processing<br> <li> Increase channel buffer size from 10 to 100 for better throughput<br> <li> Fix context parameter in <code>diffOnBase</code> call from <code>execCtx.reqCtx</code> to <code>ctx</code><br> <li> Refactor <code>prepareFSForDiffAsFile</code> to support stage paths with proper <br>cleanup handling<br> <li> Implement <code>newSingleWriteAppender</code> function using <code>io.Pipe</code> for efficient <br>streaming writes<br> <li> Update <code>writeCSV</code> to handle both stage and local file paths with <br>appropriate cleanup<br> <li> Add context cancellation checks in <code>diffDataHelper</code> to prevent resource <br>leaks</ul> </details> </td> <td><a href="https://github.com/matrixorigin/matrixone/pull/23189/files#diff-632e0d2d46c8bdef4144b4d6f532cf8df78cc583e4903672ae6675d8c09783b4">+154/-49</a></td> </tr> </table></td></tr></tr></tbody></table> </details> ___
1 parent b716774 commit cc3ca22

File tree

3 files changed

+411
-127
lines changed

3 files changed

+411
-127
lines changed

pkg/frontend/authenticate.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3516,30 +3516,53 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
35163516
return err
35173517
}
35183518

3519-
func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (err error) {
3520-
//var err error
3521-
var filePath string
3522-
if ep == nil {
3523-
return err
3524-
}
3519+
func tryDecodeStagePath(
3520+
ses *Session,
3521+
filePath string,
3522+
) (retPath string, ok bool, err error) {
3523+
3524+
var (
3525+
s stage.StageDef
3526+
)
35253527

3526-
// detect filepath contain stage or not
3527-
filePath = ep.FilePath
35283528
if strings.HasPrefix(filePath, stage.STAGE_PROTOCOL+"://") {
35293529
// stage:// URL
3530-
s, err := stageutil.UrlToStageDef(filePath, ses.proc)
3531-
if err != nil {
3532-
return err
3530+
if s, err = stageutil.UrlToStageDef(filePath, ses.proc); err != nil {
3531+
return
35333532
}
35343533

35353534
// s.ToPath() returns the fileservice filepath, i.e. s3,...:/path for S3 or /path for local file
3536-
ses.ep.userConfig.StageFilePath, _, err = s.ToPath()
3537-
if err != nil {
3538-
return err
3535+
if retPath, _, err = s.ToPath(); err != nil {
3536+
return
35393537
}
3538+
3539+
ok = true
35403540
}
3541-
return err
35423541

3542+
return
3543+
}
3544+
3545+
func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (err error) {
3546+
if ep == nil {
3547+
return err
3548+
}
3549+
3550+
// detect filepath contain stage or not
3551+
//var err error
3552+
var (
3553+
ok bool
3554+
filePath string
3555+
)
3556+
3557+
if filePath, ok, err = tryDecodeStagePath(ses, ep.FilePath); err != nil {
3558+
return
3559+
}
3560+
3561+
if ok {
3562+
ses.ep.userConfig.StageFilePath = filePath
3563+
}
3564+
3565+
return
35433566
}
35443567

35453568
func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err error) {

0 commit comments

Comments
 (0)