-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Refactor protobuf ser/de traits #19234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
cc @timsaucer |
|
will have a better look later, on the quick look I think it makes sense. One thing i'm not sure is use of generics, it might be better to leave it as &dyn (especially with Tims ffi work). I don't think this is on critical path so it should not impact performance (a lot) Would it be possible to keep it as dyn @adriangb ? |
also with @timsaucer FFI work i think it would make sense to keep it as |
Under the current design it's not, here's an example demonstrating the problem: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=636bc71d4c8c5e8172e41bddef3b1160. Essentially with dynamic dispatch the trait cannot participate in the code flow, it can only handle being a "fallback" as it is now / cannot call back into tree recursion. I haven't found a way around that (other than using generics as I do here) |
|
I think we can keep |
|
#19079 I'd say it requires |
|
would it be possible to split the interface? keep existing one as is and wrap it up with "CorePhysicalCodec" which would have those few methods you need ? |
|
If we need |
I'm not sure I follow, could you elaborate how that would work/what the advantage will be? My thinking is if we're going to keep |
|
I'm on travel all week, but I'll try to take a look as soon as I can. |
|
I'm not sure, you stress me out with hard questions 😀 EDIT: apologise, i've missed things |
milenkovicm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like the change will not affect probably the most important methods
pub fn physical_plan_to_bytes_with_extension_codec(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Bytes> {
let protobuf =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
Ok(buffer.into())
}nor
pub fn physical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
protobuf.try_into_physical_plan(ctx, extension_codec)
}also it looks like protobuf::PhysicalPlanNode::try_from_physical_plan does not complain with &dyn PhysicalExtensionCodec,
so removal of dyn may not that big of issue after all. Also FFI may not be affected with this change, but let's wait for Tim to confirm.
just a note, there are now to similar set of methods
fn try_decode_expr(&self, buf: &[u8], inputs: &[Arc<dyn PhysicalExpr>]) -> Result<Arc<dyn PhysicalExpr>> {
self.inner.try_decode_expr(buf, inputs)
}
fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()> {
self.inner.try_encode_expr(node, buf)
}
fn serialize_physical_expr(
&self,
expr: &Arc<dyn PhysicalExpr>,
) -> Result<protobuf::PhysicalExprNode> {
to_proto::default_serialize_physical_expr(expr, self)
}
fn deserialize_physical_expr(
&self,
proto: &protobuf::PhysicalExprNode,
ctx: &TaskContext,
input_schema: &arrow::datatypes::Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
from_proto::default_parse_physical_expr(proto, ctx, input_schema, self)
}maybe we need to document when to use which or deprecate.
thanks @adriangb
4be04f9 to
3011921
Compare
|
Would |
|
maybe @lewiszlw would be interested in the change |
|
Thanks for mentioning me @milenkovicm . The refactor looks more clear in first look. |
|
When I was testing this with the FFI work previously I found switching from It feels not ideal to then have two different code structures for the logical and physical codecs. Do you think there's a real performance enhancement to gain by using generics over keeping it with passing |
|
The Maybe as a follow up we should consider value of exposing I need to verify one other bit if it is compatible with the ballista, (which I believe it does), but over all I do believe its solid proposal |
|
Sorry, I should be more clear. We use Sorry for the brevity, trying to touch bases while at a work retreat. |
|
Here's the alternative approach: #19267 |
Relevant issues
Closes #18477
Motivation
The current protobuf serialization/deserialization system in DataFusion has limitations when users need to customize or
intercept the serialization process. Specifically:
Limited interception points: The existing
PhysicalExtensionCodectrait only providestry_encode/try_decodeforcustom extension nodes and
try_encode_expr/try_decode_exprfor custom expressions. There is no way to intercept theserialization/deserialization of every node in a plan tree.
No pre/post-processing hooks: Users cannot run custom logic before or after the default serialization/deserialization
logic. This is critical for use cases like:
PhysicalExprAdapterfactories)fine-grained control over the serialization process that the current trait object-based design doesn't provide.
This was discussed in detail in #18477, where several use cases were identified that would benefit from more flexible
serialization hooks.
Changes Made
This PR refactors the
PhysicalExtensionCodectrait and related serialization infrastructure to enable interception at everynode in the plan tree:
1. Generics Instead of Trait Objects
Changed all codec parameters from
&dyn PhysicalExtensionCodecto generic&C where C: PhysicalExtensionCodec + ?Sized.This is primarily because of limitations on recursive calls with unsized trait objects; happy to hear alternatives though. It should mostly be backawards compatible and may have a performance benefit (I imagine most users only have 1-2 concrete implementations of the codec traits).
Updated traits and functions:
AsExecutionPlan::try_into_physical_planandtry_from_physical_planfrom_proto.rsandto_proto.rs(e.g.,parse_physical_expr,serialize_physical_expr,parse_physical_sort_expr, etc.)2. New Interception Methods on
PhysicalExtensionCodecAdded four new methods with default implementations that route through the codec:
These methods:
default_*functions3. New Public
default_*FunctionsExtracted the actual serialization/deserialization logic into public functions:
default_deserialize_physical_plandefault_serialize_physical_plandefault_parse_physical_expr(renamed fromparse_physical_expr)These functions:
4. Comprehensive Example
Added
adapter_serialization.rsexample demonstrating:PhysicalExprAdapterfactoriesThe example showcases a real-world use case: preserving
FileScanConfig::expr_adapter_factoryacross serializationboundaries, which is not serialized by default but is critical for maintaining filter pushdown behavior.
Public API Breaking Changes
See the "Upgrading Guide" section in
docs/source/library-user-guide/upgrading.mdfor detailed migration instructions.Summary of breaking changes:
1.
AsExecutionPlantrait methods now use genericsBefore:
After:
Who is affected: Anyone implementing
AsExecutionPlanon custom typesMigration: Add generic parameter
<C: PhysicalExtensionCodec + ?Sized>to method signatures2. Helper functions now use generics
Functions like
parse_physical_expr,parse_physical_sort_expr,serialize_physical_expr, etc. now take generic&Cinstead of
&dyn PhysicalExtensionCodec.Who is affected: Code calling these helper functions directly
Migration: Most code will continue to work without changes since
&dyn TraitsatisfiesT: Trait + ?Sized. In rarecases where type inference fails, you may need to add type annotations.
3. New methods on
PhysicalExtensionCodecFour new methods added with default implementations:
deserialize_physical_planserialize_physical_plandeserialize_physical_exprserialize_physical_exprWho is affected: No one for basic usage, since these have default implementations
Opportunity: Users can now override these methods to intercept serialization at every node
Extending PhysicalExtensionCodec vs. new traits
As proposed by @milenkovicm in the weekly meeting when we discussed this I re-used PhysicalExtensionCodec instead of adding new traits. This resulted in considerably less code churn.
Use of AI
Please note that after initial design work a lot of this code was automated by AI which was very nice because there was a lot of mechanical updating of dyn trait to generics.