Skip to content

Commit 3a91af6

Browse files
craig[bot]kev-caospilchen
committed
155292: logical: external conn should not require db name r=msbutler a=kev-cao LDR currently erroneously requires the db name in the external connection with setting up a bidirectional connection. This was determined to be caused by the fact that the privilege lookup on the source cluster was using unqualified table names during the reverse stream setup. This commit fixes the issue by using fully qualified table names. Fixes: #152395 Release note: LDR no longer requires the database name to be specified in the external connection URI when setting up a bidirectional stream. 155537: sql/opt: fix RLS metadata clearing r=spilchen a=spilchen This commit addresses two issues in the handling of RLS metadata in the sql/opt package: - The RLS metadata was not being properly cleared after calling the Clear() function. This function is only used in test. - Emitting RLS information during explain could theoretically result in a nil pointer dereference. While this path cannot be reached with a nil pointer under current conditions, the code was caught by an LLM. This change adds a defensive safeguard. Closes #153192 Release note: none Epic: none Co-authored-by: Kevin Cao <39608887+kev-cao@users.noreply.github.com> Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
3 parents 3dbfaf3 + 722ef0d + dc5414d commit 3a91af6

File tree

5 files changed

+71
-16
lines changed

5 files changed

+71
-16
lines changed

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ SELECT
6565
id AS job_id,
6666
crdb_internal.pb_to_json(
6767
'cockroach.sql.jobs.jobspb.Payload',
68-
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69-
FROM crdb_internal.system_jobs
68+
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
69+
FROM crdb_internal.system_jobs
7070
WHERE job_type = 'LOGICAL REPLICATION'
7171
) AS t
7272
WHERE t.parent_id = $1
@@ -359,10 +359,12 @@ func (r *ResolvedDestObjects) TargetDescription() string {
359359
return targetDescription
360360
}
361361

362+
// TargetTableNames returns the fully qualified names of the resolved target
363+
// tables.
362364
func (r *ResolvedDestObjects) TargetTableNames() []string {
363-
var targetTableNames []string
365+
targetTableNames := make([]string, len(r.TableNames))
364366
for i := range r.TableNames {
365-
targetTableNames = append(targetTableNames, r.TableNames[i].Table())
367+
targetTableNames[i] = r.TableNames[i].FQString()
366368
}
367369
return targetTableNames
368370
}

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,12 +1727,12 @@ func GetReverseJobID(
17271727
var jobID jobspb.JobID
17281728
testutils.SucceedsSoon(t, func() error {
17291729
err := db.DB.QueryRowContext(ctx, `
1730-
SELECT id
1731-
FROM system.jobs
1732-
WHERE job_type = 'LOGICAL REPLICATION'
1730+
SELECT id
1731+
FROM system.jobs
1732+
WHERE job_type = 'LOGICAL REPLICATION'
17331733
AND id != $1
17341734
AND created > $2
1735-
ORDER BY created DESC
1735+
ORDER BY created DESC
17361736
LIMIT 1`,
17371737
parentID, created).Scan(&jobID)
17381738
if err != nil {
@@ -2905,3 +2905,46 @@ func TestGetWriterType(t *testing.T) {
29052905
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
29062906
})
29072907
}
2908+
2909+
func TestLogicalReplicationExternalConnWithoutDBName(t *testing.T) {
2910+
defer leaktest.AfterTest(t)()
2911+
defer log.Scope(t).Close(t)
2912+
2913+
ctx := context.Background()
2914+
2915+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
2916+
defer server.Stopper().Stop(ctx)
2917+
2918+
dbA.Exec(t, "CREATE TABLE a.public.foo (x INT PRIMARY KEY)")
2919+
dbA.Exec(t, "INSERT INTO a.public.foo SELECT * FROM generate_series(1, 10)")
2920+
dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'")
2921+
dbA.Exec(t, "GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public.foo TO userA")
2922+
dbAURL := replicationtestutils.GetExternalConnectionURI(
2923+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"),
2924+
)
2925+
2926+
dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'")
2927+
dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB")
2928+
dbBURL := replicationtestutils.GetExternalConnectionURI(
2929+
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"),
2930+
)
2931+
2932+
dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn(
2933+
t,
2934+
serverutils.DBName("b"),
2935+
serverutils.ClientCerts(false),
2936+
serverutils.UserPassword("userB", "123"),
2937+
))
2938+
2939+
var jobID jobspb.JobID
2940+
dbBAsUser.QueryRow(
2941+
t,
2942+
"CREATE LOGICALLY REPLICATED TABLE b.public.foo FROM TABLE a.public.foo ON $1 WITH BIDIRECTIONAL ON $2",
2943+
dbAURL.String(),
2944+
dbBURL.String(),
2945+
).Scan(&jobID)
2946+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID)
2947+
2948+
reverseJobID := GetReverseJobID(ctx, t, dbA, jobID)
2949+
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID)
2950+
}

pkg/sql/opt/exec/explain/emit.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,14 +1509,16 @@ func (e *emitter) emitPolicies(ob *OutputBuilder, table cat.Table, n *Node) {
15091509
ob.AddField("policies", "row-level security enabled, no policies applied.")
15101510
} else {
15111511
var sb strings.Builder
1512-
policies := table.Policies()
1513-
for _, grp := range [][]cat.Policy{policies.Permissive, policies.Restrictive} {
1514-
for _, policy := range grp {
1515-
if applied.Policies.Contains(policy.ID) {
1516-
if sb.Len() > 0 {
1517-
sb.WriteString(", ")
1512+
if table != nil {
1513+
policies := table.Policies()
1514+
for _, grp := range [][]cat.Policy{policies.Permissive, policies.Restrictive} {
1515+
for _, policy := range grp {
1516+
if applied.Policies.Contains(policy.ID) {
1517+
if sb.Len() > 0 {
1518+
sb.WriteString(", ")
1519+
}
1520+
sb.WriteString(policy.Name.Normalize())
15181521
}
1519-
sb.WriteString(policy.Name.Normalize())
15201522
}
15211523
}
15221524
}

pkg/sql/opt/memo/memo_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,12 @@ func TestMemoIsStale(t *testing.T) {
672672
stale()
673673
evalCtx.SessionData().UserProto = oldUser
674674
notStale()
675+
676+
// User changes (after RLS was reinitialized)
675677
o.Memo().Metadata().ClearRLSEnabled()
678+
evalCtx.SessionData().UserProto = newUser
679+
notStale()
680+
evalCtx.SessionData().UserProto = oldUser
676681
notStale()
677682

678683
// Stale row_security.

pkg/sql/opt/row_level_security.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ func (r *RowLevelSecurityMeta) MaybeInit(user username.SQLUsername, hasAdminRole
4848

4949
// Clear unsets the initialized property. This is used as a test helper.
5050
func (r *RowLevelSecurityMeta) Clear() {
51-
r = &RowLevelSecurityMeta{}
51+
if r == nil {
52+
return
53+
}
54+
*r = RowLevelSecurityMeta{}
5255
}
5356

5457
// AddTableUse indicates that an RLS-enabled table was encountered while

0 commit comments

Comments
 (0)