This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
2 changes: 1 addition & 1 deletion docs/running-on-kubernetes.md
@@ -784,7 +784,7 @@ from the other deployment modes. See the [configuration page](configuration.html
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
-the principal that you wish to use to handle renewing of Delegation Tokens. This is optional as you
+the principal that you wish to use to handle renewing of Delegation Tokens. This is optional as
we will set the principal to be the job users principal by default.
</td>
</tr>
@@ -57,9 +57,9 @@ private[spark] class HadoopConfBootstrapImpl(
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
-.withNewConfigMap()
-.withName(hadoopConfConfigMapName)
-.withItems(keyPaths.asJava)
+.withNewConfigMap()
+.withName(hadoopConfConfigMapName)
+.withItems(keyPaths.asJava)
.endConfigMap()
Member: Wrong indentation.
.endVolume()
.endSpec()
@@ -67,6 +67,7 @@ private[spark] class HadoopUGIUtil{
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
Member: Is writeTokenStorageToStream calling close on dataStream?
Member Author: handled this below

Comment: Use Utils.tryWithResource. That will close even if an exception is thrown.

creds.writeTokenStorageToStream(dataStream)
dataStream.close()
Member: We need to make sure this is called even if creds.writeTokenStorageToStream(dataStream) throws an exception (unlikely but still worth considering). Not sure what's the best practice to do this in Scala.

Member: +1

byteStream.toByteArray
}
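The tryWithResource pattern suggested above can be sketched as follows. Spark's `Utils.tryWithResource` is `private[spark]`, so a minimal stand-in is defined here, and a plain byte write stands in for `creds.writeTokenStorageToStream(dataStream)`; the object and method names are illustrative, not from the PR.

```scala
import java.io.{ByteArrayOutputStream, Closeable, DataOutputStream}

object TokenSerializationSketch {
  // Minimal stand-in for Spark's private[spark] Utils.tryWithResource:
  // the resource is closed even if the body throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  // Mirrors the shape of the hunk above; dataStream.write(payload)
  // stands in for creds.writeTokenStorageToStream(dataStream).
  def serialize(payload: Array[Byte]): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream
    tryWithResource(new DataOutputStream(byteStream)) { dataStream =>
      dataStream.write(payload)
    }
    byteStream.toByteArray
  }
}
```

With this shape, the explicit `dataStream.close()` call disappears, and the close happens in a `finally` block even on an exception.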

@@ -81,7 +81,6 @@ package object constants {
private[spark] val ENV_SPARK_USER = "SPARK_USER"

// Bootstrapping dependencies with the init-container
-private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
"/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY =
@@ -107,7 +106,7 @@ package object constants {
private[spark] val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
-"spark.kubernetes.hadoop.executor.hadoopconfigmapname"
+"spark.kubernetes.hadoop.executor.hadoopConfigMapName"
Member: Also the same for properties below.

// Kerberos Configuration
private[spark] val HADOOP_KERBEROS_SECRET_NAME =
Member: The names here are not super clear. Can we rename spark.kubernetes.kerberos.dt to spark.kubernetes.kerberos.delegationTokenSecretName and spark.kubernetes.kerberos.secretname to spark.kubernetes.kerberos.keyTabSecretName, and rename the constants accordingly?

Member Author: done

Member: Can we also rename the constant val names?
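The rename the reviewer proposes might look like the sketch below. The val names are illustrative (the PR's final constant names are not shown here); only the two property strings come from the review comment, and the `private[spark]` modifier is dropped so the sketch compiles standalone.

```scala
// Constants carrying the reviewer's proposed camelCase property names.
object KerberosConfigSketch {
  val KERBEROS_DELEGATION_TOKEN_SECRET_NAME =
    "spark.kubernetes.kerberos.delegationTokenSecretName"
  val KERBEROS_KEYTAB_SECRET_NAME =
    "spark.kubernetes.kerberos.keyTabSecretName"
}
```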

@@ -50,7 +50,7 @@ private[spark] class HadoopKerberosKeytabResolverStep(
maybePrincipal: Option[String],
maybeKeytab: Option[File],
maybeRenewerPrincipal: Option[String],
-hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging{
+hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging {
private var originalCredentials: Credentials = _
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using var in general

Member Author: These are Hadoop objects, in Java, that are being modified; I believe that I need var.

Comment: We only have one method in this class - can't all of the fields be defined as vals as they are being created?

private var dfs : FileSystem = _
private var renewer: String = _
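The val-over-var suggestion above can be sketched as follows: with a single method, intermediate values can be plain vals created where they are needed, rather than class-level vars initialized to `_`. All names and logic here are illustrative, not the PR's actual resolver code.

```scala
// Sketch: fields become local vals inside the one method that uses them,
// so no mutable state outlives the call.
class KeytabResolverSketch(securityEnabled: Boolean) {
  def resolve(jobUser: String): String = {
    val renewer = if (securityEnabled) jobUser else "simple-auth"
    s"hadoop-token-for-$renewer"
  }
}
```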
@@ -59,7 +59,7 @@ private[spark] class HadoopKerberosKeytabResolverStep(

override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
-if (hadoopUGI.isSecurityEnabled) logDebug("Hadoop not configured with Kerberos")
+if (!hadoopUGI.isSecurityEnabled) logDebug("Hadoop not configured with Kerberos")
Member: Shouldn't we just throw if security is disabled?

Member Author: The problem here, as was addressed in the previous PR, is that throwing an error broke the system: there were cases where this step was reached without Kerberos enabled, and the exception broke working cases.

Member:
> this step was accessed without kerberos enabled and it threw an exception error that broken working cases.

Hmm. I wonder if we can check if kerberos is enabled earlier and use this step only when it is so. That could make error handling easier.

Member Author: As @mccheah referenced, it might be better to check in the orchestrator.
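The orchestrator-level check being discussed could look like the sketch below: include the Kerberos resolver step only when security is enabled, so the step itself never runs in a non-Kerberos deployment and never needs the defensive check. Step and method names are illustrative, not the PR's actual orchestrator API.

```scala
object OrchestratorSketch {
  sealed trait HadoopConfigurationStep
  case object HadoopConfMounterStep extends HadoopConfigurationStep
  case object KerberosKeytabResolverStep extends HadoopConfigurationStep

  // Decide once, up front, whether the Kerberos step belongs in the plan.
  def hadoopSteps(kerberosEnabled: Boolean): Seq[HadoopConfigurationStep] = {
    val baseSteps = Seq[HadoopConfigurationStep](HadoopConfMounterStep)
    if (kerberosEnabled) baseSteps :+ KerberosKeytabResolverStep
    else baseSteps
  }
}
```

This keeps error handling simple: any exception thrown inside the Kerberos step can then safely be treated as a real failure.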

val maybeJobUserUGI =
for {
principal <- maybePrincipal