Skip to content

Commit bb43a93

Browse files
authored
Fix UDTs registration ordering (#573)
* Make sure UDTs are registered before calling to a derived ExpressionEncoder() * Use frameless to derive ExpressionEncoders for UDTs * Derive CRS through TypedEncoders as well to be sure that CrsUDT is loaded
1 parent 92b7f7f commit bb43a93

File tree

8 files changed

+36
-11
lines changed

8 files changed

+36
-11
lines changed

core/src/main/scala/org/locationtech/rasterframes/encoders/StandardEncoders.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import org.locationtech.geomesa.spark.jts.encoders.SpatialEncoders
3333
import org.locationtech.rasterframes.model.{CellContext, LongExtent, TileContext, TileDataContext}
3434
import frameless.TypedEncoder
3535
import geotrellis.raster.mapalgebra.focal.{Kernel, Neighborhood, TargetCell}
36+
import org.locationtech.rasterframes.ref.RFRasterSource
37+
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
3638

3739
import java.net.URI
3840
import java.sql.Timestamp
@@ -45,14 +47,14 @@ trait StandardEncoders extends SpatialEncoders with TypedEncoders {
4547
implicit def optionalEncoder[T: TypedEncoder]: ExpressionEncoder[Option[T]] = typedExpressionEncoder[Option[T]]
4648

4749
implicit lazy val strMapEncoder: ExpressionEncoder[Map[String, String]] = ExpressionEncoder()
48-
implicit lazy val crsExpressionEncoder: ExpressionEncoder[CRS] = ExpressionEncoder()
4950
implicit lazy val projectedExtentEncoder: ExpressionEncoder[ProjectedExtent] = ExpressionEncoder()
5051
implicit lazy val temporalProjectedExtentEncoder: ExpressionEncoder[TemporalProjectedExtent] = ExpressionEncoder()
5152
implicit lazy val timestampEncoder: ExpressionEncoder[Timestamp] = ExpressionEncoder()
5253
implicit lazy val cellStatsEncoder: ExpressionEncoder[CellStatistics] = ExpressionEncoder()
5354
implicit lazy val cellHistEncoder: ExpressionEncoder[CellHistogram] = ExpressionEncoder()
5455
implicit lazy val localCellStatsEncoder: ExpressionEncoder[LocalCellStatistics] = ExpressionEncoder()
5556

57+
implicit lazy val crsExpressionEncoder: ExpressionEncoder[CRS] = typedExpressionEncoder
5658
implicit lazy val uriEncoder: ExpressionEncoder[URI] = typedExpressionEncoder[URI]
5759
implicit lazy val neighborhoodEncoder: ExpressionEncoder[Neighborhood] = typedExpressionEncoder[Neighborhood]
5860
implicit lazy val targetCellEncoder: ExpressionEncoder[TargetCell] = typedExpressionEncoder[TargetCell]
@@ -78,6 +80,11 @@ trait StandardEncoders extends SpatialEncoders with TypedEncoders {
7880

7981
implicit lazy val tileEncoder: ExpressionEncoder[Tile] = typedExpressionEncoder
8082
implicit def rasterEncoder[T <: CellGrid[Int]: TypedEncoder]: ExpressionEncoder[Raster[T]] = typedExpressionEncoder[Raster[T]]
83+
84+
// Intentionally not implicit, defined as implicit in the ProjectedRasterTile companion object
85+
lazy val projectedRasterTileEncoder: ExpressionEncoder[ProjectedRasterTile] = typedExpressionEncoder
86+
// Intentionally not implicit, defined as implicit in the RFRasterSource companion object
87+
lazy val rfRasterSourceEncoder: ExpressionEncoder[RFRasterSource] = typedExpressionEncoder
8188
}
8289

8390
object StandardEncoders extends StandardEncoders

core/src/main/scala/org/locationtech/rasterframes/encoders/TypedEncoders.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1010
import org.apache.spark.sql.catalyst.util.QuantileSummaries
1111
import org.apache.spark.sql.rf.{CrsUDT, RasterSourceUDT, TileUDT}
1212
import org.locationtech.jts.geom.Envelope
13+
import org.locationtech.rasterframes.ref.RFRasterSource
14+
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
1315
import org.locationtech.rasterframes.util.{FocalNeighborhood, FocalTargetCell, KryoSupport}
1416

1517
import java.net.URI
@@ -23,6 +25,8 @@ trait TypedEncoders {
2325
implicit val tileUDT = new TileUDT
2426
implicit val rasterSourceUDT = new RasterSourceUDT
2527

28+
implicit val crsTypedEncoder: TypedEncoder[CRS] = TypedEncoder.usingUserDefinedType[CRS]
29+
2630
implicit val cellTypeInjection: Injection[CellType, String] = Injection(_.toString, CellType.fromName)
2731
implicit val cellTypeTypedEncoder: TypedEncoder[CellType] = TypedEncoder.usingInjection[CellType, String]
2832

@@ -89,7 +93,20 @@ trait TypedEncoders {
8993
implicit val tileTypedEncoder: TypedEncoder[Tile] = TypedEncoder.usingUserDefinedType[Tile]
9094
implicit def rasterTileTypedEncoder[T <: CellGrid[Int]: TypedEncoder]: TypedEncoder[Raster[T]] = TypedEncoder.usingDerivation
9195

96+
// Derivation is done through frameless to trigger RasterSourceUDT load
97+
implicit val rfRasterSourceTypedEncoder: TypedEncoder[RFRasterSource] = TypedEncoder.usingUserDefinedType[RFRasterSource]
98+
9299
implicit val kernelTypedEncoder: TypedEncoder[Kernel] = TypedEncoder.usingDerivation
100+
101+
// Derivation is done through frameless to trigger the TileUDT and CrsUDT load
102+
implicit val projectedRasterTileTypedEncoder: TypedEncoder[ProjectedRasterTile] =
103+
ManualTypedEncoder.newInstance[ProjectedRasterTile](
104+
fields = List(
105+
RecordEncoderField(0, "tile", TypedEncoder[Tile]),
106+
RecordEncoderField(1, "extent", TypedEncoder[Extent]),
107+
RecordEncoderField(2, "crs", TypedEncoder[CRS])
108+
)
109+
)
93110
}
94111

95112
object TypedEncoders extends TypedEncoders

core/src/main/scala/org/locationtech/rasterframes/ref/RFRasterSource.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import geotrellis.vector.Extent
3131
import org.apache.hadoop.conf.Configuration
3232
import org.apache.spark.annotation.Experimental
3333
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
34-
import org.apache.spark.sql.rf.RasterSourceUDT
34+
import org.locationtech.rasterframes.encoders.StandardEncoders
3535
import org.locationtech.rasterframes.model.TileContext
3636
import org.locationtech.rasterframes.{NOMINAL_TILE_DIMS, rfConfig}
3737

@@ -100,10 +100,7 @@ object RFRasterSource extends LazyLogging {
100100

101101
def cacheStats = rsCache.stats()
102102

103-
implicit def rsEncoder: ExpressionEncoder[RFRasterSource] = {
104-
RasterSourceUDT // Makes sure UDT is registered first
105-
ExpressionEncoder()
106-
}
103+
implicit lazy val rsEncoder: ExpressionEncoder[RFRasterSource] = StandardEncoders.rfRasterSourceEncoder
107104

108105
def apply(source: URI): RFRasterSource =
109106
rsCache.get(

core/src/main/scala/org/locationtech/rasterframes/tiles/ProjectedRasterTile.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import geotrellis.vector.{Extent, ProjectedExtent}
2727
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2828
import org.locationtech.rasterframes.ref.ProjectedRasterLike
2929
import org.apache.spark.sql.catalyst.DefinedByConstructorParams
30+
import org.locationtech.rasterframes.encoders.StandardEncoders
3031

3132
/**
3233
* A Tile that's also like a ProjectedRaster, with delayed evaluation support.
@@ -58,5 +59,6 @@ object ProjectedRasterTile {
5859

5960
def unapply(prt: ProjectedRasterTile): Option[(Tile, Extent, CRS)] = Some((prt.tile, prt.extent, prt.crs))
6061

61-
implicit lazy val projectedRasterTileEncoder: ExpressionEncoder[ProjectedRasterTile] = ExpressionEncoder()
62+
implicit lazy val projectedRasterTileEncoder: ExpressionEncoder[ProjectedRasterTile] =
63+
StandardEncoders.projectedRasterTileEncoder
6264
}

core/src/test/scala/org/locationtech/rasterframes/expressions/SFCIndexerSpec.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ package org.locationtech.rasterframes.expressions
2424
import geotrellis.proj4.{CRS, LatLng, WebMercator}
2525
import geotrellis.raster.CellType
2626
import geotrellis.vector._
27-
import org.apache.spark.sql.Encoders
2827
import org.apache.spark.sql.jts.JTSTypes
2928
import org.locationtech.geomesa.curve.{XZ2SFC, Z2SFC}
3029
import org.locationtech.rasterframes._
@@ -151,7 +150,6 @@ class SFCIndexerSpec extends TestEnvironment with Inspectors {
151150
val tile = TestData.randomTile(2, 2, CellType.fromName("uint8"))
152151
val prts = testExtents.map(reproject(crs)).map(ProjectedRasterTile(tile, _, crs))
153152

154-
implicit val enc = Encoders.tuple(ProjectedRasterTile.projectedRasterTileEncoder, Encoders.scalaInt)
155153
// The `id` here is to deal with Spark auto projecting single columns dataframes and needing to provide an encoder
156154
val df = prts.zipWithIndex.toDF("proj_raster", "id")
157155
withClue("XZ2") {

docs/src/main/paradox/release-notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## 0.10.x
44

5+
### 0.10.1
6+
7+
* Fix UDTs registration ordering [#573](https://github.com/locationtech/rasterframes/pull/573)
8+
59
### 0.10.0
610

711
* Upgraded to Scala 2.12 , Spark 3.1.2, and GeoTrellis 3.6.0 (a subtantial accomplishment!)

pyrasterframes/src/main/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def dest_file(self, src_file):
140140
# to throw a `NotImplementedError: Can't perform this operation for unregistered loader type`
141141
pytest = 'pytest>=4.0.0,<5.0.0'
142142

143-
pyspark = 'pyspark==3.1.1'
143+
pyspark = 'pyspark==3.1.2'
144144
boto3 = 'boto3'
145145
deprecation = 'deprecation'
146146
descartes = 'descartes'

rf-notebook/src/main/docker/requirements-nb.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pyspark>=3.1
1+
pyspark==3.1.2
22
gdal==3.1.2
33
numpy
44
pandas

0 commit comments

Comments
 (0)