@@ -24,7 +24,6 @@ import (
2424 "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2525 "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
2626 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
27- "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2827 "github.com/cockroachdb/cockroach/pkg/testutils"
2928 "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3029 "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -166,69 +165,63 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
166165 defer leaktest .AfterTest (t )()
167166 defer log .Scope (t ).Close (t )
168167
169- s , stopServer := makeServer (t )
170- defer stopServer ()
168+ testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
169+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
170+ sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
171171
172- knobs := s .TestingKnobs .
173- DistSQL .(* execinfra.TestingKnobs ).
174- Changefeed .(* TestingKnobs )
175- knobs .WrapSink = func (s Sink , _ jobspb.JobID ) Sink {
176- if _ , ok := s .(* externalConnectionKafkaSink ); ok {
177- return s
172+ const apiSecret = "bar"
173+ const certSecret = "Zm9v"
174+ for _ , tc := range []struct {
175+ name string
176+ uri string
177+ expectedSinkURI string
178+ expectedDescription string
179+ }{
180+ {
181+ name : "api_secret" ,
182+ uri : fmt .Sprintf ("confluent-cloud://nope?api_key=fee&api_secret=%s" , apiSecret ),
183+ },
184+ {
185+ name : "sasl_password" ,
186+ uri : fmt .Sprintf ("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa" , apiSecret ),
187+ },
188+ {
189+ name : "ca_cert" ,
190+ uri : fmt .Sprintf ("kafka://nope?ca_cert=%s&tls_enabled=true" , certSecret ),
191+ },
192+ {
193+ name : "shared_access_key" ,
194+ uri : fmt .Sprintf ("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain" , apiSecret ),
195+ },
196+ } {
197+ t .Run (tc .name , func (t * testing.T ) {
198+ foo := feed (t , f , fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri ),
199+ optOutOfMetamorphicEnrichedEnvelope {reason : "compares text of changefeed statement" })
200+ defer closeFeed (t , foo )
201+
202+ efoo , ok := foo .(cdctest.EnterpriseTestFeed )
203+ require .True (t , ok )
204+ jobID := efoo .JobID ()
205+
206+ var sinkURI , description string
207+ sqlDB .QueryRow (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]" , jobID ).Scan (& sinkURI , & description )
208+ replacer := strings .NewReplacer (apiSecret , "redacted" , certSecret , "redacted" )
209+ expectedSinkURI := replacer .Replace (tc .uri )
210+ expectedDescription := replacer .Replace (fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri ))
211+ require .Equal (t , expectedSinkURI , sinkURI )
212+ require .Equal (t , expectedDescription , description )
213+ })
178214 }
179- return & externalConnectionKafkaSink {sink : s , ignoreDialError : true }
180- }
181-
182- sqlDB := sqlutils .MakeSQLRunner (s .DB )
183- sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
184-
185- const apiSecret = "bar"
186- const certSecret = "Zm9v"
187- for _ , tc := range []struct {
188- name string
189- uri string
190- expectedSinkURI string
191- expectedDescription string
192- }{
193- {
194- name : "api_secret" ,
195- uri : fmt .Sprintf ("confluent-cloud://nope?api_key=fee&api_secret=%s" , apiSecret ),
196- },
197- {
198- name : "sasl_password" ,
199- uri : fmt .Sprintf ("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa" , apiSecret ),
200- },
201- {
202- name : "ca_cert" ,
203- uri : fmt .Sprintf ("kafka://nope?ca_cert=%s&tls_enabled=true" , certSecret ),
204- },
205- {
206- name : "shared_access_key" ,
207- uri : fmt .Sprintf ("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain" , apiSecret ),
208- },
209- } {
210- t .Run (tc .name , func (t * testing.T ) {
211- createStmt := fmt .Sprintf (`CREATE CHANGEFEED FOR TABLE foo INTO '%s'` , tc .uri )
212- var jobID jobspb.JobID
213- sqlDB .QueryRow (t , createStmt ).Scan (& jobID )
214- var sinkURI , description string
215- sqlDB .QueryRow (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]" , jobID ).Scan (& sinkURI , & description )
216- replacer := strings .NewReplacer (apiSecret , "redacted" , certSecret , "redacted" )
217- expectedSinkURI := replacer .Replace (tc .uri )
218- expectedDescription := replacer .Replace (createStmt )
219- require .Equal (t , expectedSinkURI , sinkURI )
220- require .Equal (t , expectedDescription , description )
215+ t .Run ("jobs" , func (t * testing.T ) {
216+ queryStr := sqlDB .QueryStr (t , "SELECT description from [SHOW JOBS]" )
217+ require .NotContains (t , queryStr , apiSecret )
218+ require .NotContains (t , queryStr , certSecret )
219+ queryStr = sqlDB .QueryStr (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]" )
220+ require .NotContains (t , queryStr , apiSecret )
221+ require .NotContains (t , queryStr , certSecret )
221222 })
222223 }
223-
224- t .Run ("jobs" , func (t * testing.T ) {
225- queryStr := sqlDB .QueryStr (t , "SELECT description from [SHOW JOBS]" )
226- require .NotContains (t , queryStr , apiSecret )
227- require .NotContains (t , queryStr , certSecret )
228- queryStr = sqlDB .QueryStr (t , "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]" )
229- require .NotContains (t , queryStr , apiSecret )
230- require .NotContains (t , queryStr , certSecret )
231- })
224+ cdcTest (t , testFn , feedTestForceSink ("kafka" ), feedTestNoExternalConnection )
232225}
233226
234227func TestShowChangefeedJobs (t * testing.T ) {
0 commit comments