Skip to content

Commit 8ad88a2

Browse files
committed
copy: step txn unconditionally
Currently, we have an issue with COPY FROM command if it runs within the txn which had just been rolled back to a savepoint - we'd hit an assertion error in the KV layer. This commit fixes the oversight by stepping the txn, similar to what we do on the main query path. Additionally, this commit explicitly prohibits running COPY FROM via the internal executor (previously it could lead to undefined behavior or internal errors) and adds a test with savepoints for COPY TO (which works because it uses different execution machinery than COPY FROM). Release note (bug fix): Previously, CockroachDB could encounter an internal error when evaluating COPY FROM command in a transaction after it's been rolled back to a savepoint. The bug has been present since before 23.2 version and is now fixed.
1 parent 9e0395f commit 8ad88a2

File tree

5 files changed

+67
-42
lines changed

5 files changed

+67
-42
lines changed

pkg/sql/conn_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3219,6 +3219,12 @@ func (ex *connExecutor) execCopyIn(
32193219
// Disable the buffered writes for COPY since there is no benefit in this
32203220
// ability here.
32213221
ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */)
3222+
// Step the txn in case it had just been rolled back to a savepoint (if it
3223+
// wasn't, this is harmless). This also matches what we do unconditionally
3224+
// on the main query path.
3225+
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
3226+
return ex.makeErrEvent(err, cmd.ParsedStmt.AST)
3227+
}
32223228
txnOpt := copyTxnOpt{
32233229
txn: ex.state.mu.txn,
32243230
txnTimestamp: ex.state.sqlTimestamp,

pkg/sql/copy/copy_in_test.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -648,49 +648,51 @@ func TestCopyTransaction(t *testing.T) {
648648
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
649649
defer s.Stopper().Stop(context.Background())
650650

651-
if _, err := db.Exec(`
651+
_, err := db.Exec(`
652652
CREATE DATABASE d;
653653
SET DATABASE = d;
654654
CREATE TABLE t (
655655
i INT PRIMARY KEY
656656
);
657-
`); err != nil {
658-
t.Fatal(err)
659-
}
657+
`)
658+
require.NoError(t, err)
660659

661660
txn, err := db.Begin()
662-
if err != nil {
663-
t.Fatal(err)
664-
}
665-
666-
// Note that, at least with lib/pq, this doesn't actually send a Parse msg
667-
// (which we wouldn't support, as we don't support Copy-in in extended
668-
// protocol mode). lib/pq has magic for recognizing a Copy.
669-
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
670-
if err != nil {
671-
t.Fatal(err)
672-
}
661+
require.NoError(t, err)
673662

674-
const val = 2
663+
// Run COPY twice with the first one being rolled back via savepoints.
664+
for val, doSavepoint := range []bool{true, false} {
665+
func() {
666+
if doSavepoint {
667+
_, err = txn.Exec("SAVEPOINT s")
668+
require.NoError(t, err)
669+
defer func() {
670+
_, err = txn.Exec("ROLLBACK TO SAVEPOINT s")
671+
require.NoError(t, err)
672+
}()
673+
}
674+
// Note that, at least with lib/pq, this doesn't actually send a
675+
// Parse msg (which we wouldn't support, as we don't support Copy-in
676+
// in extended protocol mode). lib/pq has magic for recognizing a
677+
// Copy.
678+
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
679+
require.NoError(t, err)
675680

676-
_, err = stmt.Exec(val)
677-
if err != nil {
678-
t.Fatal(err)
679-
}
681+
_, err = stmt.Exec(val)
682+
require.NoError(t, err)
680683

681-
if err = stmt.Close(); err != nil {
682-
t.Fatal(err)
683-
}
684+
err = stmt.Close()
685+
require.NoError(t, err)
684686

685-
var i int
686-
if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil {
687-
t.Fatal(err)
688-
} else if i != val {
689-
t.Fatalf("expected 1, got %d", i)
690-
}
691-
if err := txn.Commit(); err != nil {
692-
t.Fatal(err)
687+
var i int
688+
err = txn.QueryRow("SELECT i FROM d.t").Scan(&i)
689+
require.NoError(t, err)
690+
if i != val {
691+
t.Fatalf("expected %d, got %d", val, i)
692+
}
693+
}()
693694
}
695+
require.NoError(t, txn.Commit())
694696
}
695697

696698
// TestCopyFromFKCheck verifies that foreign keys are checked during COPY.

pkg/sql/copy/copy_out_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,26 @@ func TestCopyOutTransaction(t *testing.T) {
6262
tx, err := conn.Begin(ctx)
6363
require.NoError(t, err)
6464

65-
_, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)")
66-
require.NoError(t, err)
65+
for val, doSavepoint := range []bool{true, false} {
66+
func() {
67+
if doSavepoint {
68+
_, err = tx.Exec(ctx, "SAVEPOINT s")
69+
require.NoError(t, err)
70+
defer func() {
71+
_, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s")
72+
require.NoError(t, err)
73+
}()
74+
}
6775

68-
var buf bytes.Buffer
69-
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
70-
require.NoError(t, err)
71-
require.Equal(t, "1\n", buf.String())
76+
_, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val)
77+
require.NoError(t, err)
7278

79+
var buf bytes.Buffer
80+
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
81+
require.NoError(t, err)
82+
require.Equal(t, fmt.Sprintf("%d\n", val), buf.String())
83+
}()
84+
}
7385
require.NoError(t, tx.Rollback(ctx))
7486
}
7587

pkg/sql/internal.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,15 +1471,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
14711471
return ex.commitSQLTransactionInternal(ctx)
14721472
}
14731473

1474-
// checkIfStmtIsAllowed returns an error if the internal executor is not bound
1475-
// with the outer-txn-related info but is used to run DDL statements within an
1476-
// outer txn.
1477-
// TODO (janexing): this will be deprecate soon since it's not a good idea
1478-
// to have `extraTxnState` to store the info from a outer txn.
1474+
// checkIfStmtIsAllowed returns an error if the internal executor cannot execute
1475+
// the given stmt.
14791476
func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error {
14801477
if stmt == nil {
14811478
return nil
14821479
}
1480+
if _, ok := stmt.(*tree.CopyFrom); ok {
1481+
// COPY FROM has special handling in the connExecutor, so we can't run
1482+
// it via the internal executor.
1483+
return errors.New("COPY cannot be run via the internal executor")
1484+
}
14831485
if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil {
14841486
return errors.New("DDL statement is disallowed if internal " +
14851487
"executor is not bound with txn metadata")

pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,6 @@ subtest end
298298
# Regression test for crashing with SHOW COMMIT TIMESTAMP.
299299
statement error this statement is disallowed
300300
SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true);
301+
302+
statement error COPY cannot be run via the internal executor
303+
SELECT crdb_internal.execute_internally('COPY t FROM STDIN');

0 commit comments

Comments
 (0)