@@ -11,7 +11,6 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.util.SerializableConfiguration
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.File
 import java.util.concurrent.TimeUnit
 
 object LakeFSJobParams {
@@ -127,6 +126,11 @@ object LakeFSContext {
     conf.set(LAKEFS_CONF_JOB_REPO_NAME_KEY, params.repoName)
     conf.setStrings(LAKEFS_CONF_JOB_COMMIT_IDS_KEY, params.commitIDs.toArray: _*)
 
+    val tmpDir = sc.getConf.get("spark.local.dir", null)
+    if (tmpDir != null) {
+      conf.set("spark.local.dir", tmpDir)
+    }
+
Copilot AI commented on lines +129 to +132 (Dec 3, 2025):
User-supplied Spark configuration spark.local.dir is propagated into Hadoop conf and later used to create local temp files, allowing arbitrary write into attacker-chosen directories on executors. This crosses a privilege boundary if job submitters can set spark.local.dir and may lead to sensitive file write or symlink abuse. Restrict spark.local.dir to trusted Spark-managed paths or avoid honoring this configuration for file creation; use Spark’s local dir utilities instead.

Suggested change:
-    val tmpDir = sc.getConf.get("spark.local.dir", null)
-    if (tmpDir != null) {
-      conf.set("spark.local.dir", tmpDir)
-    }
+    // Removed propagation of user-supplied spark.local.dir to Hadoop conf for security reasons.
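For reference, a minimal sketch of the validation this comment asks for (the helper name and the allowed-root parameter are illustrative assumptions, not part of this PR):

import java.io.File
import java.nio.file.Paths

// Hypothetical helper: honor a configured spark.local.dir only when it
// resolves under an operator-approved root; otherwise reject it so the
// caller can fall back to a Spark-managed default.
def resolveTrustedLocalDir(configured: String, allowedRoot: String): Option[File] = {
  val root      = Paths.get(allowedRoot).toAbsolutePath.normalize
  val candidate = Paths.get(configured).toAbsolutePath.normalize
  // normalize() collapses ".." segments, so a path that escapes the
  // approved root is rejected here instead of being written to.
  if (candidate.startsWith(root)) Some(candidate.toFile) else None
}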


     conf.set(LAKEFS_CONF_JOB_STORAGE_NAMESPACE_KEY, params.storageNamespace)
     if (StringUtils.isBlank(conf.get(LAKEFS_CONF_API_URL_KEY))) {
       throw new InvalidJobConfException(s"$LAKEFS_CONF_API_URL_KEY must not be empty")
@@ -186,7 +190,7 @@ object LakeFSContext {
     ranges.flatMap((range: Range) => {
       val path = new Path(apiClient.getRangeURL(repoName, range.id))
       val fs = path.getFileSystem(conf)
-      val localFile = File.createTempFile("lakefs.", ".range")
+      val localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".range")
Copilot AI commented (Dec 3, 2025):
The variable tmpDir is defined on the driver (line 129) but used inside a mapPartitions closure on the executor (line 193). This will cause a serialization issue because the outer closure variable needs to be serialized and sent to executors. Instead, retrieve spark.local.dir from the conf Configuration object (which is already properly serialized via SerializableConfiguration) at line 189, similar to how it's done in the other files.
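A minimal sketch of that suggestion (names are illustrative; SerializableConfiguration is already imported in this file, and its value field yields the deserialized Hadoop Configuration on the executor):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

// Resolve the temp directory on the executor from the serialized conf,
// so no driver-side `tmpDir` val is captured by the closure.
// Assumes StorageUtils (from this PR) is in scope.
def executorTempFile(serConf: SerializableConfiguration, prefix: String, suffix: String): java.io.File = {
  val conf: Configuration = serConf.value  // deserialized per executor
  val tmpDir = conf.get("spark.local.dir") // may be null; createTempFile falls back
  StorageUtils.createTempFile(tmpDir, prefix, suffix)
}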


       fs.copyToLocalFile(false, path, new Path(localFile.getAbsolutePath), true)
       val companion = Entry.messageCompanion
@@ -17,7 +17,6 @@ import scalapb.GeneratedMessageCompanion

 import java.io.DataInput
 import java.io.DataOutput
-import java.io.File
 import java.net.URI
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -92,7 +91,8 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](
   var item: Item[Proto] = _
   var rangeID: String = ""
   override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
-    localFile = File.createTempFile("lakefs.", ".range")
+    val tmpDir = context.getConfiguration.get("spark.local.dir")
+    localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".range")
     // Cleanup the local file - using the same technique as other data sources:
     // https://github.com/apache/spark/blob/c0b1735c0bfeb1ff645d146e262d7ccd036a590e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L123
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => localFile.delete()))
@@ -153,6 +153,7 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](
 object LakeFSInputFormat {
   val DummyFileName = "dummy"
   val logger: Logger = LoggerFactory.getLogger(getClass.toString)
+
   def read[Proto <: GeneratedMessage with scalapb.Message[Proto]](
       reader: SSTableReader[Proto]
   ): Seq[Item[Proto]] = reader.newIterator().toSeq
@@ -63,7 +63,8 @@ object SSTableReader {
   private def copyToLocal(configuration: Configuration, url: String) = {
     val p = new Path(url)
     val fs = p.getFileSystem(configuration)
-    val localFile = File.createTempFile("lakefs.", ".sstable")
+    val tmpDir = configuration.get("spark.local.dir")
Copilot AI commented (Dec 3, 2025):
tmpDir = configuration.get("spark.local.dir") is passed to StorageUtils.createTempFile to select the directory for temporary file creation. If spark.local.dir is attacker-controlled (e.g., via job config), this allows writing temp files to arbitrary directories on the executor, enabling data exfiltration or interaction with sensitive paths via symlinks. Ensure spark.local.dir is validated/normalized to Spark-approved local dirs or ignore user-provided values and use Spark’s managed local directory APIs.

Suggested change:
-    val tmpDir = configuration.get("spark.local.dir")
+    val tmpDir = null // Use system default temp directory for safety
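If the configured value is to be ignored entirely, one public Spark API that yields a Spark-managed local directory is SparkFiles.getRootDirectory(); a hedged sketch of that alternative (not what this PR does):

import java.io.File
import org.apache.spark.SparkFiles

// SparkFiles.getRootDirectory() resolves to a directory Spark manages on
// the local node (available once the SparkContext/executor is running),
// sidestepping the user-controlled spark.local.dir value.
def sparkManagedTempFile(prefix: String, suffix: String): File =
  File.createTempFile(prefix, suffix, new File(SparkFiles.getRootDirectory()))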

val localFile = StorageUtils.createTempFile(tmpDir, "lakefs.", ".sstable")
// Cleanup the local file - using the same technic as other data sources:
// https://github.com/apache/spark/blob/c0b1735c0bfeb1ff645d146e262d7ccd036a590e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L123
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => localFile.delete()))
@@ -8,7 +8,9 @@ import com.amazonaws.services.s3.model.{Region, GetBucketLocationRequest}
 import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
 import com.amazonaws._
 import org.slf4j.{Logger, LoggerFactory}
+import org.apache.commons.lang3.StringUtils
 
+import java.io.File
 import java.net.URI
 import java.util.concurrent.TimeUnit

@@ -161,6 +163,20 @@ object StorageUtils {
   val GCSMaxBulkSize =
     500 // 1000 is the max size, 500 is the recommended size to avoid timeouts or hitting HTTP size limits
 }

+  /** Create a temporary file in the Spark local directory if configured.
+    * This ensures temporary files are stored in executor storage rather than system temp.
+    */
+  def createTempFile(sparkLocalDir: String, prefix: String, suffix: String): File = {
+    if (StringUtils.isNotBlank(sparkLocalDir)) {
+      val dir = new File(sparkLocalDir)
+      if (dir.exists() || dir.mkdirs()) {
+        return File.createTempFile(prefix, suffix, dir)
Copilot AI commented on lines +171 to +174 (Dec 3, 2025):
spark.local.dir from configuration is used directly to construct a File and create a temp file under that directory. If an attacker controls spark.local.dir, they can cause writes of temporary data into arbitrary filesystem locations on executors, potentially exposing sensitive data or abusing symlinked directories. Validate and restrict spark.local.dir to a safe, whitelisted path (e.g., under Spark-managed local directories) and reject absolute paths outside allowed roots; alternatively, ignore configuration-provided paths and use Spark’s LocalDirProvider or a securely resolved executor-local directory.

+      }
+    }
+    // Fallback to system temp directory
+    File.createTempFile(prefix, suffix)
+  }
Copilot AI commented on lines +170 to +179 (Dec 3, 2025):
The new createTempFile method lacks test coverage. Consider adding tests to verify the following (a hedged sketch follows this list):

1. Successful file creation when spark.local.dir is valid
2. Fallback to the system temp directory when spark.local.dir is null/blank
3. Fallback to the system temp directory when directory creation fails
4. Proper handling of comma-separated directory paths in spark.local.dir
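A sketch of such tests, assuming ScalaTest's AnyFunSuite is available and StorageUtils is on the classpath; case 4 is omitted because the current implementation passes the whole string to java.io.File, treating a comma-separated list as one literal path:

import java.io.File
import java.nio.file.Files
import org.scalatest.funsuite.AnyFunSuite

class CreateTempFileSpec extends AnyFunSuite {
  test("creates the file under spark.local.dir when it is a valid directory") {
    val dir = Files.createTempDirectory("spark-local").toFile
    val f = StorageUtils.createTempFile(dir.getAbsolutePath, "lakefs.", ".range")
    assert(f.getParentFile.getCanonicalFile == dir.getCanonicalFile)
  }

  test("falls back to the system temp directory when spark.local.dir is blank") {
    val f = StorageUtils.createTempFile("", "lakefs.", ".range")
    val sysTmp = new File(System.getProperty("java.io.tmpdir")).getCanonicalFile
    assert(f.getParentFile.getCanonicalFile == sysTmp)
  }

  test("falls back to the system temp directory when the directory cannot be created") {
    // mkdirs() fails for a path nested under an existing regular file.
    val blocker = File.createTempFile("blocker", ".tmp")
    val f = StorageUtils.createTempFile(new File(blocker, "sub").getPath, "lakefs.", ".range")
    val sysTmp = new File(System.getProperty("java.io.tmpdir")).getCanonicalFile
    assert(f.getParentFile.getCanonicalFile == sysTmp)
  }
}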

 }
 
 class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {