GC: copy local files to spark.local.dir #9739
base: master
**LakeFSContext.scala**

```diff
@@ -11,7 +11,6 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.util.SerializableConfiguration
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.File
 import java.util.concurrent.TimeUnit
 
 object LakeFSJobParams {
```
```diff
@@ -127,6 +126,11 @@ object LakeFSContext {
     conf.set(LAKEFS_CONF_JOB_REPO_NAME_KEY, params.repoName)
     conf.setStrings(LAKEFS_CONF_JOB_COMMIT_IDS_KEY, params.commitIDs.toArray: _*)
 
+    val tmpDir = sc.getConf.get("spark.local.dir", null)
+    if (tmpDir != null) {
+      conf.set("spark.local.dir", tmpDir)
+    }
+
     conf.set(LAKEFS_CONF_JOB_STORAGE_NAMESPACE_KEY, params.storageNamespace)
     if (StringUtils.isBlank(conf.get(LAKEFS_CONF_API_URL_KEY))) {
       throw new InvalidJobConfException(s"$LAKEFS_CONF_API_URL_KEY must not be empty")
```
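For context, a minimal sketch of how a submitter's setting reaches this code path: the driver's `SparkConf` carries `spark.local.dir`, and the lines added above mirror it into the Hadoop `Configuration` that executors read. The master, app name, and directory path below are illustrative assumptions, not values from this PR.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: pin Spark's scratch space to an executor-local mount
// (the path is an assumption for illustration).
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("lakefs-gc-demo")
  .set("spark.local.dir", "/mnt/ephemeral/spark")
val sc = new SparkContext(sparkConf)

// This is the value the added code reads and copies into the Hadoop conf:
println(sc.getConf.get("spark.local.dir", null)) // "/mnt/ephemeral/spark"
sc.stop()
```

Note that cluster managers can override `spark.local.dir` (e.g. `LOCAL_DIRS` on YARN, `SPARK_LOCAL_DIRS` in standalone mode), so the value read here is not always what was submitted.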
```diff
@@ -186,7 +190,7 @@ object LakeFSContext {
     ranges.flatMap((range: Range) => {
       val path = new Path(apiClient.getRangeURL(repoName, range.id))
       val fs = path.getFileSystem(conf)
-      val localFile = File.createTempFile("lakefs.", ".range")
+      val localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".range")
       fs.copyToLocalFile(false, path, new Path(localFile.getAbsolutePath), true)
       val companion = Entry.messageCompanion
```
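For readers unfamiliar with the Hadoop call in this hunk, a standalone sketch follows (the source URL shape is an assumption): in `FileSystem.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem)`, passing `false` keeps the remote object, and passing `true` for the last flag writes through the raw local filesystem, which skips the `.crc` checksum side-file the checksummed local filesystem would otherwise create beside the downloaded range.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import java.io.File

// Sketch of the download step; the bucket and key are placeholders.
val hadoopConf = new Configuration()
val src = new Path("s3a://example-bucket/namespace/_lakefs/some-range")
val fs = src.getFileSystem(hadoopConf)
val localFile = File.createTempFile("lakefs.", ".range")
// delSrc = false: keep the remote file; useRawLocalFileSystem = true:
// no ".crc" checksum file is written next to the local copy.
fs.copyToLocalFile(false, src, new Path(localFile.getAbsolutePath), true)
```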
**SSTableReader.scala**

```diff
@@ -63,7 +63,8 @@ object SSTableReader {
   private def copyToLocal(configuration: Configuration, url: String) = {
     val p = new Path(url)
     val fs = p.getFileSystem(configuration)
-    val localFile = File.createTempFile("lakefs.", ".sstable")
+    val tmpDir = configuration.get("spark.local.dir")
+    val localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".sstable")
```

Note: `Configuration.get` returns `null` when `spark.local.dir` is unset, and `StringUtils.isNotBlank(null)` is `false`, so the helper (see StorageUtils below) then falls back to the system temp directory.

A reviewer left a suggested change on the first added line:

```diff
-    val tmpDir = configuration.get("spark.local.dir")
+    val tmpDir = null // Use system default temp directory for safety
```
**StorageUtils.scala**

```diff
@@ -8,7 +8,9 @@ import com.amazonaws.services.s3.model.{Region, GetBucketLocationRequest}
 import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
 import com.amazonaws._
 import org.slf4j.{Logger, LoggerFactory}
+import org.apache.commons.lang3.StringUtils
 
+import java.io.File
 import java.net.URI
 import java.util.concurrent.TimeUnit
 
```
```diff
@@ -161,6 +163,20 @@ object StorageUtils {
     val GCSMaxBulkSize =
       500 // 1000 is the max size, 500 is the recommended size to avoid timeouts or hitting HTTP size limits
   }
+
+  /** Create a temporary file in the Spark local directory if configured.
+    * This ensures temporary files are stored in executor storage rather than system temp.
+    */
+  def createTempFile(sparkLocalDir: String, prefix: String, suffix: String): File = {
+    if (StringUtils.isNotBlank(sparkLocalDir)) {
+      val dir = new File(sparkLocalDir)
+      if (dir.exists() || dir.mkdirs()) {
+        return File.createTempFile(prefix, suffix, dir)
+      }
+    }
+    // Fallback to system temp directory
+    File.createTempFile(prefix, suffix)
+  }
 }
 
 class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
```
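To make the helper's fallback concrete, here is a standalone JDK-only sketch (not lakeFS code): the three-argument `File.createTempFile` places the file in the given directory, while the two-argument form uses `java.io.tmpdir`, which is exactly the split the helper above encodes.

```scala
import java.io.File

// Demo directory under the system temp dir (an assumed location for this sketch).
val dir = new File(sys.props("java.io.tmpdir"), "spark-local-demo")
dir.mkdirs()

val pinned  = File.createTempFile("lakefs.", ".range", dir) // created inside dir
val default = File.createTempFile("lakefs.", ".range")      // created in java.io.tmpdir

println(pinned.getParent)  // .../spark-local-demo
println(default.getParent) // system temp directory
pinned.delete(); default.delete()
```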
Review comment:

User-supplied Spark configuration `spark.local.dir` is propagated into the Hadoop `conf` and later used to create local temp files, allowing arbitrary writes into attacker-chosen directories on executors. This crosses a privilege boundary if job submitters can set `spark.local.dir`, and may lead to sensitive file writes or symlink abuse. Restrict `spark.local.dir` to trusted, Spark-managed paths, or avoid honoring this configuration for file creation; use Spark's local-dir utilities instead.
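One way to act on this review, sketched under the assumption of an operator-approved root (both the root path and the helper name are hypothetical, not part of the PR): canonicalize the requested directory, which resolves `..` segments and symlinks, and honor it only if it stays under the trusted root.

```scala
import java.io.File

// Hypothetical hardening sketch; the trusted root is an assumed deployment path.
val allowedRoot = new File("/mnt/spark-local").getCanonicalFile

def sanitizedLocalDir(requested: String): Option[String] =
  Option(requested)
    .map(p => new File(p).getCanonicalFile)          // resolves "..", symlinks
    .filter(_.toPath.startsWith(allowedRoot.toPath)) // must stay under the root
    .map(_.getPath)

// A rejected value yields None, i.e. fall back to the system default temp dir:
//   sanitizedLocalDir("/etc")                    -> None
//   sanitizedLocalDir("/mnt/spark-local/job-42") -> Some(".../job-42")
```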