@@ -3,6 +3,7 @@ package io.aiven.guardian.kafka
33import io .aiven .guardian .kafka .models .ReducedConsumerRecord
44import org .apache .kafka .common .record .TimestampType
55import org .scalacheck .Gen
6+ import org .scalacheck .util .Buildable
67
78import scala .collection .mutable .ListBuffer
89import scala .concurrent .duration .FiniteDuration
@@ -113,6 +114,32 @@ object Generators {
113114 Gen .choose[Long ](head.timestamp, last.timestamp - 1 ).map(millis => FiniteDuration (millis, MILLISECONDS ))
114115 }
115116
117+ @ SuppressWarnings (
118+ Array (
119+ " scalafix:DisableSyntax.while" ,
120+ " scalafix:DisableSyntax.return"
121+ )
122+ )
123+ @ scala.annotation.nowarn(" msg=return statement uses an exception" )
124+ private def buildableOfCollCond [C <: Iterable [T ], T ](cond : C => Boolean , g : Gen [C ])(implicit
125+ evb : Buildable [T , C ]
126+ ): Gen [C ] =
127+ Gen .infiniteLazyList(g).map { ll =>
128+ val it = ll.iterator
129+ val bldr = evb.builder
130+ while (true ) {
131+ val result = bldr.result()
132+ if (cond(result)) {
133+ return result
134+ }
135+ bldr ++= it.next()
136+ }
137+ return bldr.result()
138+ }
139+
140+ private def listOfFillCond [T ](finishCondition : List [T ] => Boolean , g : => Gen [List [T ]]) =
141+ buildableOfCollCond[List [T ], T ](finishCondition, g)
142+
116143 def kafkaDateGen (min : Int = 2 ,
117144 max : Int = 100 ,
118145 padTimestampsMillis : Range = Range .inclusive(1 , 10 ),
@@ -121,7 +148,7 @@ object Generators {
121148 topic <- kafkaTopic
122149 records <- {
123150 val base = Generators .kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
124- condition.fold(base)(c => Gen . listOfFillCond(c, base))
151+ condition.fold(base)(c => listOfFillCond(c, base))
125152 }
126153 } yield records
127154
0 commit comments