2222import static io .trino .spi .StandardErrorCode .QUERY_REJECTED ;
2323import static java .util .Objects .requireNonNull ;
2424import static org .apache .bookkeeper .mledger .ManagedCursor .FindPositionConstraint .SearchAllAvailableEntries ;
25+ import static org .apache .bookkeeper .mledger .PositionFactory .EARLIEST ;
2526import static org .apache .pulsar .sql .presto .PulsarConnectorUtils .restoreNamespaceDelimiterIfNeeded ;
2627import com .fasterxml .jackson .databind .ObjectMapper ;
2728import com .google .common .annotations .VisibleForTesting ;
5051import org .apache .bookkeeper .mledger .ManagedLedgerConfig ;
5152import org .apache .bookkeeper .mledger .ManagedLedgerException ;
5253import org .apache .bookkeeper .mledger .ManagedLedgerFactory ;
54+ import org .apache .bookkeeper .mledger .Position ;
5355import org .apache .bookkeeper .mledger .ReadOnlyCursor ;
54- import org .apache .bookkeeper .mledger .impl .PositionImpl ;
5556import org .apache .commons .lang3 .exception .ExceptionUtils ;
5657import org .apache .pulsar .client .admin .PulsarAdmin ;
5758import org .apache .pulsar .client .admin .PulsarAdminException ;
@@ -271,7 +272,7 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
271272 try {
272273 readOnlyCursor = managedLedgerFactory .openReadOnlyCursor (
273274 topicNamePersistenceEncoding ,
274- PositionImpl . EARLIEST , managedLedgerConfig );
275+ EARLIEST , managedLedgerConfig );
275276
276277 long numEntries = readOnlyCursor .getNumberOfEntries ();
277278 if (numEntries <= 0 ) {
@@ -286,12 +287,12 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
286287 topicNamePersistenceEncoding ,
287288 numEntries );
288289
289- PositionImpl initialStartPosition ;
290+ Position initialStartPosition ;
290291 if (predicatePushdownInfo != null ) {
291292 numEntries = predicatePushdownInfo .getNumOfEntries ();
292293 initialStartPosition = predicatePushdownInfo .getStartPosition ();
293294 } else {
294- initialStartPosition = ( PositionImpl ) readOnlyCursor .getReadPosition ();
295+ initialStartPosition = readOnlyCursor .getReadPosition ();
295296 }
296297
297298
@@ -307,9 +308,9 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
307308 List <PulsarSplit > splits = new LinkedList <>();
308309 for (int i = 0 ; i < numSplits ; i ++) {
309310 long entriesForSplit = (remainder > i ) ? avgEntriesPerSplit + 1 : avgEntriesPerSplit ;
310- PositionImpl startPosition = ( PositionImpl ) readOnlyCursor .getReadPosition ();
311+ Position startPosition = readOnlyCursor .getReadPosition ();
311312 readOnlyCursor .skipEntries (Math .toIntExact (entriesForSplit ));
312- PositionImpl endPosition = ( PositionImpl ) readOnlyCursor .getReadPosition ();
313+ Position endPosition = readOnlyCursor .getReadPosition ();
313314
314315 PulsarSplit pulsarSplit = new PulsarSplit (i , this .connectorId ,
315316 restoreNamespaceDelimiterIfNeeded (tableHandle .getSchemaName (), pulsarConnectorConfig ),
@@ -341,11 +342,11 @@ Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
341342
342343 @ Data
343344 private static class PredicatePushdownInfo {
344- private PositionImpl startPosition ;
345- private PositionImpl endPosition ;
345+ private Position startPosition ;
346+ private Position endPosition ;
346347 private long numOfEntries ;
347348
348- private PredicatePushdownInfo (PositionImpl startPosition , PositionImpl endPosition , long numOfEntries ) {
349+ private PredicatePushdownInfo (Position startPosition , Position endPosition , long numOfEntries ) {
349350 this .startPosition = startPosition ;
350351 this .endPosition = endPosition ;
351352 this .numOfEntries = numOfEntries ;
@@ -363,7 +364,7 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
363364 try {
364365 readOnlyCursor = managedLedgerFactory .openReadOnlyCursor (
365366 topicNamePersistenceEncoding ,
366- PositionImpl . EARLIEST , managedLedgerConfig );
367+ EARLIEST , managedLedgerConfig );
367368
368369 if (tupleDomain .getDomains ().isPresent ()) {
369370 Domain domain = tupleDomain .getDomains ().get ().get (PulsarInternalColumn .PUBLISH_TIME
@@ -390,20 +391,20 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
390391 lowerBoundTs = block .getLong (0 , 0 ) / 1000 ;
391392 }
392393
393- PositionImpl overallStartPos ;
394+ Position overallStartPos ;
394395 if (lowerBoundTs == null ) {
395- overallStartPos = ( PositionImpl ) readOnlyCursor .getReadPosition ();
396+ overallStartPos = readOnlyCursor .getReadPosition ();
396397 } else {
397398 overallStartPos = findPosition (readOnlyCursor , lowerBoundTs );
398399 if (overallStartPos == null ) {
399- overallStartPos = ( PositionImpl ) readOnlyCursor .getReadPosition ();
400+ overallStartPos = readOnlyCursor .getReadPosition ();
400401 }
401402 }
402403
403- PositionImpl overallEndPos ;
404+ Position overallEndPos ;
404405 if (upperBoundTs == null ) {
405406 readOnlyCursor .skipEntries (Math .toIntExact (totalNumEntries ));
406- overallEndPos = ( PositionImpl ) readOnlyCursor .getReadPosition ();
407+ overallEndPos = readOnlyCursor .getReadPosition ();
407408 } else {
408409 overallEndPos = findPosition (readOnlyCursor , upperBoundTs );
409410 if (overallEndPos == null ) {
@@ -414,7 +415,7 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
414415 // Just use a close bound since presto can always filter out the extra entries even if
415416 // the bound
416417 // should be open or a mixture of open and closed
417- com .google .common .collect .Range <PositionImpl > posRange =
418+ com .google .common .collect .Range <Position > posRange =
418419 com .google .common .collect .Range .range (overallStartPos ,
419420 com .google .common .collect .BoundType .CLOSED ,
420421 overallEndPos , com .google .common .collect .BoundType .CLOSED );
@@ -437,10 +438,10 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
437438 }
438439 }
439440
440- private static PositionImpl findPosition (ReadOnlyCursor readOnlyCursor , long timestamp ) throws
441+ private static Position findPosition (ReadOnlyCursor readOnlyCursor , long timestamp ) throws
441442 ManagedLedgerException ,
442443 InterruptedException {
443- return ( PositionImpl ) readOnlyCursor .findNewestMatching (
444+ return readOnlyCursor .findNewestMatching (
444445 SearchAllAvailableEntries ,
445446 entry -> {
446447 try {
0 commit comments