@@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
111111 del subscriptions ['C1' ]
112112 member_metadata = {}
113113 for member , topics in six .iteritems (subscriptions ):
114- member_metadata [member ] = build_metadata (topics , sticky_assignment [member ].partitions ())
114+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , sticky_assignment [member ].partitions ())
115115
116116 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
117117 expected_assignment = {
@@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
154154 }
155155 member_metadata = {}
156156 for member , topics in six .iteritems (subscriptions ):
157- member_metadata [member ] = build_metadata (topics , [])
157+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , [])
158158
159159 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
160160 expected_assignment = {
@@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
167167 del subscriptions ['C0' ]
168168 member_metadata = {}
169169 for member , topics in six .iteritems (subscriptions ):
170- member_metadata [member ] = build_metadata (topics , sticky_assignment [member ].partitions ())
170+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , sticky_assignment [member ].partitions ())
171171
172172 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
173173 expected_assignment = {
@@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
326326 }
327327 member_metadata = {}
328328 for member , topics in six .iteritems (subscriptions ):
329- member_metadata [member ] = build_metadata (
329+ member_metadata [member ] = StickyPartitionAssignor . _metadata (
330330 topics , assignment [member ].partitions () if member in assignment else []
331331 )
332332
@@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
338338 }
339339 member_metadata = {}
340340 for member , topics in six .iteritems (subscriptions ):
341- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
341+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
342342
343343 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
344344 verify_validity_and_balance (subscriptions , assignment )
@@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
367367 }
368368 member_metadata = {}
369369 for member , topics in six .iteritems (subscriptions ):
370- member_metadata [member ] = build_metadata (topics , sticky_assignment [member ].partitions ())
370+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , sticky_assignment [member ].partitions ())
371371
372372 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
373373 expected_assignment = {
@@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
382382 }
383383 member_metadata = {}
384384 for member , topics in six .iteritems (subscriptions ):
385- member_metadata [member ] = build_metadata (topics , sticky_assignment [member ].partitions ())
385+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , sticky_assignment [member ].partitions ())
386386
387387 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
388388 expected_assignment = {
@@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
413413 del subscriptions ['C10' ]
414414 member_metadata = {}
415415 for member , topics in six .iteritems (subscriptions ):
416- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
416+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
417417
418418 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
419419 verify_validity_and_balance (subscriptions , assignment )
@@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
435435 subscriptions ['C10' ] = {'t' }
436436 member_metadata = {}
437437 for member , topics in six .iteritems (subscriptions ):
438- member_metadata [member ] = build_metadata (
438+ member_metadata [member ] = StickyPartitionAssignor . _metadata (
439439 topics , assignment [member ].partitions () if member in assignment else []
440440 )
441441 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
462462 del subscriptions ['C5' ]
463463 member_metadata = {}
464464 for member , topics in six .iteritems (subscriptions ):
465- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
465+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
466466 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
467467 verify_validity_and_balance (subscriptions , assignment )
468468 assert StickyPartitionAssignor ._latest_partition_movements .are_sticky ()
@@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
488488
489489 member_metadata = {}
490490 for member , topics in six .iteritems (subscriptions ):
491- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
491+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
492492
493493 for i in range (50 ):
494494 member = 'C{}' .format (randint (1 , n_consumers ))
@@ -517,7 +517,7 @@ def test_new_subscription(mocker):
517517 subscriptions ['C0' ].add ('t1' )
518518 member_metadata = {}
519519 for member , topics in six .iteritems (subscriptions ):
520- member_metadata [member ] = build_metadata (topics , [])
520+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , [])
521521
522522 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
523523 verify_validity_and_balance (subscriptions , assignment )
@@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):
540540
541541 member_metadata = {}
542542 for member , topics in six .iteritems (subscriptions ):
543- member_metadata [member ] = build_metadata (topics , member_assignments [member ])
543+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , member_assignments [member ])
544544
545545 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
546546 verify_validity_and_balance (subscriptions , assignment )
@@ -570,7 +570,7 @@ def test_stickiness(mocker):
570570 del subscriptions ['C1' ]
571571 member_metadata = {}
572572 for member , topics in six .iteritems (subscriptions ):
573- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
573+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
574574
575575 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
576576 verify_validity_and_balance (subscriptions , assignment )
@@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
625625 }
626626 member_metadata = {}
627627 for member , topics in six .iteritems (subscriptions ):
628- member_metadata [member ] = build_metadata (topics , sticky_assignment [member ].partitions ())
628+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , sticky_assignment [member ].partitions ())
629629
630630 cluster = create_cluster (mocker , topics = {}, topics_partitions = {})
631631 sticky_assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
645645 member_metadata = {}
646646 for member , topics in six .iteritems (subscriptions ):
647647 # assume both C1 and C2 have partition 1 assigned to them in generation 1
648- member_metadata [member ] = build_metadata (topics , [TopicPartition ('t' , 0 ), TopicPartition ('t' , 0 )], 1 )
648+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , [TopicPartition ('t' , 0 ), TopicPartition ('t' , 0 )], 1 )
649649
650650 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
651651 verify_validity_and_balance (subscriptions , assignment )
@@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu
676676
677677 member_metadata = {}
678678 for member , topics in six .iteritems (subscriptions ):
679- member_metadata [member ] = build_metadata (topics , assignment [member ].partitions ())
679+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , assignment [member ].partitions ())
680680
681681 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
682682 verify_validity_and_balance (subscriptions , assignment )
@@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
687687 cluster = create_cluster (mocker , topics = {'t' }, topics_partitions = {0 , 1 , 2 , 3 , 4 , 5 })
688688
689689 member_metadata = {
690- 'C1' : build_metadata ({'t' }, []),
691- 'C2' : build_metadata ({'t' }, []),
692- 'C3' : build_metadata ({'t' }, []),
690+ 'C1' : StickyPartitionAssignor . _metadata ({'t' }, []),
691+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, []),
692+ 'C3' : StickyPartitionAssignor . _metadata ({'t' }, []),
693693 }
694694
695695 assignment1 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
699699 assert len (assignment1 ['C3' ].assignment [0 ][1 ]) == 2
700700
701701 member_metadata = {
702- 'C1' : build_metadata ({'t' }, assignment1 ['C1' ].partitions ()),
703- 'C2' : build_metadata ({'t' }, assignment1 ['C2' ].partitions ()),
702+ 'C1' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C1' ].partitions ()),
703+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C2' ].partitions ()),
704704 }
705705
706706 assignment2 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
712712 assert StickyPartitionAssignor ._latest_partition_movements .are_sticky ()
713713
714714 member_metadata = {
715- 'C2' : build_metadata ({'t' }, assignment2 ['C2' ].partitions (), 2 ),
716- 'C3' : build_metadata ({'t' }, assignment1 ['C3' ].partitions (), 1 ),
715+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, assignment2 ['C2' ].partitions (), 2 ),
716+ 'C3' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C3' ].partitions (), 1 ),
717717 }
718718
719719 assignment3 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
727727 cluster = create_cluster (mocker , topics = {'t' }, topics_partitions = {0 , 1 , 2 , 3 , 4 , 5 })
728728
729729 member_metadata = {
730- 'C1' : build_metadata ({'t' }, []),
731- 'C2' : build_metadata ({'t' }, []),
732- 'C3' : build_metadata ({'t' }, []),
730+ 'C1' : StickyPartitionAssignor . _metadata ({'t' }, []),
731+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, []),
732+ 'C3' : StickyPartitionAssignor . _metadata ({'t' }, []),
733733 }
734734
735735 assignment1 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
739739 assert len (assignment1 ['C3' ].assignment [0 ][1 ]) == 2
740740
741741 member_metadata = {
742- 'C2' : build_metadata ({'t' }, assignment1 ['C2' ].partitions (), 1 ),
742+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C2' ].partitions (), 1 ),
743743 }
744744
745745 assignment2 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
749749 assert StickyPartitionAssignor ._latest_partition_movements .are_sticky ()
750750
751751 member_metadata = {
752- 'C1' : build_metadata ({'t' }, assignment1 ['C1' ].partitions (), 1 ),
753- 'C2' : build_metadata ({'t' }, assignment2 ['C2' ].partitions (), 2 ),
754- 'C3' : build_metadata ({'t' }, assignment1 ['C3' ].partitions (), 1 ),
752+ 'C1' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C1' ].partitions (), 1 ),
753+ 'C2' : StickyPartitionAssignor . _metadata ({'t' }, assignment2 ['C2' ].partitions (), 2 ),
754+ 'C3' : StickyPartitionAssignor . _metadata ({'t' }, assignment1 ['C3' ].partitions (), 1 ),
755755 }
756756
757757 assignment3 = StickyPartitionAssignor .assign (cluster , member_metadata )
@@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
778778 }
779779 member_metadata = {}
780780 for member in six .iterkeys (member_assignments ):
781- member_metadata [member ] = build_metadata ({'t' }, member_assignments [member ], member_generations [member ])
781+ member_metadata [member ] = StickyPartitionAssignor . _metadata ({'t' }, member_assignments [member ], member_generations [member ])
782782
783783 assignment = StickyPartitionAssignor .assign (cluster , member_metadata )
784784 verify_validity_and_balance ({'C1' : {'t' }, 'C2' : {'t' }, 'C3' : {'t' }}, assignment )
@@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
788788def make_member_metadata (subscriptions ):
789789 member_metadata = {}
790790 for member , topics in six .iteritems (subscriptions ):
791- member_metadata [member ] = build_metadata (topics , [])
791+ member_metadata [member ] = StickyPartitionAssignor . _metadata (topics , [])
792792 return member_metadata
793793
794794
795- def build_metadata (topics , member_assignment_partitions , generation = - 1 ):
796- partitions_by_topic = defaultdict (list )
797- for topic_partition in member_assignment_partitions :
798- partitions_by_topic [topic_partition .topic ].append (topic_partition .partition )
799- data = StickyAssignorUserDataV1 (six .viewitems (partitions_by_topic ), generation )
800- user_data = data .encode ()
801- return ConsumerProtocolMemberMetadata (StickyPartitionAssignor .version , list (topics ), user_data )
802-
803-
804795def assert_assignment (result_assignment , expected_assignment ):
805796 assert result_assignment == expected_assignment
806797 assert set (result_assignment ) == set (expected_assignment )
0 commit comments