@@ -124,29 +124,47 @@ def test_simple_producer(self):
124124 start_offset1 = self .current_offset (self .topic , 1 )
125125 producer = SimpleProducer (self .client )
126126
127- # Will go to partition 0
128- msg1 , msg2 , msg3 , msg4 , msg5 = [ str (uuid .uuid4 ()) for x in xrange (5 ) ]
127+ # Goes to first partition, randomly.
129128 resp = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
130129 self .assert_produce_response (resp , start_offset0 )
131130
132- # Will go to partition 1
131+ # Goes to the next partition, randomly.
133132 resp = producer .send_messages (self .topic , self .msg ("three" ))
134133 self .assert_produce_response (resp , start_offset1 )
135134
136135 self .assert_fetch_offset (0 , start_offset0 , [ self .msg ("one" ), self .msg ("two" ) ])
137136 self .assert_fetch_offset (1 , start_offset1 , [ self .msg ("three" ) ])
138137
139- # Will go to partition 0
138+ # Goes back to the first partition because there's only two partitions
140139 resp = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
141140 self .assert_produce_response (resp , start_offset0 + 2 )
142141 self .assert_fetch_offset (0 , start_offset0 , [ self .msg ("one" ), self .msg ("two" ), self .msg ("four" ), self .msg ("five" ) ])
143142
144143 producer .stop ()
145144
146145 @kafka_versions ("all" )
147- def test_round_robin_partitioner (self ):
148- msg1 , msg2 , msg3 , msg4 = [ str (uuid .uuid4 ()) for _ in range (4 ) ]
146+ def test_producer_random_order (self ):
147+ producer = SimpleProducer (self .client , random_start = True )
148+ resp1 = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
149+ resp2 = producer .send_messages (self .topic , self .msg ("three" ))
150+ resp3 = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
151+
152+ self .assertEqual (resp1 [0 ].partition , resp3 [0 ].partition )
153+ self .assertNotEqual (resp1 [0 ].partition , resp2 [0 ].partition )
154+
155+ @kafka_versions ("all" )
156+ def test_producer_ordered_start (self ):
157+ producer = SimpleProducer (self .client , random_start = False )
158+ resp1 = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
159+ resp2 = producer .send_messages (self .topic , self .msg ("three" ))
160+ resp3 = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
149161
162+ self .assertEqual (resp1 [0 ].partition , 0 )
163+ self .assertEqual (resp2 [0 ].partition , 1 )
164+ self .assertEqual (resp3 [0 ].partition , 0 )
165+
166+ @kafka_versions ("all" )
167+ def test_round_robin_partitioner (self ):
150168 start_offset0 = self .current_offset (self .topic , 0 )
151169 start_offset1 = self .current_offset (self .topic , 1 )
152170
0 commit comments