
[DO NOT MERGE] Support Spark Connect #651

Open
chenliu0831 wants to merge 3 commits into awslabs:master from chenliu0831:master

Conversation

@chenliu0831

Issue #, if available:

See awslabs/python-deequ#254

Description of changes:

Initial effort to evolve PyDeequ to use Spark Connect instead of the current, fragile Py4J-based bridge.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.


// The transform method receives protobuf Any from Spark Connect
// Scala compiler sees com.google.protobuf.Any in the interface signature
override def transform(

Feel free to ignore

In Spark 4.x the signature changed from relation: protobuf.Any to relation: Array[Byte]. To ease the migration, I would strongly recommend keeping transform as small as possible, ideally delegating to a separate class. In GraphFrames we separated the plugin entry point from the plugin logic so that we can ship two versions for different Spark versions. You can see an example here: spark3 and spark4

Otherwise you may need to duplicate the whole logic on the day you start working on Spark 4.x support.
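The separation suggested above could look roughly like the sketch below. Every name in it (`AnyLike`, `PluginCore`, `Spark3Adapter`, `Spark4Adapter`) is hypothetical and comes from neither this PR nor GraphFrames. `AnyLike` is a stand-in for `com.google.protobuf.Any`, and the planner argument and `LogicalPlan` return type of the real plugin interface are left out so the sketch compiles without Spark. It also glosses over how a real Spark 4.x plugin would decode the incoming bytes.

```scala
// Stand-in for com.google.protobuf.Any so the sketch has no dependencies.
final case class AnyLike(typeUrl: String, value: Array[Byte])

// Version-independent logic: it only ever sees the raw payload bytes.
object PluginCore {
  def handle(payload: Array[Byte]): Option[String] =
    if (payload.isEmpty) None // nothing to handle
    else Some(s"handled ${payload.length} bytes")
}

// Thin adapter with a Spark 3.x-shaped signature: unwrap, then delegate.
object Spark3Adapter {
  def transform(relation: AnyLike): Option[String] =
    PluginCore.handle(relation.value)
}

// Thin adapter with a Spark 4.x-shaped signature: delegate directly.
object Spark4Adapter {
  def transform(relation: Array[Byte]): Option[String] =
    PluginCore.handle(relation)
}
```

With this shape, only the two small adapters differ per Spark version, while the shared logic is written and tested once.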

Author


Great call, thanks. I haven't given much thought to the Spark 3.x to 4.x breaking changes yet (they seem more annoying than I expected). Let me revisit this in a new revision.

Contributor

@github-actions github-actions Bot left a comment



Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 926c07a3) — may not be fully accurate. Reply if this doesn't help.

@@ -0,0 +1,504 @@
<?xml version="1.0" encoding="UTF-8"?>
Contributor


This file is auto-generated by maven-shade-plugin and should not be committed to the repository. Add dependency-reduced-pom.xml to .gitignore.

// Debug: Log what we're receiving
println(s"[DeequPlugin] Received relation with type_url: ${relation.getTypeUrl}")
println(s"[DeequPlugin] Expected type_url for verification: ${DeequVerificationRelation.getDescriptor.getFullName}")
println(s"[DeequPlugin] is(DeequVerificationRelation): ${relation.is(classOf[DeequVerificationRelation])}")
Contributor


Remove debug println statements. These will pollute driver logs in production. Use a proper logger (e.g., org.slf4j.LoggerFactory) or remove them entirely.
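As a rough illustration of the logger suggestion, the sketch below routes the same diagnostic through a level-guarded logger. It uses `java.util.logging` only so that it runs without extra dependencies; that is a substitution for the slf4j logger named above, and the slf4j version would have the same shape. The object and method names (`DeequPluginLogging`, `describe`, `logRelation`) are hypothetical.

```scala
import java.util.logging.{Level, Logger}

object DeequPluginLogging {
  // One logger per component instead of ad hoc println calls.
  private val log: Logger = Logger.getLogger("DeequPlugin")

  // Build the message separately so it can be unit tested.
  def describe(typeUrl: String, expected: String): String =
    s"received type_url=$typeUrl, expected=$expected"

  // FINE is off by default, so nothing reaches driver logs
  // unless debugging is explicitly enabled.
  def logRelation(typeUrl: String, expected: String): Unit =
    if (log.isLoggable(Level.FINE)) log.fine(describe(typeUrl, expected))
}
```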

println(s"[DeequPlugin] Received relation with type_url: ${relation.getTypeUrl}")
println(s"[DeequPlugin] Expected type_url for verification: ${DeequVerificationRelation.getDescriptor.getFullName}")
println(s"[DeequPlugin] is(DeequVerificationRelation): ${relation.is(classOf[DeequVerificationRelation])}")

Contributor


Same: remove println debug logging throughout this method (lines 57-58, 62, 68, 74, 80, 85).

}

/**
* Deserialize the input relation bytes to a DataFrame.
Contributor


new DataFrame(spark, logicalPlan, ExpressionEncoder(qe.analyzed.schema)) — the 3-arg DataFrame constructor is an internal API that may not exist in all Spark 3.5 builds. Consider using spark.sessionState.executePlan(logicalPlan) and Dataset.ofRows(spark, logicalPlan) instead, which is the standard internal pattern.


// Build Check objects from protobuf messages
val checks = req.getChecksList.asScala.map(CheckBuilder.build).toSeq

Contributor


suite is a VerificationRunBuilder held in a var and reassigned with the return values of addCheck and addRequiredAnalyzer. These methods do return the builder (they return this), so the code works, but the pattern is fragile. Prefer chaining: checks.foldLeft(VerificationSuite().onData(inputDf))((s, c) => s.addCheck(c)).
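To make the fold pattern concrete without pulling in Deequ, here is a minimal sketch against a stand-in builder. `RunBuilder` and `FoldChaining` are hypothetical names, and checks are plain strings here rather than Deequ `Check` objects. The only assumption carried over from the comment above is that each add method returns a builder.

```scala
// Stand-in for a builder whose add methods return a builder.
final class RunBuilder(val checks: Vector[String] = Vector.empty) {
  def addCheck(check: String): RunBuilder = new RunBuilder(checks :+ check)
}

object FoldChaining {
  // Thread the builder through every check; no var is needed.
  def build(checks: Seq[String]): RunBuilder =
    checks.foldLeft(new RunBuilder())((builder, check) => builder.addCheck(check))
}
```

Because the accumulator is always the value returned by the previous call, this stays correct whether the builder mutates itself or returns a fresh copy.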

val relativeError = if (msg.getRelativeError == 0.0) 0.01 else msg.getRelativeError
ApproxQuantile(msg.getColumn, quantile, relativeError)

case "ApproxQuantiles" =>
Contributor


ApproxQuantile — defaulting quantile to 0.5 when the proto value is 0.0 means a client cannot explicitly request the 0th quantile. Use msg.hasQuantile() or a wrapper message to distinguish "not set" from "set to 0.0". Same issue with relativeError defaulting when 0.0.
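The "not set" versus "set to 0.0" distinction can be sketched with `Option` standing in for field presence. In proto3, declaring a scalar field as `optional` makes the generated class expose a `hasX()` accessor, which plays the role of `isDefined` here. `QuantileMsg` and `QuantileDefaults` are hypothetical names; the default values 0.5 and 0.01 are the ones mentioned in this thread.

```scala
// Option models field presence: None means "not set on the wire".
final case class QuantileMsg(quantile: Option[Double], relativeError: Option[Double])

object QuantileDefaults {
  val DefaultQuantile = 0.5
  val DefaultRelativeError = 0.01

  // Defaults apply only when a field is absent, so an explicit 0.0 survives.
  def resolve(msg: QuantileMsg): (Double, Double) =
    (
      msg.quantile.getOrElse(DefaultQuantile),
      msg.relativeError.getOrElse(DefaultRelativeError)
    )
}
```

For example, `QuantileDefaults.resolve(QuantileMsg(Some(0.0), None))` keeps the explicit 0th quantile and only fills in the relative error.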

} else {
msg.getColumnsList.asScala.map(_.toDouble).toSeq
}
val relativeError = if (msg.getRelativeError == 0.0) 0.01 else msg.getRelativeError
Contributor


ApproxQuantiles reuses the columns repeated field to pass quantile values (doubles encoded as strings). This is a semantic mismatch — columns is documented as column names in the proto. Use a dedicated repeated double field in the proto instead.

val spark = planner.sessionHolder.session
val inputDf = deserializeInputRelation(req.getInputRelation, planner)

// Build suggestion runner
Contributor


Rules.STRING, Rules.NUMERICAL, Rules.COMMON, Rules.EXTENDED — verify these constants exist in the Rules object. The Deequ Rules object only defines DEFAULT. Unknown rule names silently fall back to DEFAULT, which hides configuration errors.
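One way to avoid the silent fallback is to make name resolution return an explicit error. The sketch below is only an illustration: `RuleSetResolver` is a hypothetical helper, and the registry holds placeholder strings rather than real Deequ rule objects.

```scala
object RuleSetResolver {
  // Hypothetical registry with placeholder contents.
  private val known: Map[String, Seq[String]] = Map(
    "DEFAULT" -> Seq("rule-a", "rule-b")
  )

  // Unknown names become a Left with a descriptive message
  // instead of quietly resolving to the default set.
  def resolve(name: String): Either[String, Seq[String]] =
    known.get(name).toRight(
      s"Unknown rule set '$name'; expected one of: ${known.keys.toSeq.sorted.mkString(", ")}"
    )
}
```

The caller can then surface the `Left` message to the client as a configuration error.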

bytes input_relation = 1;

// Checks to run
repeated CheckMessage checks = 2;
Contributor


bytes input_relation = 1 — using raw bytes to serialize a nested Spark Connect Relation is fragile. If the proto schema changes, deserialization will silently break. Consider using google.protobuf.Any or importing the Spark Connect proto and using the Relation message type directly.

}

// Approx count distinct
case "hasApproxCountDistinct" =>
Contributor


buildDoubleAssertion receives a PredicateMessage but protobuf never returns null for message fields — it returns a default instance. The if (pred == null) check will never be true. You need to check c.hasAssertion() at the call site instead.
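The presence check at the call site might be sketched as follows. The classes are stand-ins that mimic generated protobuf accessors (the getter returns a default instance, never null); `Predicate`, `ConstraintMsg`, and `AssertionBuilder` are hypothetical names, and the operator strings are assumptions made for illustration.

```scala
final case class Predicate(operator: String, value: Double)

object Predicate {
  val DefaultInstance: Predicate = Predicate("", 0.0)
}

// Mimics generated protobuf accessors: the getter never returns null.
final class ConstraintMsg(assertion: Option[Predicate]) {
  def hasAssertion: Boolean = assertion.isDefined
  def getAssertion: Predicate = assertion.getOrElse(Predicate.DefaultInstance)
}

object AssertionBuilder {
  // Check presence first; a null check on getAssertion would never fire.
  def build(c: ConstraintMsg): Option[Double => Boolean] =
    if (!c.hasAssertion) None
    else {
      val p = c.getAssertion
      p.operator match {
        case ">=" => Some((x: Double) => x >= p.value)
        case "<=" => Some((x: Double) => x <= p.value)
        case "==" => Some((x: Double) => x == p.value)
        case _    => None // unrecognized operator
      }
    }
}
```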
