Skip to content

Conversation

@gouhongshen
Copy link
Contributor

@gouhongshen gouhongshen commented Dec 1, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue ##21979

What this PR does / why we need it:

create stage stage02 url = 's3://.../' credentials = {"aws_key_id"='...',"aws_secret_key"='...',"AWS_REGION"='...','PROVIDER'='...', 'ENDPOINT'='...'};

data branch diff t2 against t1 output 'stage://stage02';


PR Type

Enhancement


Description

  • Add stage path decoding support for data branch diff output

  • Refactor file path handling to support both local and stage paths

  • Implement pipe-based file writing for improved performance

  • Add context cancellation checks in diff operations


Diagram Walkthrough

flowchart LR
  A["Data Branch Diff"] -->|"Output to stage or local"| B["tryDecodeStagePath"]
  B -->|"Stage path detected"| C["prepareFSForDiffAsFile"]
  B -->|"Local path"| D["Standard file handling"]
  C -->|"Mutable FS"| E["Direct mutator write"]
  C -->|"Immutable FS"| F["newSingleWriteAppender"]
  F -->|"Pipe-based writing"| G["File output"]
Loading

File Walkthrough

Relevant files
Refactoring
authenticate.go
Extract stage path decoding into helper function                 

pkg/frontend/authenticate.go

  • Extract stage path decoding logic into new tryDecodeStagePath function
  • Refactor doCheckFilePath to use the new helper function
  • Improve error handling and return value clarity
+38/-15 
Enhancement
data_branch.go
Add stage path support and pipe-based file writing             

pkg/frontend/data_branch.go

  • Add io package import for pipe-based file writing
  • Disable validation function to allow stage path processing
  • Increase channel buffer size from 10 to 100 for better throughput
  • Fix context parameter in diffOnBase call from execCtx.reqCtx to ctx
  • Refactor prepareFSForDiffAsFile to support stage paths with proper
    cleanup handling
  • Implement newSingleWriteAppender function using io.Pipe for efficient
    streaming writes
  • Update writeCSV to handle both stage and local file paths with
    appropriate cleanup
  • Add context cancellation checks in diffDataHelper to prevent resource
    leaks
+154/-49

@qodo-code-review
Copy link

qodo-code-review bot commented Dec 1, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Disabled validation

Description: The validate function for DataBranchDiff now unconditionally returns nil, disabling
directory/path validation and potentially allowing unintended file paths or stage targets
to be accepted without checks.
data_branch.go [319-326]

Referred Code
func validate(stmt tree.Statement) error {
	return nil
	switch s := stmt.(type) {
	case *tree.DataBranchDiff:
		if s.OutputOpt != nil && len(s.OutputOpt.DirPath) != 0 {
			info, err := os.Stat(s.OutputOpt.DirPath)
			if err != nil {
				if os.IsNotExist(err) {
Unbounded streaming write

Description: The pipe-based writer streams data to fileservice without explicit size limits or
sanitization, which could enable large unbounded writes to external storage if upstream
controls are weak.
data_branch.go [1128-1165]

Referred Code
if err = worker.Submit(func() {
	defer close(done)
	vec := fileservice.IOVector{
		FilePath: targetPath,
		Entries: []fileservice.IOEntry{
			{
				ReaderForWrite: pr,
				Size:           -1,
			},
		},
	}
	if wErr := targetFS.Write(ctx, vec); wErr != nil {
		_ = pr.CloseWithError(wErr)
		done <- wErr
		return
	}
	done <- pr.Close()
}); err != nil {
	_ = pr.Close()
	_ = pw.Close()
	return


 ... (clipped 17 lines)
Ticket Compliance
🟢
🎫 #21979
🟢 Support a new data branching syntax to output diff results to a destination, including
stage paths (stage://).
Allow data branch diff to write output files to an external stage (e.g., S3) and local
filesystem.
Ensure implementation integrates with existing stage definition/url decoding.
Provide suitable performance for large diffs (streaming/pipe-based writing acceptable).
Add necessary documentation or user-facing hints for output path/result.
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Disabled validation: The validate function was changed to unconditionally return nil, disabling directory/path
validations and potentially allowing invalid or insecure output paths without checks.

Referred Code
func validate(stmt tree.Statement) error {
	return nil
	switch s := stmt.(type) {
	case *tree.DataBranchDiff:
		if s.OutputOpt != nil && len(s.OutputOpt.DirPath) != 0 {
			info, err := os.Stat(s.OutputOpt.DirPath)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Path validation off: By returning nil immediately, input validation for output directories is bypassed and
stage/local path handling proceeds without verifying existence/permissions or sanitizing
user-provided paths.

Referred Code
func validate(stmt tree.Statement) error {
	return nil
	switch s := stmt.(type) {
	case *tree.DataBranchDiff:
		if s.OutputOpt != nil && len(s.OutputOpt.DirPath) != 0 {
			info, err := os.Stat(s.OutputOpt.DirPath)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Missing audit logs: Newly added file output and deletion operations (including stage path handling and
streaming writes) perform critical I/O without any added auditing/logging of who performed
the action, when, and the outcome.

Referred Code
func prepareFSForDiffAsFile(
	ctx context.Context,
	ses *Session,
	stmt *tree.DataBranchDiff,
	tblStuff tableStuff,
) (
	sqlRetPath, sqlRetHint string,
	writeFile func([]byte) error,
	release func(),
	cleanup func(),
	err error,
) {
	var (
		ok           bool
		stagePath    string
		fileName     string
		fullFilePath string
	)

	fileName = makeFileName(stmt.BaseTable.AtTsExpr, stmt.TargetTable.AtTsExpr, tblStuff)
	fileName += ".sql"


 ... (clipped 158 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Stage path secrecy: New stage path decoding and file path resolution handle potentially sensitive
endpoint/credentials indirectly, but there is no evidence of redaction or assurance logs
won’t include sensitive paths if logged elsewhere.

Referred Code
func tryDecodeStagePath(
	ses *Session,
	filePath string,
) (retPath string, ok bool, err error) {

	var (
		s stage.StageDef
	)

	if strings.HasPrefix(filePath, stage.STAGE_PROTOCOL+"://") {
		// stage:// URL
		if s, err = stageutil.UrlToStageDef(filePath, ses.proc); err != nil {
			return
		}

		// s.ToPath() returns the fileservice filepath, i.e. s3,...:/path for S3 or /path for local file
		if retPath, _, err = s.ToPath(); err != nil {
			return
		}

		ok = true


 ... (clipped 27 lines)

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review
Copy link

qodo-code-review bot commented Dec 1, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix potential deadlock in worker goroutine

Fix a potential deadlock in the newSingleWriteAppender goroutine. If
targetFS.Write fails, add a return after sending the error to prevent the
goroutine from blocking on a subsequent channel send.

pkg/frontend/data_branch.go [1128-1149]

 	if err = worker.Submit(func() {
 		defer close(done)
 		vec := fileservice.IOVector{
 			FilePath: targetPath,
 			Entries: []fileservice.IOEntry{
 				{
 					ReaderForWrite: pr,
 					Size:           -1,
 				},
 			},
 		}
 		if wErr := targetFS.Write(ctx, vec); wErr != nil {
+			// The reader is closed by the fileservice on write error,
+			// but we close it with the error to unblock the writer.
 			_ = pr.CloseWithError(wErr)
 			done <- wErr
 			return
 		}
 		done <- pr.Close()
 	}); err != nil {
 		_ = pr.Close()
 		_ = pw.Close()
 		return
 	}
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential deadlock in the worker goroutine if a write error occurs, and provides a correct fix by adding a return statement.

Medium
Restore validation for non-stage paths
Suggestion Impact:The commit reintroduced validation logic for the diff output directory, adding a new validate function that selectively validates local and fileservice paths while handling stage paths appropriately. It replaced the previous no-op validate and expanded checks beyond simple os.Stat, aligning with the suggestion’s intent.

code diff:

-	if err = validate(stmt); err != nil {
+	if err = validate(execCtx.reqCtx, ses, stmt); err != nil {
 		return err
 	}
 
@@ -985,6 +964,101 @@
 		srcName, baseName,
 		time.Now().UTC().Format("20060102_150405"),
 	)
+}
+
+func validate(
+	ctx context.Context,
+	ses *Session,
+	stmt tree.Statement,
+) error {
+	if stmt == nil {
+		return nil
+	}
+
+	var (
+		ok       bool
+		diffStmt *tree.DataBranchDiff
+	)
+
+	if diffStmt, ok = stmt.(*tree.DataBranchDiff); !ok {
+		return nil
+	}
+
+	if diffStmt.OutputOpt != nil && len(diffStmt.OutputOpt.DirPath) > 0 {
+		if err := validateOutputDirPath(ctx, ses, diffStmt.OutputOpt.DirPath); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func validateOutputDirPath(ctx context.Context, ses *Session, dirPath string) (err error) {
+	if len(dirPath) == 0 {
+		return nil
+	}
+
+	var (
+		stagePath    string
+		ok           bool
+		inputDirPath = dirPath
+	)
+
+	if stagePath, ok, err = tryDecodeStagePath(ses, dirPath); err != nil {
+		return
+	} else if ok {
+		dirPath = stagePath
+	}
+
+	var fsPath fileservice.Path
+	if fsPath, err = fileservice.ParsePath(dirPath); err != nil {
+		return
+	}
+
+	if fsPath.Service == "" {
+		var info os.FileInfo
+		if info, err = os.Stat(dirPath); err != nil {
+			if os.IsNotExist(err) {
+				return moerr.NewInvalidInputNoCtxf("output directory %s does not exist", inputDirPath)
+			}
+			return
+		}
+		if !info.IsDir() {
+			return moerr.NewInvalidInputNoCtxf("output directory %s is not a directory", inputDirPath)
+		}
+		return nil
+	}
+
+	var (
+		targetFS   fileservice.FileService
+		targetPath string
+		entry      *fileservice.DirEntry
+	)
+
+	if fsPath.Service == defines.SharedFileServiceName {
+		targetFS = getPu(ses.GetService()).FileService
+		targetPath = dirPath
+	} else {
+		var etlFS fileservice.ETLFileService
+		if etlFS, targetPath, err = fileservice.GetForETL(ctx, nil, dirPath); err != nil {
+			return
+		}
+		targetFS = etlFS
+		defer targetFS.Close(ctx)
+	}
+
+	if entry, err = targetFS.StatFile(ctx, targetPath); err != nil {
+		if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) {
+			return moerr.NewInvalidInputNoCtxf("output directory %s does not exist", inputDirPath)
+		}
+		return
+	}
+
+	if !entry.IsDir {
+		return moerr.NewInvalidInputNoCtxf("output directory %s is not a directory", inputDirPath)
+	}
+
+	return nil
 }

Restore path validation for non-stage paths in the validate function. The
current implementation disables all validation to support stage paths, but this
reintroduces checks for regular file paths.

pkg/frontend/data_branch.go [319-338]

 func validate(stmt tree.Statement) error {
-	return nil
 	switch s := stmt.(type) {
 	case *tree.DataBranchDiff:
 		if s.OutputOpt != nil && len(s.OutputOpt.DirPath) != 0 {
+			// Skip validation for stage paths, as os.Stat will fail.
+			if strings.HasPrefix(s.OutputOpt.DirPath, stage.STAGE_PROTOCOL+"://") {
+				return nil
+			}
 			info, err := os.Stat(s.OutputOpt.DirPath)
 			if err != nil {
 				if os.IsNotExist(err) {
 					return moerr.NewInternalErrorNoCtxf("diff output directory does not exist: %s", s.OutputOpt.DirPath)
 				}
 				return moerr.NewInternalErrorNoCtxf("diff output directory check failed: %v", err)
 			}
 			if !info.IsDir() {
 				return moerr.NewInternalErrorNoCtxf("diff output file only accept a directory path")
 			}
 			return nil
 		}
 	}
 	return nil
 }

[Suggestion processed]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that completely removing path validation is a regression and proposes a sound fix to selectively validate only non-stage paths, improving robustness.

Medium
  • Update

@matrix-meow matrix-meow added size/L Denotes a PR that changes [500,999] lines and removed size/M Denotes a PR that changes [100,499] lines labels Dec 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/enhancement Review effort 3/5 size/L Denotes a PR that changes [500,999] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants