@@ -11,7 +11,6 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.util.SerializableConfiguration
import org.slf4j.{Logger, LoggerFactory}

-import java.io.File
import java.util.concurrent.TimeUnit

object LakeFSJobParams {
@@ -186,7 +185,7 @@ object LakeFSContext {
ranges.flatMap((range: Range) => {
val path = new Path(apiClient.getRangeURL(repoName, range.id))
val fs = path.getFileSystem(conf)
val localFile = File.createTempFile("lakefs.", ".range")
val localFile = StorageUtils.createTempFile(conf, "lakefs.", ".range")

fs.copyToLocalFile(false, path, new Path(localFile.getAbsolutePath), true)
val companion = Entry.messageCompanion

@@ -17,7 +17,6 @@ import scalapb.GeneratedMessageCompanion

import java.io.DataInput
import java.io.DataOutput
-import java.io.File
import java.net.URI
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -92,7 +91,7 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](
var item: Item[Proto] = _
var rangeID: String = ""
override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
-localFile = File.createTempFile("lakefs.", ".range")
+localFile = StorageUtils.createTempFile(context.getConfiguration, "lakefs.", ".range")
// Cleanup the local file - using the same technique as other data sources:
// https://github.com/apache/spark/blob/c0b1735c0bfeb1ff645d146e262d7ccd036a590e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L123
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => localFile.delete()))

@@ -63,7 +63,7 @@ object SSTableReader {
private def copyToLocal(configuration: Configuration, url: String) = {
val p = new Path(url)
val fs = p.getFileSystem(configuration)
val localFile = File.createTempFile("lakefs.", ".sstable")
val localFile = StorageUtils.createTempFile(configuration, "lakefs.", ".sstable")
// Cleanup the local file - using the same technic as other data sources:
// https://github.com/apache/spark/blob/c0b1735c0bfeb1ff645d146e262d7ccd036a590e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L123
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => localFile.delete()))

@@ -7,8 +7,10 @@ import com.amazonaws.retry.RetryUtils
import com.amazonaws.services.s3.model.{Region, GetBucketLocationRequest}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws._
+import org.apache.hadoop.conf.Configuration
import org.slf4j.{Logger, LoggerFactory}

+import java.io.File
import java.net.URI
import java.util.concurrent.TimeUnit

@@ -161,6 +163,21 @@ object StorageUtils {
val GCSMaxBulkSize =
500 // 1000 is the max size, 500 is the recommended size to avoid timeouts or hitting HTTP size limits
}

+  /** Create a temporary file in the Spark local directory if configured.
+    * This ensures temporary files are stored in executor storage rather than system temp.
+    */
+  def createTempFile(configuration: Configuration, prefix: String, suffix: String): File = {
+    val sparkLocalDir = configuration.get("spark.local.dir")

Contributor:
I don't think that configuration lives there! AFAIU it is part of the Spark configuration, and Spark does not load it onto the Hadoop configuration because (IIUC) it's not under "spark.hadoop".

Example:

$ spark-shell --conf spark.local.dir="/tmp/local1/"    # This seems to be how to configure this

25/12/02 [...] 14:31:47 WARN Utils: Your hostname, ariels resolves to a loopback address: 127.0.1.1; using 192.168.1.54 instead (on interface wlp0s20f3)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 14:31:49 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Spark context Web UI available at http://192.168.1.54:4040
Spark context available as 'sc' (master = local[*], app id = local-1764678709842).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.6
      /_/
         
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 21.0.9)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc.hadoopConfiguration.get("local.dir")
res0: String = null

scala> sc.hadoopConfiguration.get("spark.local.dir")
res1: String = null

scala> sc.getConf.get("spark.local.dir")
res2: String = /tmp/local1/
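
For background on the point above: Spark mirrors only keys under the `spark.hadoop.` prefix into the Hadoop configuration, stripping the prefix as it copies. A minimal illustration; `lakefs.tmp.dir` is a hypothetical key used only for demonstration:

```scala
// Keys under spark.hadoop.* are copied into sc.hadoopConfiguration with the
// "spark.hadoop." prefix stripped. "lakefs.tmp.dir" is a hypothetical key:
//
//   spark-shell --conf spark.local.dir=/tmp/local1 \
//               --conf spark.hadoop.lakefs.tmp.dir=/tmp/local1

sc.hadoopConfiguration.get("lakefs.tmp.dir")   // "/tmp/local1"

// A plain spark.* key such as spark.local.dir stays in the SparkConf only:
sc.getConf.get("spark.local.dir")              // "/tmp/local1"
sc.hadoopConfiguration.get("spark.local.dir")  // null
```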

Contributor Author:
You are not wrong - my very bad implementation.
Fixing and testing before re-requesting review.

+    if (sparkLocalDir != null && !sparkLocalDir.isEmpty) {
+      val dir = new File(sparkLocalDir)
+      if (dir.exists() || dir.mkdirs()) {
+        return File.createTempFile(prefix, suffix, dir)
+      }
+    }
+    // Fallback to system temp directory
+    File.createTempFile(prefix, suffix)
+  }
}

class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
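
Given the thread above, a corrected helper would read `spark.local.dir` from the Spark configuration rather than from the Hadoop `Configuration`. Below is a minimal sketch, assuming `SparkEnv` is reachable where the helper runs (it is on both driver and executors); it is not the final implementation from this PR:

```scala
import org.apache.spark.SparkEnv
import java.io.File

object TempFileSketch {
  def createTempFile(prefix: String, suffix: String): File = {
    // Read spark.local.dir from the Spark configuration, where it actually
    // lives; the Hadoop Configuration never carries this key.
    val localDir = Option(SparkEnv.get)
      .flatMap(env => Option(env.conf.get("spark.local.dir", null)))
      // spark.local.dir may be a comma-separated list of directories;
      // this sketch simply takes the first entry.
      .map(_.split(",").head.trim)
      .map(new File(_))
      .filter(dir => dir.exists() || dir.mkdirs())

    localDir match {
      case Some(dir) => File.createTempFile(prefix, suffix, dir)
      case None      => File.createTempFile(prefix, suffix) // system temp fallback
    }
  }
}
```

Note the `WARN SparkConf` line in the transcript: cluster managers override `spark.local.dir` via `SPARK_LOCAL_DIRS` (standalone/Kubernetes) and `LOCAL_DIRS` (YARN), so a production version would likely consult those environment variables as well, much as Spark's internal `Utils.getLocalDir` does.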