@@ -3,6 +3,7 @@ package io.aiven.guardian.kafka
33import io .aiven .guardian .kafka .models .ReducedConsumerRecord
44import org .apache .kafka .common .record .TimestampType
55import org .scalacheck .Gen
6+ import org .scalacheck .util .Buildable
67
78import scala .collection .mutable .ListBuffer
89import scala .concurrent .duration .FiniteDuration
@@ -113,6 +114,25 @@ object Generators {
113114 Gen .choose[Long ](head.timestamp, last.timestamp - 1 ).map(millis => FiniteDuration (millis, MILLISECONDS ))
114115 }
115116
117+ @ SuppressWarnings (
118+ Array (
119+ " scalafix:DisableSyntax.while"
120+ )
121+ )
122+ private def buildableOfCollCond [C <: Iterable [T ], T ](cond : C => Boolean , g : Gen [C ])(implicit
123+ evb : Buildable [T , C ]
124+ ): Gen [C ] =
125+ Gen .infiniteLazyList(g).map { ll =>
126+ val it = ll.iterator
127+ val bldr = evb.builder
128+ while (! cond(bldr.result()))
129+ bldr ++= it.next()
130+ bldr.result() // sub-optimal: is called twice for the same result, can be improved!
131+ }
132+
133+ private def listOfFillCond [T ](finishCondition : List [T ] => Boolean , g : => Gen [List [T ]]) =
134+ buildableOfCollCond[List [T ], T ](finishCondition, g)
135+
116136 def kafkaDateGen (min : Int = 2 ,
117137 max : Int = 100 ,
118138 padTimestampsMillis : Range = Range .inclusive(1 , 10 ),
@@ -121,7 +141,7 @@ object Generators {
121141 topic <- kafkaTopic
122142 records <- {
123143 val base = Generators .kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
124- condition.fold(base)(c => Gen . listOfFillCond(c, base))
144+ condition.fold(base)(c => listOfFillCond(c, base))
125145 }
126146 } yield records
127147
0 commit comments