@@ -24,12 +24,13 @@ import astraea.spark.rasterframes.expressions.ExplodeTileExpression
2424import astraea .spark .rasterframes .functions .{CellCountAggregateFunction , CellMeanAggregateFunction }
2525import astraea .spark .rasterframes .stats .{CellHistogram , CellStatistics }
2626import astraea .spark .rasterframes .{functions ⇒ F }
27- import com .vividsolutions .jts .geom .Envelope
27+ import com .vividsolutions .jts .geom .{Envelope , Geometry }
28+ import geotrellis .proj4 .CRS
2829import geotrellis .raster .mapalgebra .local .LocalTileBinaryOp
2930import geotrellis .raster .{CellType , Tile }
3031import org .apache .spark .annotation .Experimental
3132import org .apache .spark .sql ._
32- import org .apache .spark .sql .functions .udf
33+ import org .apache .spark .sql .functions .{ lit , udf }
3334import org .apache .spark .sql .rf ._
3435
3536import scala .reflect .runtime .universe ._
@@ -60,7 +61,7 @@ trait RasterFunctions {
6061 def tileDimensions (col : Column ): Column = expressions.DimensionsExpression (col.expr).asColumn
6162
6263 /** Extracts the bounding box of a geometry as a JTS envelope. */
63- def box2D (col : Column ): TypedColumn [Any , Envelope ] = expressions.Box2DExpression (col.expr).asColumn.as[Envelope ]
64+ def envelope (col : Column ): TypedColumn [Any , Envelope ] = expressions.EnvelopeExpression (col.expr).asColumn.as[Envelope ]
6465
6566 /** Flattens Tile into an array. A numeric type parameter is required. */
6667 @ Experimental
@@ -88,6 +89,10 @@ trait RasterFunctions {
8889 def convertCellType (col : Column , cellType : CellType ): TypedColumn [Any , Tile ] =
8990 udf[Tile , Tile ](F .convertCellType(cellType)).apply(col).as[Tile ]
9091
92+ /** Change the Tile's cell type */
93+ def convertCellType (col : Column , cellTypeName : String ): TypedColumn [Any , Tile ] =
94+ udf[Tile , Tile ](F .convertCellType(cellTypeName)).apply(col).as[Tile ]
95+
9196 /** Assign a `NoData` value to the Tiles. */
9297 def withNoData (col : Column , nodata : Double ) = withAlias(" withNoData" , col)(
9398 udf[Tile , Tile ](F .withNoData(nodata)).apply(col)
@@ -209,36 +214,214 @@ trait RasterFunctions {
209214 udf(F .localAdd).apply(left, right)
210215 ).as[Tile ]
211216
217+ /** Cellwise addition of a scalar to a tile. */
218+ def localAddScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
219+ val f = value match {
220+ case i : Int => F .localAddScalarInt(_ : Tile , i)
221+ case d : Double => F .localAddScalar(_ : Tile , d)
222+ }
223+
224+ udf(f).apply(tileCol).as(s " localAddScalar( $tileCol, $value) " ).as[Tile ]
225+ }
226+
212227 /** Cellwise subtraction between two Tiles. */
213228 def localSubtract (left : Column , right : Column ): TypedColumn [Any , Tile ] =
214229 withAlias(" localSubtract" , left, right)(
215230 udf(F .localSubtract).apply(left, right)
216231 ).as[Tile ]
217232
233+ /** Cellwise subtraction of a scalar from a tile. */
234+ def localSubtractScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
235+ val f = value match {
236+ case i : Int => F .localSubtractScalarInt(_ : Tile , i)
237+ case d : Double => F .localSubtractScalar(_ : Tile , d)
238+ }
239+
240+ udf(f).apply(tileCol).as(s " localSubtractScalar( $tileCol, $value) " ).as[Tile ]
241+ }
242+
218243 /** Cellwise multiplication between two Tiles. */
219244 def localMultiply (left : Column , right : Column ): TypedColumn [Any , Tile ] =
220245 withAlias(" localMultiply" , left, right)(
221246 udf(F .localMultiply).apply(left, right)
222247 ).as[Tile ]
223248
249+ /** Cellwise multiplication of a tile by a scalar. */
250+ def localMultiplyScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
251+ val f = value match {
252+ case i : Int => F .localMultiplyScalarInt(_ : Tile , i)
253+ case d : Double => F .localMultiplyScalar(_ : Tile , d)
254+ }
255+
256+ udf(f).apply(tileCol).as(s " localMultiplyScalar( $tileCol, $value) " ).as[Tile ]
257+ }
258+
224259 /** Cellwise division between two Tiles. */
225260 def localDivide (left : Column , right : Column ): TypedColumn [Any , Tile ] =
226261 withAlias(" localDivide" , left, right)(
227262 udf(F .localDivide).apply(left, right)
228263 ).as[Tile ]
229264
265+ /** Cellwise division of a tile by a scalar. */
266+ def localDivideScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
267+ val f = value match {
268+ case i : Int => F .localDivideScalarInt(_ : Tile , i)
269+ case d : Double => F .localDivideScalar(_ : Tile , d)
270+ }
271+
272+ udf(f).apply(tileCol).as(s " localDivideScalar( $tileCol, $value) " ).as[Tile ]
273+ }
274+
230275 /** Perform an arbitrary GeoTrellis `LocalTileBinaryOp` between two Tile columns. */
231276 def localAlgebra (op : LocalTileBinaryOp , left : Column , right : Column ):
232277 TypedColumn [Any , Tile ] =
233278 withAlias(opName(op), left, right)(
234279 udf[Tile , Tile , Tile ](op.apply).apply(left, right)
235280 ).as[Tile ]
236281
282+ /** Compute the normalized difference of two tile columns */
283+ def normalizedDifference (left : Column , right : Column ): TypedColumn [Any , Tile ] =
284+ withAlias(" normalizedDifference" , left, right)(
285+ udf(F .normalizedDifference).apply(left, right)
286+ ).as[Tile ]
287+
288+ /** Constructor for constant tile column */
289+ def makeConstantTile (value : Number , cols : Int , rows : Int , cellType : String ): TypedColumn [Any , Tile ] =
290+ udf(() => F .makeConstantTile(value, cols, rows, cellType)).apply().as(s " constant_ $cellType" ).as[Tile ]
291+
292+ /** Alias for column of constant tiles of zero */
293+ def tileZeros (cols : Int , rows : Int , cellType : String = " float64" ): TypedColumn [Any , Tile ] =
294+ udf(() => F .tileZeros(cols, rows, cellType)).apply().as(s " zeros_ $cellType" ).as[Tile ]
295+
296+ /** Alias for column of constant tiles of one */
297+ def tileOnes (cols : Int , rows : Int , cellType : String = " float64" ): TypedColumn [Any , Tile ] =
298+ udf(() => F .tileOnes(cols, rows, cellType)).apply().as(s " ones_ $cellType" ).as[Tile ]
299+
300+ /** Where the mask tile contains NODATA, replace values in the source tile with NODATA */
301+ def mask (sourceTile : Column , maskTile : Column ): TypedColumn [Any , Tile ] =
302+ withAlias(" mask" , sourceTile, maskTile)(
303+ udf(F .mask).apply(sourceTile, maskTile)
304+ ).as[Tile ]
305+
306+ /** Where the mask tile equals the mask value, replace values in the source tile with NODATA */
307+ def maskByValue (sourceTile : Column , maskTile : Column , maskValue : Column ): TypedColumn [Any , Tile ] =
308+ withAlias(" maskByValue" , sourceTile, maskTile, maskValue)(
309+ udf(F .maskByValue).apply(sourceTile, maskTile, maskValue)
310+ ).as[Tile ]
311+
312+ /** Where the mask tile DOES NOT contain NODATA, replace values in the source tile with NODATA */
313+ def inverseMask (sourceTile : Column , maskTile : Column ): TypedColumn [Any , Tile ] =
314+ withAlias(" inverseMask" , sourceTile, maskTile)(
315+ udf(F .inverseMask).apply(sourceTile, maskTile)
316+ ).as[Tile ]
317+
318+ /** Create a tile where cells in the grid defined by cols, rows, and bounds are filled with the given value. */
319+ def rasterize (geometry : Column , bounds : Column , value : Column , cols : Int , rows : Int ): TypedColumn [Any , Tile ] =
320+ withAlias(" rasterize" , geometry)(
321+ udf(F .rasterize(_ : Geometry , _ : Geometry , _ : Int , cols, rows)).apply(geometry, bounds, value)
322+ ).as[Tile ]
323+
324+ /** Reproject a column of geometry from one CRS to another. */
325+ def reprojectGeometry (sourceGeom : Column , srcCRS : CRS , dstCRS : CRS ): TypedColumn [Any , Geometry ] =
326+ withAlias(" reprojectGeometry" , sourceGeom)(
327+ udf(F .reprojectGeometry(_ : Geometry , srcCRS, dstCRS)).apply(sourceGeom)
328+ ).as[Geometry ]
329+
237330 /** Render Tile as ASCII string for debugging purposes. */
238331 @ Experimental
239332 def renderAscii (col : Column ): TypedColumn [Any , String ] =
240333 withAlias(" renderAscii" , col)(
241334 udf[String , Tile ](F .renderAscii).apply(col)
242335 ).as[String ]
243336
337+ /** Cellwise less than value comparison between two tiles. */
338+ def localLess (left : Column , right : Column ): TypedColumn [Any , Tile ] =
339+ withAlias(" localLess" , left, right)(
340+ udf(F .localLess).apply(left, right)
341+ ).as[Tile ]
342+
343+
344+ /** Cellwise less than value comparison between a tile and a scalar. */
345+ def localLessScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
346+ val f = value match {
347+ case i : Int ⇒ F .localLessScalarInt(_ : Tile , i)
348+ case d : Double ⇒ F .localLessScalar(_ : Tile , d)
349+ }
350+ udf(f).apply(tileCol).as(s " localLessScalar( $tileCol, $value) " ).as[Tile ]
351+ }
352+
353+ /** Cellwise less than or equal to value comparison between a tile and a scalar. */
354+ def localLessEqual (left : Column , right : Column ): TypedColumn [Any , Tile ] =
355+ withAlias(" localLessEqual" , left, right)(
356+ udf(F .localLess).apply(left, right)
357+ ).as[Tile ]
358+
359+ /** Cellwise less than or equal to value comparison between a tile and a scalar. */
360+ def localLessEqualScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
361+ val f = value match {
362+ case i : Int ⇒ F .localLessEqualScalarInt(_ : Tile , i)
363+ case d : Double ⇒ F .localLessEqualScalar(_ : Tile , d)
364+ }
365+ udf(f).apply(tileCol).as(s " localLessEqualScalar( $tileCol, $value) " ).as[Tile ]
366+ }
367+
368+ /** Cellwise greater than value comparison between two tiles. */
369+ def localGreater (left : Column , right : Column ): TypedColumn [Any , Tile ] =
370+ withAlias(" localGreater" , left, right)(
371+ udf(F .localGreater).apply(left, right)
372+ ).as[Tile ]
373+
374+
375+ /** Cellwise greater than value comparison between a tile and a scalar. */
376+ def localGreaterScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
377+ val f = value match {
378+ case i : Int ⇒ F .localGreaterScalarInt(_ : Tile , i)
379+ case d : Double ⇒ F .localGreaterScalar(_ : Tile , d)
380+ }
381+ udf(f).apply(tileCol).as(s " localGreaterScalar( $tileCol, $value) " ).as[Tile ]
382+ }
383+
384+ /** Cellwise greater than or equal to value comparison between two tiles. */
385+ def localGreaterEqual (left : Column , right : Column ): TypedColumn [Any , Tile ] =
386+ withAlias(" localGreaterEqual" , left, right)(
387+ udf(F .localGreaterEqual).apply(left, right)
388+ ).as[Tile ]
389+
390+ /** Cellwise greater than or equal to value comparison between a tile and a scalar. */
391+ def localGreaterEqualScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
392+ val f = value match {
393+ case i : Int ⇒ F .localGreaterEqualScalarInt(_ : Tile , i)
394+ case d : Double ⇒ F .localGreaterEqualScalar(_ : Tile , d)
395+ }
396+ udf(f).apply(tileCol).as(s " localGreaterEqualScalar( $tileCol, $value) " ).as[Tile ]
397+ }
398+
399+ /** Cellwise equal to value comparison between two tiles. */
400+ def localEqual (left : Column , right : Column ): TypedColumn [Any , Tile ] =
401+ withAlias(" localEqual" , left, right)(
402+ udf(F .localEqual).apply(left, right)
403+ ).as[Tile ]
404+
405+ /** Cellwise equal to value comparison between a tile and a scalar. */
406+ def localEqualScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
407+ val f = value match {
408+ case i : Int ⇒ F .localEqualScalarInt(_ : Tile , i)
409+ case d : Double ⇒ F .localEqualScalar(_ : Tile , d)
410+ }
411+ udf(f).apply(tileCol).as(s " localEqualScalar( $tileCol, $value) " ).as[Tile ]
412+ }
413+ /** Cellwise inequality comparison between two tiles. */
414+ def localUnequal (left : Column , right : Column ): TypedColumn [Any , Tile ] =
415+ withAlias(" localUnequal" , left, right)(
416+ udf(F .localUnequal).apply(left, right)
417+ ).as[Tile ]
418+
419+ /** Cellwise inequality comparison between a tile and a scalar. */
420+ def localUnequalScalar [T : Numeric ](tileCol : Column , value : T ): TypedColumn [Any , Tile ] = {
421+ val f = value match {
422+ case i : Int ⇒ F .localUnequalScalarInt(_ : Tile , i)
423+ case d : Double ⇒ F .localUnequalScalar(_ : Tile , d)
424+ }
425+ udf(f).apply(tileCol).as(s " localUnequalScalar( $tileCol, $value) " ).as[Tile ]
426+ }
244427}
0 commit comments