Skip to content

Conversation

@adriangb
Copy link
Contributor

@adriangb adriangb commented Dec 9, 2025

Relevant issues

Closes #18477

Motivation

The current protobuf serialization/deserialization system in DataFusion has limitations when users need to customize or
intercept the serialization process. Specifically:

  1. Limited interception points: The existing PhysicalExtensionCodec trait only provides try_encode/try_decode for
    custom extension nodes and try_encode_expr/try_decode_expr for custom expressions. There is no way to intercept the
    serialization/deserialization of every node in a plan tree.

  2. No pre/post-processing hooks: Users cannot run custom logic before or after the default serialization/deserialization
    logic. This is critical for use cases like:

  • Caching deserialized expressions to avoid redundant work
  • Preserving custom metadata that isn't part of the protobuf schema (e.g., PhysicalExprAdapter factories)
  • Wrapping standard nodes with additional context during serialization
  • Injecting custom state during deserialization
  1. Inflexible for advanced use cases: Distributed query systems, FFI boundaries, and custom execution engines often need
    fine-grained control over the serialization process that the current trait object-based design doesn't provide.

This was discussed in detail in #18477, where several use cases were identified that would benefit from more flexible
serialization hooks.

Changes Made

This PR refactors the PhysicalExtensionCodec trait and related serialization infrastructure to enable interception at every
node in the plan tree:

1. Generics Instead of Trait Objects

Changed all codec parameters from &dyn PhysicalExtensionCodec to generic &C where C: PhysicalExtensionCodec + ?Sized.
This is primarily because of limitations on recursive calls with unsized trait objects; happy to hear alternatives though. It should mostly be backawards compatible and may have a performance benefit (I imagine most users only have 1-2 concrete implementations of the codec traits).

Updated traits and functions:

  • AsExecutionPlan::try_into_physical_plan and try_from_physical_plan
  • All helper functions in from_proto.rs and to_proto.rs (e.g., parse_physical_expr, serialize_physical_expr,
    parse_physical_sort_expr, etc.)

2. New Interception Methods on PhysicalExtensionCodec

Added four new methods with default implementations that route through the codec:

fn deserialize_physical_plan(&self, proto: &PhysicalPlanNode, ctx: &TaskContext)
   -> Result<Arc<dyn ExecutionPlan>>

fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>)
   -> Result<PhysicalPlanNode>

fn deserialize_physical_expr(&self, proto: &PhysicalExprNode, ctx: &TaskContext,
   input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>>

fn serialize_physical_expr(&self, expr: Arc<dyn PhysicalExpr>)
   -> Result<PhysicalExprNode>

These methods:

  • Have default implementations that call the corresponding default_* functions
  • Enable users to intercept every plan and expression node during serialization/deserialization
  • Support the decorator pattern: run custom logic, call the default implementation, then post-process

3. New Public default_* Functions

Extracted the actual serialization/deserialization logic into public functions:

  • default_deserialize_physical_plan
  • default_serialize_physical_plan
  • default_parse_physical_expr (renamed from parse_physical_expr)

These functions:

  • Contain the core serialization logic
  • Can be called from custom codec implementations
  • Enable the decorator pattern without code duplication

4. Comprehensive Example

Added adapter_serialization.rs example demonstrating:

  • How to intercept serialization to detect plans with custom PhysicalExprAdapter factories
  • Wrapping standard plans as extension nodes with nested JSON metadata
  • Restoring custom state during deserialization
  • Using the decorator pattern with the new interception methods

The example showcases a real-world use case: preserving FileScanConfig::expr_adapter_factory across serialization
boundaries, which is not serialized by default but is critical for maintaining filter pushdown behavior.

Public API Breaking Changes

See the "Upgrading Guide" section in docs/source/library-user-guide/upgrading.md for detailed migration instructions.
Summary of breaking changes:

1. AsExecutionPlan trait methods now use generics

Before:

fn try_into_physical_plan(
   &self,
   ctx: &TaskContext,
   extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>>

After:

fn try_into_physical_plan<C: PhysicalExtensionCodec + ?Sized>(
   &self,
   ctx: &TaskContext,
   extension_codec: &C,
) -> Result<Arc<dyn ExecutionPlan>>

Who is affected: Anyone implementing AsExecutionPlan on custom types

Migration: Add generic parameter <C: PhysicalExtensionCodec + ?Sized> to method signatures

2. Helper functions now use generics

Functions like parse_physical_expr, parse_physical_sort_expr, serialize_physical_expr, etc. now take generic &C
instead of &dyn PhysicalExtensionCodec.

Who is affected: Code calling these helper functions directly

Migration: Most code will continue to work without changes since &dyn Trait satisfies T: Trait + ?Sized. In rare
cases where type inference fails, you may need to add type annotations.

3. New methods on PhysicalExtensionCodec

Four new methods added with default implementations:

  • deserialize_physical_plan
  • serialize_physical_plan
  • deserialize_physical_expr
  • serialize_physical_expr

Who is affected: No one for basic usage, since these have default implementations

Opportunity: Users can now override these methods to intercept serialization at every node

Extending PhysicalExtensionCodec vs. new traits

As proposed by @milenkovicm in the weekly meeting when we discussed this I re-used PhysicalExtensionCodec instead of adding new traits. This resulted in considerably less code churn.

Use of AI

Please note that after initial design work a lot of this code was automated by AI which was very nice because there was a lot of mechanical updating of dyn trait to generics.

@github-actions github-actions bot added documentation Improvements or additions to documentation proto Related to proto crate labels Dec 9, 2025
@adriangb
Copy link
Contributor Author

adriangb commented Dec 9, 2025

cc @timsaucer

@adriangb adriangb marked this pull request as ready for review December 9, 2025 16:30
@milenkovicm
Copy link
Contributor

will have a better look later, on the quick look I think it makes sense. One thing i'm not sure is use of generics, it might be better to leave it as &dyn (especially with Tims ffi work). I don't think this is on critical path so it should not impact performance (a lot)

Would it be possible to keep it as dyn @adriangb ?

@milenkovicm
Copy link
Contributor

also with @timsaucer FFI work i think it would make sense to keep it as dyn

@adriangb adriangb changed the title [WIP]: refactor protobuf ser/de traits Refactor protobuf ser/de traits Dec 9, 2025
@adriangb
Copy link
Contributor Author

adriangb commented Dec 9, 2025

will have a better look later, on the quick look I think it makes sense. One thing i'm not sure is use of generics, it might be better to leave it as &dyn (especially with Tims ffi work). I don't think this is on critical path so it should not impact performance (a lot)

Would it be possible to keep it as dyn @adriangb ?

Under the current design it's not, here's an example demonstrating the problem: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=636bc71d4c8c5e8172e41bddef3b1160.

Essentially with dynamic dispatch the trait cannot participate in the code flow, it can only handle being a "fallback" as it is now / cannot call back into tree recursion. I haven't found a way around that (other than using generics as I do here)

@adriangb
Copy link
Contributor Author

adriangb commented Dec 9, 2025

I think we can keep dyn if we don't provide default implementations. But then this becomes a bit bigger of a breaking change for users since they are forced to implement all of the methods (even if they are just delegating to a default implementation and it's just 1LOC per method).

@milenkovicm
Copy link
Contributor

#19079 I'd say it requires dyn if we want to support FFI implementations

@milenkovicm
Copy link
Contributor

would it be possible to split the interface? keep existing one as is and wrap it up with "CorePhysicalCodec" which would have those few methods you need ?

@adriangb
Copy link
Contributor Author

adriangb commented Dec 9, 2025

If we need dyn for FFI I agree the current approach is going to be non-viable.

@adriangb
Copy link
Contributor Author

adriangb commented Dec 9, 2025

would it be possible to split the interface? keep existing one as is and wrap it up with "CorePhysicalCodec" which would have those few methods you need ?

I'm not sure I follow, could you elaborate how that would work/what the advantage will be?

My thinking is if we're going to keep dyn we should just add the methods and force implementers to delegate to the defaults manually.

@timsaucer
Copy link
Member

I'm on travel all week, but I'll try to take a look as soon as I can.

@milenkovicm
Copy link
Contributor

milenkovicm commented Dec 9, 2025

I'm not sure, you stress me out with hard questions 😀

EDIT: apologise, i've missed things

Copy link
Contributor

@milenkovicm milenkovicm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like the change will not affect probably the most important methods

pub fn physical_plan_to_bytes_with_extension_codec(
    plan: Arc<dyn ExecutionPlan>,
    extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Bytes> {
    let protobuf =
        protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
    let mut buffer = BytesMut::new();
    protobuf
        .encode(&mut buffer)
        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
    Ok(buffer.into())
}

nor

pub fn physical_plan_from_bytes_with_extension_codec(
    bytes: &[u8],
    ctx: &TaskContext,
    extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
    protobuf.try_into_physical_plan(ctx, extension_codec)
}

also it looks like protobuf::PhysicalPlanNode::try_from_physical_plan does not complain with &dyn PhysicalExtensionCodec,

so removal of dyn may not that big of issue after all. Also FFI may not be affected with this change, but let's wait for Tim to confirm.

just a note, there are now to similar set of methods

    fn try_decode_expr(&self, buf: &[u8], inputs: &[Arc<dyn PhysicalExpr>]) -> Result<Arc<dyn PhysicalExpr>> {
        self.inner.try_decode_expr(buf, inputs)
    }

    fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()> {
        self.inner.try_encode_expr(node, buf)
    }

    fn serialize_physical_expr(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Result<protobuf::PhysicalExprNode> {
        to_proto::default_serialize_physical_expr(expr, self)
    }

fn deserialize_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &arrow::datatypes::Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        from_proto::default_parse_physical_expr(proto, ctx, input_schema, self)
    }

maybe we need to document when to use which or deprecate.

thanks @adriangb

@milenkovicm
Copy link
Contributor

Would intercept_[encode|decode]_[plan|expr] make trait name a bit more self explanatory?

@milenkovicm
Copy link
Contributor

maybe @lewiszlw would be interested in the change

@lewiszlw
Copy link
Member

Thanks for mentioning me @milenkovicm . The refactor looks more clear in first look.
Looks like PhysicalExtensionCodec should be called PhysicalPlanCodec.

@timsaucer
Copy link
Member

When I was testing this with the FFI work previously I found switching from &dyn PhysicalExtensionCodec to using generics on the functions was a blocker for the FFI work. However I moved away from using the physical codec in my work and I only use the logical codec. As long as we don't make the same changes in the logical codec, it shouldn't be a blocker for our existing work. I could see a potential for someone who wants to use the FFI_PhysicalExtensionCodec because it doesn't know the concrete type.

It feels not ideal to then have two different code structures for the logical and physical codecs.

Do you think there's a real performance enhancement to gain by using generics over keeping it with passing &dyn? Is it really about how many methods people need to implement?

@milenkovicm
Copy link
Contributor

The PhysicalExtensionCodec remains unchanged except for added methods for which no user action is required. It follows the same approach as the LogicaExtensionCodec. The public-facing methods still accept &dyn PhysicalExtensionCodec. I'm not sure if there will be huge performance implication of the change and if that even matters.

Maybe as a follow up we should consider value of exposing AsExecutionPlan as public interface or we just keep from|to_bytes methos

I need to verify one other bit if it is compatible with the ballista, (which I believe it does), but over all I do believe its solid proposal

@timsaucer
Copy link
Member

Sorry, I should be more clear. We use serialize_physical_sort_exprs which looks like it is included in the change here. It isn't a problem in the current main because that only has DefaultPhysicalExtensionCodec. If we make similar changes on the logical side as are proposed here or the physical side it will be a problem since we do not have concrete types as soon as we switch over to our FFI_LogicalExtensionCodec.

Sorry for the brevity, trying to touch bases while at a work retreat.

@adriangb
Copy link
Contributor Author

Here's the alternative approach: #19267

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Redesign protobuf encode/decode hooks & state

4 participants