[DO NOT MERGE]Support Spark Connect#651
Conversation
|
|
||
| // The transform method receives protobuf Any from Spark Connect | ||
| // Scala compiler sees com.google.protobuf.Any in the interface signature | ||
| override def transform( |
There was a problem hiding this comment.
Feel free to ignore
In Spark 4.x the signature was changed from relation: protobuf.Any to relation: Array[Byte]. To avoid pain during the migration I would strongly recommend to keep transform as small as possible and better in a separate class. In GraphFrames we separated implementation of the plugin and the plugin logic to be able to have two versions for different spark. You can see an example here: spark3 and spark4
Otherwise you may need to duplicate the whole logic on a day you will work on support of the spark 4.x
There was a problem hiding this comment.
Great call. Thanks, I haven't considered much about Spark 3.x to 4.x breaking change yet (it seems more annoying than I thought..). Let me revisit this in a new revision.
| @@ -0,0 +1,504 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
There was a problem hiding this comment.
This file is auto-generated by maven-shade-plugin and should not be committed to the repository. Add dependency-reduced-pom.xml to .gitignore.
| // Debug: Log what we're receiving | ||
| println(s"[DeequPlugin] Received relation with type_url: ${relation.getTypeUrl}") | ||
| println(s"[DeequPlugin] Expected type_url for verification: ${DeequVerificationRelation.getDescriptor.getFullName}") | ||
| println(s"[DeequPlugin] is(DeequVerificationRelation): ${relation.is(classOf[DeequVerificationRelation])}") |
There was a problem hiding this comment.
Remove debug println statements. These will pollute driver logs in production. Use a proper logger (e.g., org.slf4j.LoggerFactory) or remove them entirely.
| println(s"[DeequPlugin] Received relation with type_url: ${relation.getTypeUrl}") | ||
| println(s"[DeequPlugin] Expected type_url for verification: ${DeequVerificationRelation.getDescriptor.getFullName}") | ||
| println(s"[DeequPlugin] is(DeequVerificationRelation): ${relation.is(classOf[DeequVerificationRelation])}") | ||
|
|
There was a problem hiding this comment.
Same: remove println debug logging throughout this method (lines 57-58, 62, 68, 74, 80, 85).
| } | ||
|
|
||
| /** | ||
| * Deserialize the input relation bytes to a DataFrame. |
There was a problem hiding this comment.
new DataFrame(spark, logicalPlan, ExpressionEncoder(qe.analyzed.schema)) — the 3-arg DataFrame constructor is an internal API that may not exist in all Spark 3.5 builds. Consider using spark.sessionState.executePlan(logicalPlan) and Dataset.ofRows(spark, logicalPlan) instead, which is the standard internal pattern.
|
|
||
| // Build Check objects from protobuf messages | ||
| val checks = req.getChecksList.asScala.map(CheckBuilder.build).toSeq | ||
|
|
There was a problem hiding this comment.
suite is a VerificationRunBuilder, but you're reassigning a var with the return of addCheck and addRequiredAnalyzer. Verify that these methods return the builder (they do return this), but the pattern is fragile — prefer chaining: checks.foldLeft(VerificationSuite().onData(inputDf))((s, c) => s.addCheck(c)).
| val relativeError = if (msg.getRelativeError == 0.0) 0.01 else msg.getRelativeError | ||
| ApproxQuantile(msg.getColumn, quantile, relativeError) | ||
|
|
||
| case "ApproxQuantiles" => |
There was a problem hiding this comment.
ApproxQuantile — defaulting quantile to 0.5 when the proto value is 0.0 means a client cannot explicitly request the 0th quantile. Use msg.hasQuantile() or a wrapper message to distinguish "not set" from "set to 0.0". Same issue with relativeError defaulting when 0.0.
| } else { | ||
| msg.getColumnsList.asScala.map(_.toDouble).toSeq | ||
| } | ||
| val relativeError = if (msg.getRelativeError == 0.0) 0.01 else msg.getRelativeError |
There was a problem hiding this comment.
ApproxQuantiles reuses the columns repeated field to pass quantile values (doubles encoded as strings). This is a semantic mismatch — columns is documented as column names in the proto. Use a dedicated repeated double field in the proto instead.
| val spark = planner.sessionHolder.session | ||
| val inputDf = deserializeInputRelation(req.getInputRelation, planner) | ||
|
|
||
| // Build suggestion runner |
There was a problem hiding this comment.
Rules.STRING, Rules.NUMERICAL, Rules.COMMON, Rules.EXTENDED — verify these constants exist in the Rules object. The Deequ Rules object only defines DEFAULT. Unknown rule names silently fall back to DEFAULT, which hides configuration errors.
| bytes input_relation = 1; | ||
|
|
||
| // Checks to run | ||
| repeated CheckMessage checks = 2; |
There was a problem hiding this comment.
bytes input_relation = 1 — using raw bytes to serialize a nested Spark Connect Relation is fragile. If the proto schema changes, deserialization will silently break. Consider using google.protobuf.Any or importing the Spark Connect proto and using the Relation message type directly.
| } | ||
|
|
||
| // Approx count distinct | ||
| case "hasApproxCountDistinct" => |
There was a problem hiding this comment.
buildDoubleAssertion receives a PredicateMessage but protobuf never returns null for message fields — it returns a default instance. The if (pred == null) check will never be true. You need to check c.hasAssertion() at the call site instead.
Issue #, if available:
See awslabs/python-deequ#254
Description of changes:
Initial effort to evolve PyDeequ to use Spark Connect instead of the currently fragile Py4J based bridge.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.