@@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
1616async fn test_plain ( ) {
1717 maybe_start_logging ( ) ;
1818
19- let connection = maybe_skip_kafka_integration ! ( ) ;
20- ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
19+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
20+ ClientBuilder :: new ( test_cfg. bootstrap_brokers )
21+ . build ( )
22+ . await
23+ . unwrap ( ) ;
2124}
2225
2326#[ tokio:: test]
2427async fn test_topic_crud ( ) {
2528 maybe_start_logging ( ) ;
2629
27- let connection = maybe_skip_kafka_integration ! ( ) ;
28- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
30+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
31+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
32+ . build ( )
33+ . await
34+ . unwrap ( ) ;
2935 let controller_client = client. controller_client ( ) . unwrap ( ) ;
3036 let topics = client. list_topics ( ) . await . unwrap ( ) ;
3137
@@ -77,10 +83,13 @@ async fn test_topic_crud() {
7783async fn test_partition_client ( ) {
7884 maybe_start_logging ( ) ;
7985
80- let connection = maybe_skip_kafka_integration ! ( ) ;
86+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
8187 let topic_name = random_topic_name ( ) ;
8288
83- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
89+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
90+ . build ( )
91+ . await
92+ . unwrap ( ) ;
8493
8594 let controller_client = client. controller_client ( ) . unwrap ( ) ;
8695 controller_client
@@ -100,10 +109,10 @@ async fn test_partition_client() {
100109async fn test_non_existing_partition ( ) {
101110 maybe_start_logging ( ) ;
102111
103- let connection = maybe_skip_kafka_integration ! ( ) ;
112+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
104113 let topic_name = random_topic_name ( ) ;
105114
106- let client = ClientBuilder :: new ( connection ) . build ( ) . await . unwrap ( ) ;
115+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers ) . build ( ) . await . unwrap ( ) ;
107116
108117 // do NOT create the topic
109118
@@ -167,8 +176,8 @@ async fn test_tls() {
167176 . with_single_cert ( vec ! [ producer_root] , private_key)
168177 . unwrap ( ) ;
169178
170- let connection = maybe_skip_kafka_integration ! ( ) ;
171- ClientBuilder :: new ( connection )
179+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
180+ ClientBuilder :: new ( test_cfg . bootstrap_brokers )
172181 . tls_config ( Arc :: new ( config) )
173182 . build ( )
174183 . await
@@ -180,14 +189,11 @@ async fn test_tls() {
180189async fn test_socks5 ( ) {
181190 maybe_start_logging ( ) ;
182191
183- // e.g. "my-connection-kafka-bootstrap:9092"
184- let connection = maybe_skip_kafka_integration ! ( ) ;
185- // e.g. "localhost:1080"
186- let proxy = maybe_skip_SOCKS_PROXY ! ( ) ;
192+ let test_cfg = maybe_skip_kafka_integration ! ( socks5) ;
187193 let topic_name = random_topic_name ( ) ;
188194
189- let client = ClientBuilder :: new ( connection )
190- . socks5_proxy ( proxy )
195+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers )
196+ . socks5_proxy ( test_cfg . socks5_proxy . unwrap ( ) )
191197 . build ( )
192198 . await
193199 . unwrap ( ) ;
@@ -222,11 +228,14 @@ async fn test_socks5() {
222228async fn test_produce_empty ( ) {
223229 maybe_start_logging ( ) ;
224230
225- let connection = maybe_skip_kafka_integration ! ( ) ;
231+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
226232 let topic_name = random_topic_name ( ) ;
227233 let n_partitions = 2 ;
228234
229- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
235+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
236+ . build ( )
237+ . await
238+ . unwrap ( ) ;
230239 let controller_client = client. controller_client ( ) . unwrap ( ) ;
231240 controller_client
232241 . create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -247,11 +256,14 @@ async fn test_produce_empty() {
247256async fn test_consume_empty ( ) {
248257 maybe_start_logging ( ) ;
249258
250- let connection = maybe_skip_kafka_integration ! ( ) ;
259+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
251260 let topic_name = random_topic_name ( ) ;
252261 let n_partitions = 2 ;
253262
254- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
263+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
264+ . build ( )
265+ . await
266+ . unwrap ( ) ;
255267 let controller_client = client. controller_client ( ) . unwrap ( ) ;
256268 controller_client
257269 . create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -274,11 +286,14 @@ async fn test_consume_empty() {
274286async fn test_consume_offset_out_of_range ( ) {
275287 maybe_start_logging ( ) ;
276288
277- let connection = maybe_skip_kafka_integration ! ( ) ;
289+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
278290 let topic_name = random_topic_name ( ) ;
279291 let n_partitions = 2 ;
280292
281- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
293+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
294+ . build ( )
295+ . await
296+ . unwrap ( ) ;
282297 let controller_client = client. controller_client ( ) . unwrap ( ) ;
283298 controller_client
284299 . create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -314,11 +329,11 @@ async fn test_consume_offset_out_of_range() {
314329async fn test_get_offset ( ) {
315330 maybe_start_logging ( ) ;
316331
317- let connection = maybe_skip_kafka_integration ! ( ) ;
332+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
318333 let topic_name = random_topic_name ( ) ;
319334 let n_partitions = 1 ;
320335
321- let client = ClientBuilder :: new ( connection . clone ( ) )
336+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers . clone ( ) )
322337 . build ( )
323338 . await
324339 . unwrap ( ) ;
@@ -382,10 +397,13 @@ async fn test_get_offset() {
382397async fn test_produce_consume_size_cutoff ( ) {
383398 maybe_start_logging ( ) ;
384399
385- let connection = maybe_skip_kafka_integration ! ( ) ;
400+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
386401 let topic_name = random_topic_name ( ) ;
387402
388- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
403+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
404+ . build ( )
405+ . await
406+ . unwrap ( ) ;
389407 let controller_client = client. controller_client ( ) . unwrap ( ) ;
390408 controller_client
391409 . create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -460,10 +478,13 @@ async fn test_produce_consume_size_cutoff() {
460478async fn test_consume_midbatch ( ) {
461479 maybe_start_logging ( ) ;
462480
463- let connection = maybe_skip_kafka_integration ! ( ) ;
481+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
464482 let topic_name = random_topic_name ( ) ;
465483
466- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
484+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
485+ . build ( )
486+ . await
487+ . unwrap ( ) ;
467488 let controller_client = client. controller_client ( ) . unwrap ( ) ;
468489 controller_client
469490 . create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -508,10 +529,13 @@ async fn test_consume_midbatch() {
508529async fn test_delete_records ( ) {
509530 maybe_start_logging ( ) ;
510531
511- let connection = maybe_skip_kafka_integration ! ( ) ;
532+ let test_cfg = maybe_skip_kafka_integration ! ( delete ) ;
512533 let topic_name = random_topic_name ( ) ;
513534
514- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
535+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
536+ . build ( )
537+ . await
538+ . unwrap ( ) ;
515539 let controller_client = client. controller_client ( ) . unwrap ( ) ;
516540 controller_client
517541 . create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -555,7 +579,10 @@ async fn test_delete_records() {
555579 let offset_4 = offsets[ 0 ] ;
556580
557581 // delete from the middle of the 2nd batch
558- maybe_skip_delete ! ( partition_client, offset_3) ;
582+ partition_client
583+ . delete_records ( offset_3, 1_000 )
584+ . await
585+ . unwrap ( ) ;
559586
560587 // fetching data before the record fails
561588 let err = partition_client
0 commit comments