
Commit ef8b15f

[#553] Implement vacuuming and compaction on delta parallel writer
# New features and improvements

Parallel write with delta now takes 4 more (optional) configuration keys:

* compactFrequency: Int
* compactNumFile: Int
* vacuumFrequency: Int
* retentionHours: Int

The behaviour is the following:

When performing the write operation of the micro-batch, every $compactFrequency micro-batches (i.e. if the batch id % $compactFrequency == 0), the whole table is rewritten, repartitioning it into $compactNumFile partitions (i.e. output files).

When performing the write operation of the micro-batch, every $vacuumFrequency micro-batches (i.e. if the batch id % $vacuumFrequency == 0), the table is vacuumed, passing $retentionHours as the retentionHours parameter of the [vacuum function](https://docs.delta.io/latest/delta-utility.html#-delta-vacuum).

$compactFrequency and $compactNumFile must be either both set or both unset, otherwise a configuration error will be thrown. Not setting them simply disables the feature (i.e. no compaction is performed).

$vacuumFrequency and $retentionHours must be either both set or both unset, otherwise a configuration error will be thrown. Not setting them simply disables the feature (i.e. no vacuum is performed).

# Breaking changes

None.

# Migration

None.

# Bug fixes

None.

# How this feature was tested

Existing unit tests.

# Related issue

Closes #553
1 parent ddd5625 commit ef8b15f
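
For illustration, a minimal sketch (hypothetical values) of enabling both features through the new ContinuousUpdate writer details; the field names match the case class introduced in WriterDetails.scala below:

```scala
// Hypothetical values: compact every 10 micro-batches into 8 output files,
// vacuum every 20 micro-batches with a 168-hour (7-day) retention.
// Leaving a pair of options as None disables the corresponding feature.
val details = ContinuousUpdate(
  keys               = List("id"),
  orderingExpression = "updated_at",
  compactFrequency   = Some(10),
  compactNumFile     = Some(8),
  retentionHours     = Some(168),
  vacuumFrequency    = Some(20)
)
```

With these values, compaction runs on batches 0, 10, 20, ... (batchId % 10 == 0) and vacuum runs on batches 0, 20, 40, ... (batchId % 20 == 0).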

File tree

12 files changed, +122 -48 lines


plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/ParallelWriteWriters.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ class ParallelWriteSparkStructuredStreamingWriter(
       )
     logger.info(s"Writing microbatch with id: $batchId")
     try
-      writer.write(writeExecutionPlan, batch, correlationId)
+      writer.write(writeExecutionPlan, batch, correlationId, batchId)
     catch {
       case e: Exception =>
         logger.error("Failed writing a microbatch", e)

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/model/ParallelWriteModelParser.scala

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ import spray.json._
 object ParallelWriteModelParser {
   implicit lazy val parallelWriteFormat: RootJsonFormat[ParallelWrite] = jsonFormat1((saveMode: String) => ParallelWrite.apply(saveMode))
   implicit lazy val catalogCoordinatesFormat: RootJsonFormat[CatalogCoordinates] = jsonFormat5(CatalogCoordinates.apply)
-  implicit lazy val continuousUpdateFormat: RootJsonFormat[ContinuousUpdate] = jsonFormat2((keys: List[String], orderingExpression: String) => ContinuousUpdate.apply(keys, orderingExpression))
+  implicit lazy val continuousUpdateFormat: RootJsonFormat[ContinuousUpdate] = jsonFormat6(ContinuousUpdate)

   implicit lazy val writerDetailsFormat: RootJsonFormat[WriterDetails] = new RootJsonFormat[WriterDetails] {
     override def read(json: JsValue): WriterDetails =
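
Because the format is now derived with jsonFormat6, the accepted JSON keys follow the ContinuousUpdate field names. A minimal sketch of parsing such a payload, assuming hypothetical values and omitting the surrounding writerDetails envelope:

```scala
import spray.json._
import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.model.ContinuousUpdate
import it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.model.ParallelWriteModelParser._

// Hypothetical payload: keys are spelled exactly as the case class fields.
val payload =
  """{
    |  "keys": ["id"],
    |  "orderingExpression": "updated_at",
    |  "compactFrequency": 10,
    |  "compactNumFile": 8,
    |  "retentionHours": 168,
    |  "vacuumFrequency": 20
    |}""".stripMargin

val parsed: ContinuousUpdate = payload.parseJson.convertTo[ContinuousUpdate]
```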

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/model/WriterDetails.scala

Lines changed: 16 additions & 10 deletions
@@ -2,20 +2,26 @@ package it.agilelab.bigdata.wasp.consumers.spark.plugins.parallel.model

 sealed trait WriterDetails
 object WriterDetails {
-  val parallelWrite = "parallelWrite"
+  val parallelWrite    = "parallelWrite"
   val continuousUpdate = "continuousUpdate"
 }

 /**
-  * Details needeed by parallel writer
-  * @param saveMode spark save mode
-  */
+ * Details needeed by parallel writer
+ * @param saveMode spark save mode
+ */
 case class ParallelWrite(saveMode: String) extends WriterDetails

 /**
-  * Details needed by continuous update writer
-  * @param keys delta table unique keys column list
-  * @param orderingExpression monotonically increasing select expression to choose upsert candidate
-  */
-case class ContinuousUpdate(keys: List[String], orderingExpression: String) extends WriterDetails
-
+ * Details needed by continuous update writer
+ * @param keys delta table unique keys column list
+ * @param orderingExpression monotonically increasing select expression to choose upsert candidate
+ */
+case class ContinuousUpdate(
+  keys: List[String],
+  orderingExpression: String,
+  compactFrequency: Option[Int] = None,
+  compactNumFile: Option[Int] = None,
+  retentionHours: Option[Int] = None,
+  vacuumFrequency: Option[Int] = None
+) extends WriterDetails

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/ColdParallelWriter.scala

Lines changed: 4 additions & 3 deletions
@@ -14,19 +14,20 @@ trait ColdParallelWriter extends ParallelWriter {
   final override def write(
     writeExecutionPlan: WriteExecutionPlanResponseBody,
     df: DataFrame,
-    correlationId: CorrelationId
+    correlationId: CorrelationId,
+    batchId: Long
   ): Unit = {
     val s3path: URI = HadoopS3Utils.useS3aScheme(
       new URI(writeExecutionPlan.writeUri.getOrElse(
         throw new RuntimeException("Entity responded without a writeUri field for a COLD case write"))))
     val spark = df.sparkSession
     credentialsConfigurator.configureCredentials(writeExecutionPlan, spark.sparkContext.hadoopConfiguration)
     val partitioningColumns: Seq[String] = catalogService.getPartitioningColumns(spark, entityDetails)
-    performColdWrite(df, s3path, partitioningColumns)
+    performColdWrite(df, s3path, partitioningColumns, batchId)
     recoverPartitions(spark, partitioningColumns)
   }

-  protected def performColdWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String]): Unit
+  protected def performColdWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String], batchId: Long): Unit

   private def recoverPartitions(sparkSession: SparkSession, partitions: Seq[String]): Unit =
     if (partitions.nonEmpty)

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/ContinuousUpdateWriter.scala

Lines changed: 40 additions & 9 deletions
@@ -13,25 +13,56 @@ import java.net.URI
 class SchemaException(message: String) extends Exception(message)

 /**
-  * Writer for continuous update.
-  * @param writerDetails Informations about unique keys, ordering expression and fields to drop
-  * @param entityDetails
-  */
+ * Writer for continuous update.
+ * @param writerDetails Informations about unique keys, ordering expression and fields to drop
+ * @param entityDetails
+ */
 case class ContinuousUpdateWriter(
-    writerDetails: ContinuousUpdate,
-    entityAPI: ParallelWriteEntity,
-    entityDetails: CatalogCoordinates,
-    catalogService: DataCatalogService
+  writerDetails: ContinuousUpdate,
+  entityAPI: ParallelWriteEntity,
+  entityDetails: CatalogCoordinates,
+  catalogService: DataCatalogService
 ) extends DeltaParallelWriterTrait {

-  override def performDeltaWrite(df: DataFrame, s3path: URI, partitioningColumns: Seq[String]): Unit = {
+  override def performDeltaWrite(df: DataFrame, s3path: URI, partitioningColumns: Seq[String], batchId: Long): Unit = {
     // schema evolution not supported yet, property not necessary at the moment
     // ss.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
     val spark: SparkSession = df.sparkSession
     val orderedDF = applyOrderingLogic(df, writerDetails.keys, writerDetails.orderingExpression)
     val enforcedDf = enforceSchema(orderedDF)
     val condition = writerDetails.keys.map(x => s"table.$x = table2.$x").mkString(" AND ")
     val deltaTable = getDeltaTable(s3path, spark, partitioningColumns)
+    (writerDetails.compactFrequency, writerDetails.compactNumFile) match {
+      case (None, None) =>
+      case (Some(compactFrequency), Some(compactNumFile)) =>
+        if (batchId % compactFrequency == 0) {
+          logger.info(s"Compacting table at ${s3path} with partitions $compactNumFile files")
+          deltaTable.toDF
+            .repartition(compactNumFile)
+            .write
+            .option("dataChange", "false")
+            .format("delta")
+            .mode("overwrite")
+            .partitionBy(partitioningColumns: _*)
+            .save(s3path.toString)
+        }
+      case other =>
+        throw new IllegalArgumentException(
+          s"Both compactFrequency and compactNumFile must be null or have a value, but ${other} was provided"
+        )
+    }
+    (writerDetails.retentionHours, writerDetails.vacuumFrequency) match {
+      case (None, None) =>
+      case (Some(retentionHours), Some(vacuumFrequency)) =>
+        if (batchId % vacuumFrequency == 0) {
+          logger.info(s"Vacuuming table ${s3path} with retention ${retentionHours} hours")
+          deltaTable.vacuum(retentionHours.toDouble)
+        }
+      case other =>
+        throw new IllegalArgumentException(
+          s"Both retentionHours and vacuumFrequency must be null or have a value, but ${other} was provided"
+        )
+    }
     deltaTable
       .as("table")
       .merge(enforcedDf.as("table2"), condition)
plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/DeltaParallelWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ case class DeltaParallelWriter(
   override val catalogService: DataCatalogService
 ) extends DeltaParallelWriterTrait {

-  override def performDeltaWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String]): Unit =
+  override def performDeltaWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String], batchId: Long): Unit =
     enforceSchema(df).write
       .mode(parallelWriteDetails.saveMode)
       .format("delta")

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/DeltaParallelWriterTrait.scala

Lines changed: 3 additions & 3 deletions
@@ -8,12 +8,12 @@ import java.net.URI

 trait DeltaParallelWriterTrait extends ColdParallelWriter {

-  override final def performColdWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String]): Unit = {
-    performDeltaWrite(df, path, partitioningColumns)
+  override final def performColdWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String], batchId: Long): Unit = {
+    performDeltaWrite(df, path, partitioningColumns, batchId)
     reconciliateManifest(getDeltaTable(path, df.sparkSession, partitioningColumns))
   }

-  protected def performDeltaWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String]): Unit
+  protected def performDeltaWrite(df: DataFrame, path: URI, partitioningColumns: Seq[String], batchId: Long): Unit

   private def reconciliateManifest(deltaTable: DeltaTable): Unit =
     deltaTable.generate("symlink_format_manifest")

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/HotParallelWriter.scala

Lines changed: 2 additions & 1 deletion
@@ -23,7 +23,8 @@ case class HotParallelWriter(
   override def write(
     writeExecutionPlan: WriteExecutionPlanResponseBody,
     df: DataFrame,
-    correlationId: CorrelationId
+    correlationId: CorrelationId,
+    batchId: Long
   ): Unit = {
     logger.info(s"Writing to entity ${entityDetails.name}")
     df.select(to_json(struct(df.columns.map(col): _*))).foreachPartition { it: Iterator[Row] =>

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/ParallelWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ trait ParallelWriter extends Logging{
    * @param writeExecutionPlan execution plan obtained from entity
    * @param df data to write
    */
-  def write(writeExecutionPlan: WriteExecutionPlanResponseBody, df: DataFrame, correlationId: CorrelationId): Unit
+  def write(writeExecutionPlan: WriteExecutionPlanResponseBody, df: DataFrame, correlationId: CorrelationId, batchId: Long): Unit

   def rollback(correlationId: CorrelationId): Unit =
     entityAPI.postDataComplete(DataCompleteRequestBody(false), correlationId)

plugin-parallel-write-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/plugins/parallel/writers/ParquetParallelWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ case class ParquetParallelWriter(
   catalogService: DataCatalogService
 ) extends ColdParallelWriter {

-  override protected def performColdWrite(df: DataFrame, s3path: URI, partitioningColumns: Seq[String]): Unit =
+  override protected def performColdWrite(df: DataFrame, s3path: URI, partitioningColumns: Seq[String], batchId: Long): Unit =
     enforceSchema(df).write
       .mode(parallelWriteDetails.saveMode)
       .format("parquet")

0 commit comments
