@@ -3,6 +3,7 @@ package io.aiven.guardian.kafka
33import io .aiven .guardian .kafka .models .ReducedConsumerRecord
44import org .apache .kafka .common .record .TimestampType
55import org .scalacheck .Gen
6+ import org .scalacheck .util .Buildable
67
78import scala .collection .mutable .ListBuffer
89import scala .concurrent .duration .FiniteDuration
@@ -113,6 +114,31 @@ object Generators {
113114 Gen .choose[Long ](head.timestamp, last.timestamp - 1 ).map(millis => FiniteDuration (millis, MILLISECONDS ))
114115 }
115116
117+ @ SuppressWarnings (
118+ Array (
119+ " scalafix:DisableSyntax.while"
120+ )
121+ )
122+ @ scala.annotation.nowarn(" msg=return statement uses an exception" )
123+ private def buildableOfCollCond [C <: Iterable [T ], T ](cond : C => Boolean , g : Gen [C ])(implicit
124+ evb : Buildable [T , C ]
125+ ): Gen [C ] =
126+ Gen .infiniteLazyList(g).map { ll =>
127+ val it = ll.iterator
128+ val bldr = evb.builder
129+ while (true ) {
130+ val result = bldr.result()
131+ if (cond(result)) {
132+ return result
133+ }
134+ bldr ++= it.next()
135+ }
136+ return bldr.result()
137+ }
138+
139+ private def listOfFillCond [T ](finishCondition : List [T ] => Boolean , g : => Gen [List [T ]]) =
140+ buildableOfCollCond[List [T ], T ](finishCondition, g)
141+
116142 def kafkaDateGen (min : Int = 2 ,
117143 max : Int = 100 ,
118144 padTimestampsMillis : Range = Range .inclusive(1 , 10 ),
@@ -121,7 +147,7 @@ object Generators {
121147 topic <- kafkaTopic
122148 records <- {
123149 val base = Generators .kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
124- condition.fold(base)(c => Gen . listOfFillCond(c, base))
150+ condition.fold(base)(c => listOfFillCond(c, base))
125151 }
126152 } yield records
127153
0 commit comments