Skip to content

Commit 147fd35

Browse files
authored
Merge pull request #157037 from yuzefovich/blathers/backport-release-25.4-156959
release-25.4: copy: step txn unconditionally
2 parents 5480846 + 8ad88a2 commit 147fd35

File tree

5 files changed

+67
-42
lines changed

5 files changed

+67
-42
lines changed

pkg/sql/conn_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3219,6 +3219,12 @@ func (ex *connExecutor) execCopyIn(
32193219
// Disable the buffered writes for COPY since there is no benefit in this
32203220
// ability here.
32213221
ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */)
3222+
// Step the txn in case it had just been rolled back to a savepoint (if it
3223+
// wasn't, this is harmless). This also matches what we do unconditionally
3224+
// on the main query path.
3225+
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
3226+
return ex.makeErrEvent(err, cmd.ParsedStmt.AST)
3227+
}
32223228
txnOpt := copyTxnOpt{
32233229
txn: ex.state.mu.txn,
32243230
txnTimestamp: ex.state.sqlTimestamp,

pkg/sql/copy/copy_in_test.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -648,49 +648,51 @@ func TestCopyTransaction(t *testing.T) {
648648
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
649649
defer s.Stopper().Stop(context.Background())
650650

651-
if _, err := db.Exec(`
651+
_, err := db.Exec(`
652652
CREATE DATABASE d;
653653
SET DATABASE = d;
654654
CREATE TABLE t (
655655
i INT PRIMARY KEY
656656
);
657-
`); err != nil {
658-
t.Fatal(err)
659-
}
657+
`)
658+
require.NoError(t, err)
660659

661660
txn, err := db.Begin()
662-
if err != nil {
663-
t.Fatal(err)
664-
}
665-
666-
// Note that, at least with lib/pq, this doesn't actually send a Parse msg
667-
// (which we wouldn't support, as we don't support Copy-in in extended
668-
// protocol mode). lib/pq has magic for recognizing a Copy.
669-
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
670-
if err != nil {
671-
t.Fatal(err)
672-
}
661+
require.NoError(t, err)
673662

674-
const val = 2
663+
// Run COPY twice with the first one being rolled back via savepoints.
664+
for val, doSavepoint := range []bool{true, false} {
665+
func() {
666+
if doSavepoint {
667+
_, err = txn.Exec("SAVEPOINT s")
668+
require.NoError(t, err)
669+
defer func() {
670+
_, err = txn.Exec("ROLLBACK TO SAVEPOINT s")
671+
require.NoError(t, err)
672+
}()
673+
}
674+
// Note that, at least with lib/pq, this doesn't actually send a
675+
// Parse msg (which we wouldn't support, as we don't support Copy-in
676+
// in extended protocol mode). lib/pq has magic for recognizing a
677+
// Copy.
678+
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
679+
require.NoError(t, err)
675680

676-
_, err = stmt.Exec(val)
677-
if err != nil {
678-
t.Fatal(err)
679-
}
681+
_, err = stmt.Exec(val)
682+
require.NoError(t, err)
680683

681-
if err = stmt.Close(); err != nil {
682-
t.Fatal(err)
683-
}
684+
err = stmt.Close()
685+
require.NoError(t, err)
684686

685-
var i int
686-
if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil {
687-
t.Fatal(err)
688-
} else if i != val {
689-
t.Fatalf("expected 1, got %d", i)
690-
}
691-
if err := txn.Commit(); err != nil {
692-
t.Fatal(err)
687+
var i int
688+
err = txn.QueryRow("SELECT i FROM d.t").Scan(&i)
689+
require.NoError(t, err)
690+
if i != val {
691+
t.Fatalf("expected %d, got %d", val, i)
692+
}
693+
}()
693694
}
695+
require.NoError(t, txn.Commit())
694696
}
695697

696698
// TestCopyFromFKCheck verifies that foreign keys are checked during COPY.

pkg/sql/copy/copy_out_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,26 @@ func TestCopyOutTransaction(t *testing.T) {
6262
tx, err := conn.Begin(ctx)
6363
require.NoError(t, err)
6464

65-
_, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)")
66-
require.NoError(t, err)
65+
for val, doSavepoint := range []bool{true, false} {
66+
func() {
67+
if doSavepoint {
68+
_, err = tx.Exec(ctx, "SAVEPOINT s")
69+
require.NoError(t, err)
70+
defer func() {
71+
_, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s")
72+
require.NoError(t, err)
73+
}()
74+
}
6775

68-
var buf bytes.Buffer
69-
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
70-
require.NoError(t, err)
71-
require.Equal(t, "1\n", buf.String())
76+
_, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val)
77+
require.NoError(t, err)
7278

79+
var buf bytes.Buffer
80+
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
81+
require.NoError(t, err)
82+
require.Equal(t, fmt.Sprintf("%d\n", val), buf.String())
83+
}()
84+
}
7385
require.NoError(t, tx.Rollback(ctx))
7486
}
7587

pkg/sql/internal.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,15 +1471,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
14711471
return ex.commitSQLTransactionInternal(ctx)
14721472
}
14731473

1474-
// checkIfStmtIsAllowed returns an error if the internal executor is not bound
1475-
// with the outer-txn-related info but is used to run DDL statements within an
1476-
// outer txn.
1477-
// TODO (janexing): this will be deprecate soon since it's not a good idea
1478-
// to have `extraTxnState` to store the info from a outer txn.
1474+
// checkIfStmtIsAllowed returns an error if the internal executor cannot execute
1475+
// the given stmt.
14791476
func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error {
14801477
if stmt == nil {
14811478
return nil
14821479
}
1480+
if _, ok := stmt.(*tree.CopyFrom); ok {
1481+
// COPY FROM has special handling in the connExecutor, so we can't run
1482+
// it via the internal executor.
1483+
return errors.New("COPY cannot be run via the internal executor")
1484+
}
14831485
if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil {
14841486
return errors.New("DDL statement is disallowed if internal " +
14851487
"executor is not bound with txn metadata")

pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,6 @@ subtest end
298298
# Regression test for crashing with SHOW COMMIT TIMESTAMP.
299299
statement error this statement is disallowed
300300
SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true);
301+
302+
statement error COPY cannot be run via the internal executor
303+
SELECT crdb_internal.execute_internally('COPY t FROM STDIN');

0 commit comments

Comments
 (0)