GC: copy local files to spark.local.dir #9739
base: master
Conversation
Downloading metadata locally should prefer spark.local.dir, with a fallback to the system temp dir.
arielshaqed left a comment
I might be wrong, but please try to test this. I think you're not reading the configuration value.
Or you might write a test - we have some Spark wrapper for tests that loads an actual Spark into the test. 🤷🏽
 * This ensures temporary files are stored in executor storage rather than system temp.
 */
def createTempFile(configuration: Configuration, prefix: String, suffix: String): File = {
  val sparkLocalDir = configuration.get("spark.local.dir")
I don't think that that configuration lives there! AFAIU it is part of the Spark configuration. And Spark does not load it onto the Hadoop configuration because (IIUC) it's not under "spark.hadoop".
Example:
$ spark-shell --conf spark.local.dir="/tmp/local1/" # This seems to be how to configure this
25/12/02 [...] 14:31:47 WARN Utils: Your hostname, ariels resolves to a loopback address: 127.0.1.1; using 192.168.1.54 instead (on interface wlp0s20f3)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 14:31:49 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Spark context Web UI available at http://192.168.1.54:4040
Spark context available as 'sc' (master = local[*], app id = local-1764678709842).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.6
/_/
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 21.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.hadoopConfiguration.get("local.dir")
res0: String = null
scala> sc.hadoopConfiguration.get("spark.local.dir")
res1: String = null
scala> sc.getConf.get("spark.local.dir")
res2: String = /tmp/local1/
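For context, a minimal sketch of what the fix implies, assuming the copy is done on the driver where both the SparkContext (`sc`) and the Hadoop Configuration (`hadoopConf`) are in scope; the names here are illustrative, not the PR's final code:

```scala
// Illustrative only: spark.local.dir lives on the SparkConf, so copy it onto
// the Hadoop Configuration on the driver before it is shipped to executors.
val localDir = sc.getConf.get("spark.local.dir", null)
if (localDir != null) {
  hadoopConf.set("spark.local.dir", localDir)
}
```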
You are not wrong - my very bad implementation.
Fixing and testing before re-request.
Pull request overview
This PR modifies the temporary file creation logic for GC operations to prefer Spark's configured local directory (spark.local.dir) over the system temp directory. This ensures temporary files used during metadata downloads are stored in executor storage rather than system temp, which can help with disk management in Spark clusters.
Key changes:
- Added a new createTempFile utility method in StorageUtils that checks for spark.local.dir and falls back to system temp
- Updated three file download locations (SSTableReader, LakeFSInputFormat, LakeFSContext) to use the new utility method
- Propagated spark.local.dir configuration from SparkContext to Hadoop Configuration in LakeFSContext
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala | Added createTempFile utility method to handle temp file creation with spark.local.dir preference |
| clients/spark/src/main/scala/io/treeverse/clients/SSTableReader.scala | Updated to retrieve spark.local.dir from configuration and use new createTempFile method |
| clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala | Updated to retrieve spark.local.dir from configuration and use new createTempFile method; removed unused File import |
| clients/spark/src/main/scala/io/treeverse/clients/LakeFSContext.scala | Propagated spark.local.dir to Hadoop configuration and updated to use new createTempFile method; removed unused File import |
  val path = new Path(apiClient.getRangeURL(repoName, range.id))
  val fs = path.getFileSystem(conf)
- val localFile = File.createTempFile("lakefs.", ".range")
+ val localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".range")
Copilot AI (Dec 3, 2025)
The variable tmpDir is defined on the driver (line 129) but used inside a mapPartitions closure on the executor (line 193). This will cause a serialization issue because the outer closure variable needs to be serialized and sent to executors. Instead, retrieve spark.local.dir from the conf Configuration object (which is already properly serialized via SerializableConfiguration) at line 189, similar to how it's done in the other files.
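A minimal sketch of the pattern the comment suggests, assuming `serConf` is the serializable-configuration wrapper already shipped to executors and `rdd` is the ranges RDD; both names are illustrative:

```scala
rdd.mapPartitions { ranges =>
  // Read the directory from the Hadoop configuration that was serialized to
  // the executor, rather than capturing a driver-side variable in the closure.
  val tmpDir = serConf.value.get("spark.local.dir") // null when not configured
  ranges.map { range =>
    // ... download the range into a temp file created under tmpDir ...
    StorageUtils.createTempFile(tmpDir, "lakefs.", ".range")
  }
}
```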
def createTempFile(sparkLocalDir: String, prefix: String, suffix: String): File = {
  if (StringUtils.isNotBlank(sparkLocalDir)) {
    val dir = new File(sparkLocalDir)
    if (dir.exists() || dir.mkdirs()) {
      return File.createTempFile(prefix, suffix, dir)
    }
  }
  // Fallback to system temp directory
  File.createTempFile(prefix, suffix)
}
Copilot AI (Dec 3, 2025)
The new createTempFile method lacks test coverage. Consider adding tests to verify:
- Successful file creation when spark.local.dir is valid
- Fallback to the system temp directory when spark.local.dir is null/blank
- Fallback to the system temp directory when directory creation fails
- Proper handling of comma-separated directory paths in spark.local.dir
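A minimal sketch of such tests, assuming ScalaTest's AnyFunSuite and the single-directory signature shown above (the comma-separated case would first need extra handling in the helper itself):

```scala
import java.io.File
import java.nio.file.Files
import org.scalatest.funsuite.AnyFunSuite
import io.treeverse.clients.StorageUtils

class CreateTempFileSpec extends AnyFunSuite {
  private val systemTmp = new File(System.getProperty("java.io.tmpdir")).getCanonicalPath

  test("creates the file under spark.local.dir when it is set") {
    val dir = Files.createTempDirectory("spark-local").toFile
    val f = StorageUtils.createTempFile(dir.getAbsolutePath, "lakefs.", ".range")
    assert(f.getParentFile.getCanonicalPath == dir.getCanonicalPath)
  }

  test("falls back to the system temp dir when spark.local.dir is blank") {
    val f = StorageUtils.createTempFile("", "lakefs.", ".range")
    assert(f.getParentFile.getCanonicalPath == systemTmp)
  }

  test("falls back to the system temp dir when the directory cannot be created") {
    // A path nested under an existing regular file cannot be created as a directory.
    val blocker = File.createTempFile("not-a-dir", "")
    val f = StorageUtils.createTempFile(new File(blocker, "sub").getPath, "lakefs.", ".range")
    assert(f.getParentFile.getCanonicalPath == systemTmp)
  }
}
```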
clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala (outdated; resolved)
if (StringUtils.isNotBlank(sparkLocalDir)) {
  val dir = new File(sparkLocalDir)
  if (dir.exists() || dir.mkdirs()) {
    return File.createTempFile(prefix, suffix, dir)
Copilot AI (Dec 3, 2025)
spark.local.dir from configuration is used directly to construct a File and create a temp file under that directory. If an attacker controls spark.local.dir, they can cause writes of temporary data into arbitrary filesystem locations on executors, potentially exposing sensitive data or abusing symlinked directories. Validate and restrict spark.local.dir to a safe, whitelisted path (e.g., under Spark-managed local directories) and reject absolute paths outside allowed roots; alternatively, ignore configuration-provided paths and use Spark’s LocalDirProvider or a securely resolved executor-local directory.
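One hedged sketch of the kind of check being asked for: restrict the configured directory to a set of allowed roots and fall back otherwise. The helper name and the explicit allow-list are illustrative, not part of the PR:

```scala
import java.io.File
import java.nio.file.Paths

// Illustrative helper: accept the configured directory only if its canonical
// path lies under one of the allowed roots; otherwise use the JVM temp dir.
def resolveLocalDir(configured: String, allowedRoots: Seq[String]): File = {
  val fallback = new File(System.getProperty("java.io.tmpdir"))
  if (configured == null || configured.trim.isEmpty) fallback
  else {
    val candidate = new File(configured).getCanonicalFile
    val allowed = allowedRoots.exists { root =>
      candidate.toPath.startsWith(Paths.get(root).toAbsolutePath.normalize)
    }
    if (allowed) candidate else fallback
  }
}
```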
  val p = new Path(url)
  val fs = p.getFileSystem(configuration)
- val localFile = File.createTempFile("lakefs.", ".sstable")
+ val tmpDir = configuration.get("spark.local.dir")
Copilot AI (Dec 3, 2025)
tmpDir = configuration.get("spark.local.dir") is passed to StorageUtils.createTempFile to select the directory for temporary file creation. If spark.local.dir is attacker-controlled (e.g., via job config), this allows writing temp files to arbitrary directories on the executor, enabling data exfiltration or interaction with sensitive paths via symlinks. Ensure spark.local.dir is validated/normalized to Spark-approved local dirs or ignore user-provided values and use Spark’s managed local directory APIs.
- val tmpDir = configuration.get("spark.local.dir")
+ val tmpDir = null // Use system default temp directory for safety
val tmpDir = sc.getConf.get("spark.local.dir", null)
if (tmpDir != null) {
  conf.set("spark.local.dir", tmpDir)
}
Copilot AI (Dec 3, 2025)
User-supplied Spark configuration spark.local.dir is propagated into Hadoop conf and later used to create local temp files, allowing arbitrary write into attacker-chosen directories on executors. This crosses a privilege boundary if job submitters can set spark.local.dir and may lead to sensitive file write or symlink abuse. Restrict spark.local.dir to trusted Spark-managed paths or avoid honoring this configuration for file creation; use Spark’s local dir utilities instead.
- val tmpDir = sc.getConf.get("spark.local.dir", null)
- if (tmpDir != null) {
-   conf.set("spark.local.dir", tmpDir)
- }
+ // Removed propagation of user-supplied spark.local.dir to Hadoop conf for security reasons.
Downloading metadata locally should prefer spark.local.dir, with a fallback to the system temp dir.
Close #9738