diff --git a/Cargo.lock b/Cargo.lock index 02b88e99fd1db..5232e61b2a859 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,6 +2214,7 @@ dependencies = [ "object_store", "prost", "rand 0.9.2", + "serde", "serde_json", "strum 0.27.2", "strum_macros 0.27.2", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b0190dadf3c3f..cdc45dea1a757 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -59,6 +59,7 @@ mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } prost = { workspace = true } rand = { workspace = true } +serde = { version = "1", features = ["derive"] } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs new file mode 100644 index 0000000000000..24e3e2f210878 --- /dev/null +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! See `main.rs` for how to run it. +//! +//! 
This example demonstrates how to use the `PhysicalExtensionCodec` trait's +//! interception methods (`serialize_physical_plan` and `deserialize_physical_plan`) +//! to implement custom serialization logic. +//! +//! The key insight is that `FileScanConfig::expr_adapter_factory` is NOT serialized by +//! default. This example shows how to: +//! 1. Detect plans with custom adapters during serialization +//! 2. Wrap them as Extension nodes with JSON-serialized adapter metadata +//! 3. Unwrap and restore the adapter during deserialization +//! +//! This demonstrates nested serialization (protobuf outer, JSON inner) and the power +//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression +//! serialization route through the codec, enabling interception at every node in the tree. + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow::array::record_batch; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::assert_batches_eq; +use datafusion::common::Result; +use datafusion::common::not_impl_err; +use datafusion::datasource::listing::{ + ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl, +}; +use datafusion::datasource::physical_plan::{FileScanConfig, FileScanConfigBuilder}; +use datafusion::datasource::source::DataSourceExec; +use datafusion::execution::TaskContext; +use datafusion::execution::context::SessionContext; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionConfig; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; +use datafusion_proto::bytes::{ + physical_plan_from_bytes_with_extension_codec, + physical_plan_to_bytes_with_extension_codec, +}; +use datafusion_proto::physical_plan::{ + PhysicalExtensionCodec, 
default_deserialize_physical_expr, + default_deserialize_physical_plan, default_serialize_physical_expr, + default_serialize_physical_plan, +}; +use datafusion_proto::protobuf::{ + PhysicalExtensionNode, PhysicalPlanNode, physical_plan_node::PhysicalPlanType, +}; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; +use prost::Message; +use serde::{Deserialize, Serialize}; + +/// Example showing how to preserve custom adapter information during plan serialization. +/// +/// This demonstrates: +/// 1. Creating a custom PhysicalExprAdapter with metadata +/// 2. Using PhysicalExtensionCodec to intercept serialization +/// 3. Wrapping adapter info as Extension nodes +/// 4. Restoring adapters during deserialization +pub async fn adapter_serialization() -> Result<()> { + println!("=== PhysicalExprAdapter Serialization Example ===\n"); + + // Step 1: Create sample Parquet data in memory + println!("Step 1: Creating sample Parquet data..."); + let store = Arc::new(InMemory::new()) as Arc; + let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))?; + let path = Path::from("data.parquet"); + write_parquet(&store, &path, &batch).await?; + + // Step 2: Set up session with custom adapter + println!("Step 2: Setting up session with custom adapter..."); + let logical_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(cfg); + ctx.runtime_env().register_object_store( + ObjectStoreUrl::parse("memory://")?.as_ref(), + Arc::clone(&store), + ); + + // Create a table with our custom MetadataAdapterFactory + let adapter_factory = Arc::new(MetadataAdapterFactory::new("v1")); + let listing_config = + ListingTableConfig::new(ListingTableUrl::parse("memory:///data.parquet")?) + .infer_options(&ctx.state()) + .await? 
+ .with_schema(logical_schema) + .with_expr_adapter_factory( + Arc::clone(&adapter_factory) as Arc + ); + let table = ListingTable::try_new(listing_config)?; + ctx.register_table("my_table", Arc::new(table))?; + + // Step 3: Create physical plan with filter + println!("Step 3: Creating physical plan with filter..."); + let df = ctx.sql("SELECT * FROM my_table WHERE id > 5").await?; + let original_plan = df.create_physical_plan().await?; + + // Verify adapter is present in original plan + let has_adapter_before = verify_adapter_in_plan(&original_plan, "original"); + println!(" Original plan has adapter: {has_adapter_before}"); + + // Step 4: Serialize with our custom codec + println!("\nStep 4: Serializing plan with AdapterPreservingCodec..."); + let codec = AdapterPreservingCodec; + let bytes = + physical_plan_to_bytes_with_extension_codec(Arc::clone(&original_plan), &codec)?; + println!(" Serialized {} bytes", bytes.len()); + println!(" (DataSourceExec with adapter was wrapped as PhysicalExtensionNode)"); + + // Step 5: Deserialize with our custom codec + println!("\nStep 5: Deserializing plan with AdapterPreservingCodec..."); + let task_ctx = ctx.task_ctx(); + let restored_plan = + physical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec)?; + + // Verify adapter is restored + let has_adapter_after = verify_adapter_in_plan(&restored_plan, "restored"); + println!(" Restored plan has adapter: {has_adapter_after}"); + + // Step 6: Execute and compare results + println!("\nStep 6: Executing plans and comparing results..."); + let original_results = + datafusion::physical_plan::collect(Arc::clone(&original_plan), task_ctx.clone()) + .await?; + let restored_results = + datafusion::physical_plan::collect(restored_plan, task_ctx).await?; + + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 6 |", + "| 7 |", + "| 8 |", + "| 9 |", + "| 10 |", + "+----+", + ]; + + println!("\n Original plan results:"); + 
arrow::util::pretty::print_batches(&original_results)?; + assert_batches_eq!(expected, &original_results); + + println!("\n Restored plan results:"); + arrow::util::pretty::print_batches(&restored_results)?; + assert_batches_eq!(expected, &restored_results); + + println!("\n=== Example Complete! ==="); + println!("Key takeaways:"); + println!( + " 1. PhysicalExtensionCodec provides serialize_physical_plan/deserialize_physical_plan hooks" + ); + println!(" 2. Custom metadata can be wrapped as PhysicalExtensionNode"); + println!(" 3. Nested serialization (protobuf + JSON) works seamlessly"); + println!( + " 4. Both plans produce identical results despite serialization round-trip" + ); + println!(" 5. Adapters are fully preserved through the serialization round-trip"); + + Ok(()) +} + +// ============================================================================ +// MetadataAdapter - A simple custom adapter with a tag +// ============================================================================ + +/// A custom PhysicalExprAdapter that wraps another adapter. +/// The tag metadata is stored in the factory, not the adapter itself. +#[derive(Debug)] +struct MetadataAdapter { + inner: Arc, +} + +impl PhysicalExprAdapter for MetadataAdapter { + fn rewrite(&self, expr: Arc) -> Result> { + // Simply delegate to inner adapter + self.inner.rewrite(expr) + } +} + +// ============================================================================ +// MetadataAdapterFactory - Factory for creating MetadataAdapter instances +// ============================================================================ + +/// Factory for creating MetadataAdapter instances. +/// The tag is stored in the factory and extracted via Debug formatting in `extract_adapter_tag`. +#[derive(Debug)] +struct MetadataAdapterFactory { + // Note: This field is read via Debug formatting in `extract_adapter_tag`. + // Rust's dead code analysis doesn't recognize Debug-based field access. 
+ // In PR #19234, this field is used by `with_partition_values`, but that method + // doesn't exist in upstream DataFusion's PhysicalExprAdapter trait. + #[allow(dead_code)] + tag: String, +} + +impl MetadataAdapterFactory { + fn new(tag: impl Into) -> Self { + Self { tag: tag.into() } + } +} + +impl PhysicalExprAdapterFactory for MetadataAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + let inner = DefaultPhysicalExprAdapterFactory + .create(logical_file_schema, physical_file_schema); + Arc::new(MetadataAdapter { inner }) + } +} + +// ============================================================================ +// AdapterPreservingCodec - Custom codec that preserves adapters +// ============================================================================ + +/// Extension payload structure for serializing adapter info +#[derive(Serialize, Deserialize)] +struct ExtensionPayload { + /// Marker to identify this is our custom extension + marker: String, + /// JSON-serialized adapter metadata + adapter_metadata: AdapterMetadata, + /// Protobuf-serialized inner DataSourceExec (without adapter) + inner_plan_bytes: Vec, +} + +/// Metadata about the adapter to recreate it during deserialization +#[derive(Serialize, Deserialize)] +struct AdapterMetadata { + /// The adapter tag (e.g., "v1") + tag: String, +} + +const EXTENSION_MARKER: &str = "adapter_preserving_extension_v1"; + +/// A codec that intercepts serialization to preserve adapter information. 
+#[derive(Debug)] +struct AdapterPreservingCodec; + +impl PhysicalExtensionCodec for AdapterPreservingCodec { + // Required method: decode custom extension nodes + fn try_decode( + &self, + buf: &[u8], + _inputs: &[Arc], + ctx: &TaskContext, + ) -> Result> { + // Try to parse as our extension payload + if let Ok(payload) = serde_json::from_slice::(buf) + && payload.marker == EXTENSION_MARKER + { + // Decode the inner plan + let inner_proto = PhysicalPlanNode::decode(&payload.inner_plan_bytes[..]) + .map_err(|e| { + datafusion::error::DataFusionError::Plan(format!( + "Failed to decode inner plan: {e}" + )) + })?; + + // Deserialize the inner plan using default implementation + let inner_plan = default_deserialize_physical_plan(&inner_proto, ctx, self)?; + + // Recreate the adapter factory + let adapter_factory = create_adapter_factory(&payload.adapter_metadata.tag); + + // Inject adapter into the plan + return inject_adapter_into_plan(inner_plan, adapter_factory); + } + + not_impl_err!("Unknown extension type") + } + + // Required method: encode custom execution plans + fn try_encode( + &self, + _node: Arc, + _buf: &mut Vec, + ) -> Result<()> { + // We don't need this for the example - we use serialize_physical_plan instead + not_impl_err!( + "try_encode not used - adapter wrapping happens in serialize_physical_plan" + ) + } + + // Interception point: override serialization to wrap adapters + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + // Check if this is a DataSourceExec with adapter + if let Some(exec) = plan.as_any().downcast_ref::() + && let Some(config) = + exec.data_source().as_any().downcast_ref::() + && let Some(adapter_factory) = &config.expr_adapter_factory + && let Some(tag) = extract_adapter_tag(adapter_factory.as_ref()) + { + // Try to extract our MetadataAdapterFactory's tag + println!(" [Serialize] Found DataSourceExec with adapter tag: {tag}"); + + // 1. 
Create adapter metadata + let adapter_metadata = AdapterMetadata { tag }; + + // 2. Create a copy of the config without the adapter + let config_without_adapter = rebuild_config_without_adapter(config); + + // 3. Create a new DataSourceExec without adapter + let plan_without_adapter: Arc = + DataSourceExec::from_data_source(config_without_adapter); + + // 4. Serialize the inner plan to protobuf bytes + let inner_proto = + default_serialize_physical_plan(plan_without_adapter, self)?; + let mut inner_bytes = Vec::new(); + inner_proto.encode(&mut inner_bytes).map_err(|e| { + datafusion::error::DataFusionError::Plan(format!( + "Failed to encode inner plan: {e}" + )) + })?; + + // 5. Create extension payload + let payload = ExtensionPayload { + marker: EXTENSION_MARKER.to_string(), + adapter_metadata, + inner_plan_bytes: inner_bytes, + }; + let payload_bytes = serde_json::to_vec(&payload).map_err(|e| { + datafusion::error::DataFusionError::Plan(format!( + "Failed to serialize payload: {e}" + )) + })?; + + // 6. 
Return as PhysicalExtensionNode + return Ok(PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::Extension( + PhysicalExtensionNode { + node: payload_bytes, + inputs: vec![], // Leaf node + }, + )), + }); + } + + // No adapter found - use default serialization + default_serialize_physical_plan(plan, self) + } + + // Interception point: override deserialization to unwrap adapters + fn deserialize_physical_plan( + &self, + proto: &PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + // Check if this is our custom extension wrapper + if let Some(PhysicalPlanType::Extension(extension)) = &proto.physical_plan_type + && let Ok(payload) = + serde_json::from_slice::(&extension.node) + && payload.marker == EXTENSION_MARKER + { + println!( + " [Deserialize] Found adapter extension with tag: {}", + payload.adapter_metadata.tag + ); + + // Decode the inner plan + let inner_proto = PhysicalPlanNode::decode(&payload.inner_plan_bytes[..]) + .map_err(|e| { + datafusion::error::DataFusionError::Plan(format!( + "Failed to decode inner plan: {e}" + )) + })?; + + // Deserialize the inner plan using default implementation + let inner_plan = default_deserialize_physical_plan(&inner_proto, ctx, self)?; + + // Recreate the adapter factory + let adapter_factory = create_adapter_factory(&payload.adapter_metadata.tag); + + // Inject adapter into the plan + return inject_adapter_into_plan(inner_plan, adapter_factory); + } + + // Not our extension - use default deserialization + default_deserialize_physical_plan(proto, ctx, self) + } + + // Delegate expression serialization to defaults + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + default_serialize_physical_expr(expr, self) + } + + // Delegate expression deserialization to defaults + fn deserialize_physical_expr( + &self, + proto: &datafusion_proto::protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &Schema, + ) -> Result> { + default_deserialize_physical_expr(proto, ctx, input_schema, 
self) + } +} + +// ============================================================================ +// Helper functions +// ============================================================================ + +/// Write a RecordBatch to Parquet in the object store +async fn write_parquet( + store: &dyn ObjectStore, + path: &Path, + batch: &arrow::record_batch::RecordBatch, +) -> Result<()> { + let mut buf = vec![]; + let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?; + writer.write(batch)?; + writer.close()?; + + let payload = PutPayload::from_bytes(buf.into()); + store.put(path, payload).await?; + Ok(()) +} + +/// Extract the tag from a MetadataAdapterFactory. +/// +/// Note: Since `PhysicalExprAdapterFactory` doesn't provide `as_any()` for downcasting, +/// we parse the Debug output. In a production system, you might add a dedicated trait +/// method for metadata extraction. +fn extract_adapter_tag(factory: &dyn PhysicalExprAdapterFactory) -> Option { + let debug_str = format!("{factory:?}"); + if debug_str.contains("MetadataAdapterFactory") { + // Extract tag from debug output: MetadataAdapterFactory { tag: "v1" } + if let Some(start) = debug_str.find("tag: \"") { + let after_tag = &debug_str[start + 6..]; + if let Some(end) = after_tag.find('"') { + return Some(after_tag[..end].to_string()); + } + } + } + None +} + +/// Create an adapter factory from a tag +fn create_adapter_factory(tag: &str) -> Arc { + Arc::new(MetadataAdapterFactory::new(tag)) +} + +/// Rebuild a FileScanConfig without the adapter +fn rebuild_config_without_adapter(config: &FileScanConfig) -> FileScanConfig { + FileScanConfigBuilder::from(config.clone()) + .with_expr_adapter(None) + .build() +} + +/// Inject an adapter into a plan (assumes plan is a DataSourceExec with FileScanConfig) +fn inject_adapter_into_plan( + plan: Arc, + adapter_factory: Arc, +) -> Result> { + if let Some(exec) = plan.as_any().downcast_ref::() + && let Some(config) = 
exec.data_source().as_any().downcast_ref::() + { + let new_config = FileScanConfigBuilder::from(config.clone()) + .with_expr_adapter(Some(adapter_factory)) + .build(); + return Ok(DataSourceExec::from_data_source(new_config)); + } + // If not a DataSourceExec with FileScanConfig, return as-is + Ok(plan) +} + +/// Helper to verify if a plan has an adapter (for testing/validation) +fn verify_adapter_in_plan(plan: &Arc, label: &str) -> bool { + // Walk the plan tree to find DataSourceExec with adapter + fn check_plan(plan: &dyn ExecutionPlan) -> bool { + if let Some(exec) = plan.as_any().downcast_ref::() + && let Some(config) = + exec.data_source().as_any().downcast_ref::() + && config.expr_adapter_factory.is_some() + { + return true; + } + // Check children + for child in plan.children() { + if check_plan(child.as_ref()) { + return true; + } + } + false + } + + let has_adapter = check_plan(plan.as_ref()); + println!(" [Verify] {label} plan adapter check: {has_adapter}"); + has_adapter +} diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs index b5dcf10f5cdaa..23448e3210901 100644 --- a/datafusion-examples/examples/custom_data_source/main.rs +++ b/datafusion-examples/examples/custom_data_source/main.rs @@ -26,6 +26,7 @@ //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module +//! - `adapter_serialization` — preserve custom PhysicalExprAdapter information during plan serialization using PhysicalExtensionCodec interception //! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches //! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file //! - `custom_datasource` — run queries against a custom datasource (TableProvider) @@ -34,6 +35,7 @@ //! 
- `default_column_values` — implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter //! - `file_stream_provider` — run a query on FileStreamProvider which implements StreamProvider for reading and writing to arbitrary stream sources/sinks +mod adapter_serialization; mod csv_json_opener; mod csv_sql_streaming; mod custom_datasource; @@ -50,6 +52,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames}; #[strum(serialize_all = "snake_case")] enum ExampleKind { All, + AdapterSerialization, CsvJsonOpener, CsvSqlStreaming, CustomDatasource, @@ -74,6 +77,9 @@ impl ExampleKind { Box::pin(example.run()).await?; } } + ExampleKind::AdapterSerialization => { + adapter_serialization::adapter_serialization().await? + } ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?, ExampleKind::CsvSqlStreaming => { csv_sql_streaming::csv_sql_streaming().await? diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index f3910d461b6a8..799da70a75140 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -158,6 +158,44 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { internal_err!("Not supported") } } + + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + 
input_schema, + self, + ) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } #[derive(Debug)] @@ -232,4 +270,42 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { internal_err!("Not supported") } } + + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + input_schema, + self, + ) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } diff --git a/datafusion-examples/examples/proto/expression_deduplication.rs b/datafusion-examples/examples/proto/expression_deduplication.rs new file mode 100644 index 0000000000000..15f102edfeed3 --- /dev/null +++ b/datafusion-examples/examples/proto/expression_deduplication.rs @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! See `main.rs` for how to run it. +//! +//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's +//! interception methods to implement expression deduplication during deserialization. +//! +//! This pattern is inspired by PR #18192, which introduces expression caching +//! to reduce memory usage when deserializing plans with duplicate expressions. +//! +//! The key insight is that identical expressions serialize to identical protobuf bytes. +//! By caching deserialized expressions keyed by their protobuf bytes, we can: +//! 1. Return the same Arc for duplicate expressions +//! 2. Reduce memory allocation during deserialization +//! 3. Enable downstream optimizations that rely on Arc pointer equality +//! +//! This demonstrates the decorator pattern enabled by the `PhysicalExtensionCodec` trait, +//! where all expression serialization/deserialization routes through the codec methods. 
+ +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::{Arc, RwLock}; + +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::Result; +use datafusion::execution::TaskContext; +use datafusion::logical_expr::Operator; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::expressions::{BinaryExpr, col}; +use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; +use datafusion::prelude::SessionContext; +use datafusion_proto::physical_plan::{ + AsExecutionPlan, PhysicalExtensionCodec, default_deserialize_physical_expr, + default_deserialize_physical_plan, default_serialize_physical_expr, + default_serialize_physical_plan, +}; +use datafusion_proto::protobuf; +use prost::Message; + +/// Example showing how to implement expression deduplication using the codec decorator pattern. +/// +/// This demonstrates: +/// 1. Creating a CachingCodec that caches expressions by their protobuf bytes +/// 2. Intercepting deserialization to return cached Arcs for duplicate expressions +/// 3. Verifying that duplicate expressions share the same Arc after deserialization +/// +/// Deduplication is keyed by the protobuf bytes representing the expression, +/// in reality deduplication could be done based on e.g. the pointer address of the +/// serialized expression in memory, but this is simpler to demonstrate. +/// +/// In this case our expression is trivial and just for demonstration purposes. +/// In real scenarios, expressions can be much more complex, e.g. a large InList +/// expression could be megabytes in size, so deduplication can save significant memory +/// in addition to more correctly representing the original plan structure. 
+pub async fn expression_deduplication() -> Result<()> { + println!("=== Expression Deduplication Example ===\n"); + + // Create a schema for our test expressions + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + + // Step 1: Create expressions with duplicates + println!("Step 1: Creating expressions with duplicates..."); + + // Create expression: col("a") + let a = col("a", &schema)?; + + // Create a clone to show duplicates + let a_clone = Arc::clone(&a); + + // Combine: a OR a_clone + let combined_expr = + Arc::new(BinaryExpr::new(a, Operator::Or, a_clone)) as Arc; + println!(" Created expression: a OR a with duplicates"); + println!(" Note: a appears twice in the expression tree\n"); + // Step 2: Create a filter plan with this expression + println!("Step 2: Creating physical plan with the expression..."); + + let input = Arc::new(PlaceholderRowExec::new(Arc::clone(&schema))); + let filter_plan: Arc = + Arc::new(FilterExec::try_new(combined_expr, input)?); + + println!(" Created FilterExec with duplicate sub-expressions\n"); + + // Step 3: Serialize with the caching codec + println!("Step 3: Serializing plan..."); + + let caching_codec = CachingCodec::new(); + let proto = protobuf::PhysicalPlanNode::try_from_physical_plan( + filter_plan.clone(), + &caching_codec, + )?; + + // Serialize to bytes + let mut bytes = Vec::new(); + proto.encode(&mut bytes).unwrap(); + println!(" Serialized plan to {} bytes\n", bytes.len()); + + // Step 4: Deserialize with the caching codec + println!("Step 4: Deserializing plan with CachingCodec..."); + + let ctx = SessionContext::new(); + let deserialized_plan = + proto.try_into_physical_plan(&ctx.task_ctx(), &caching_codec)?; + + // Step 5: check that we deduplicated expressions + println!("Step 5: Checking for deduplicated expressions..."); + let Some(filter_exec) = deserialized_plan.as_any().downcast_ref::() + else { + panic!("Deserialized plan is not a FilterExec"); + }; + let predicate = 
Arc::clone(&filter_exec.predicate()); + let binary_expr = predicate + .as_any() + .downcast_ref::() + .expect("Predicate is not a BinaryExpr"); + let left = &binary_expr.left(); + let right = &binary_expr.right(); + // Check if left and right point to the same Arc + let deduplicated = Arc::ptr_eq(left, right); + if deduplicated { + println!(" Success: Duplicate expressions were deduplicated!"); + println!( + " Cache Stats: hits={}, misses={}", + caching_codec.stats.read().unwrap().cache_hits, + caching_codec.stats.read().unwrap().cache_misses, + ); + } else { + println!(" Failure: Duplicate expressions were NOT deduplicated."); + } + + Ok(()) +} + +// ============================================================================ +// CachingCodec - Implements expression deduplication +// ============================================================================ + +/// Statistics for cache performance monitoring +#[derive(Debug, Default)] +struct CacheStats { + cache_hits: usize, + cache_misses: usize, +} + +/// A codec that caches deserialized expressions to enable deduplication. +/// +/// When deserializing, if we've already seen the same protobuf bytes, +/// we return the cached Arc instead of creating a new allocation. 
+#[derive(Debug, Default)] +struct CachingCodec { + /// Cache mapping protobuf bytes -> deserialized expression + expr_cache: RwLock, Arc>>, + /// Statistics for demonstration + stats: RwLock, +} + +impl CachingCodec { + fn new() -> Self { + Self::default() + } +} + +impl PhysicalExtensionCodec for CachingCodec { + // Required: decode custom extension nodes + fn try_decode( + &self, + _buf: &[u8], + _inputs: &[Arc], + _ctx: &TaskContext, + ) -> Result> { + datafusion::common::not_impl_err!("No custom extension nodes") + } + + // Required: encode custom execution plans + fn try_encode( + &self, + _node: Arc, + _buf: &mut Vec, + ) -> Result<()> { + datafusion::common::not_impl_err!("No custom extension nodes") + } + + // Delegate plan serialization to default + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + default_serialize_physical_plan(plan, self) + } + + // Delegate plan deserialization to default + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + default_deserialize_physical_plan(proto, ctx, self) + } + + // Delegate expression serialization to default + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + default_serialize_physical_expr(expr, self) + } + + // CACHING IMPLEMENTATION: Intercept expression deserialization + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &Schema, + ) -> Result> { + // Create cache key from protobuf bytes + let mut key = Vec::new(); + proto.encode(&mut key).map_err(|e| { + datafusion::error::DataFusionError::Internal(format!( + "Failed to encode proto for cache key: {e}" + )) + })?; + + // Check cache first + { + let cache = self.expr_cache.read().unwrap(); + if let Some(cached) = cache.get(&key) { + // Cache hit! 
Update stats and return cached Arc + let mut stats = self.stats.write().unwrap(); + stats.cache_hits += 1; + return Ok(Arc::clone(cached)); + } + } + + // Cache miss - deserialize and store + let expr = default_deserialize_physical_expr(proto, ctx, input_schema, self)?; + + // Store in cache + { + let mut cache = self.expr_cache.write().unwrap(); + cache.insert(key, Arc::clone(&expr)); + let mut stats = self.stats.write().unwrap(); + stats.cache_misses += 1; + } + + Ok(expr) + } +} diff --git a/datafusion-examples/examples/proto/main.rs b/datafusion-examples/examples/proto/main.rs index 9e4ae728206c4..01954b6c187b5 100644 --- a/datafusion-examples/examples/proto/main.rs +++ b/datafusion-examples/examples/proto/main.rs @@ -21,14 +21,16 @@ //! //! ## Usage //! ```bash -//! cargo run --example proto -- [all|composed_extension_codec] +//! cargo run --example proto -- [all|composed_extension_codec|expression_deduplication] //! ``` //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module //! - `composed_extension_codec` — example of using multiple extension codecs for serialization / deserialization +//! - `expression_deduplication` — example of expression caching/deduplication using the codec decorator pattern mod composed_extension_codec; +mod expression_deduplication; use datafusion::error::{DataFusionError, Result}; use strum::{IntoEnumIterator, VariantNames}; @@ -39,6 +41,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames}; enum ExampleKind { All, ComposedExtensionCodec, + ExpressionDeduplication, } impl ExampleKind { @@ -59,6 +62,9 @@ impl ExampleKind { ExampleKind::ComposedExtensionCodec => { composed_extension_codec::composed_extension_codec().await? } + ExampleKind::ExpressionDeduplication => { + expression_deduplication::expression_deduplication().await? 
+ } } Ok(()) } diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index 89a9a2cead007..c370ac32a9887 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -408,6 +408,44 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { Ok(()) } + + fn deserialize_physical_plan( + &self, + proto: &datafusion_proto::protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &datafusion_proto::protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow_schema::Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + input_schema, + self, + ) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } #[cfg(test)] @@ -563,6 +601,44 @@ pub(crate) mod tests { Ok(()) } + + fn deserialize_physical_plan( + &self, + proto: &datafusion_proto::protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &datafusion_proto::protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + input_schema, + self, + ) + } + + fn serialize_physical_expr( + 
&self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } fn create_test_exec() -> Arc { diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index d95bdd388699e..9f3ca572e129e 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -20,9 +20,7 @@ use crate::logical_plan::to_proto::serialize_expr; use crate::logical_plan::{ self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec, }; -use crate::physical_plan::{ - AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, -}; +use crate::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec}; use crate::protobuf; use datafusion_common::{Result, plan_datafusion_err}; use datafusion_execution::TaskContext; @@ -283,9 +281,10 @@ pub fn physical_plan_to_bytes(plan: Arc) -> Result { #[cfg(feature = "json")] pub fn physical_plan_to_json(plan: Arc) -> Result { let extension_codec = DefaultPhysicalExtensionCodec {}; - let protobuf = - protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &extension_codec) - .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; + // Route through codec to enable interception + let protobuf = extension_codec + .serialize_physical_plan(plan) + .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; serde_json::to_string(&protobuf) .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}")) } @@ -295,8 +294,8 @@ pub fn physical_plan_to_bytes_with_extension_codec( plan: Arc, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let protobuf = - protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?; + // Route through codec to enable interception at the root level + let protobuf = extension_codec.serialize_physical_plan(plan)?; let mut buffer = BytesMut::new(); protobuf .encode(&mut buffer) @@ -313,7 +312,8 @@ pub fn physical_plan_from_json( let back: 
protobuf::PhysicalPlanNode = serde_json::from_str(json) .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; let extension_codec = DefaultPhysicalExtensionCodec {}; - back.try_into_physical_plan(ctx, &extension_codec) + // Route through codec to enable interception + extension_codec.deserialize_physical_plan(&back, ctx) } /// Deserialize a PhysicalPlan from bytes @@ -333,5 +333,6 @@ pub fn physical_plan_from_bytes_with_extension_codec( ) -> Result> { let protobuf = protobuf::PhysicalPlanNode::decode(bytes) .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?; - protobuf.try_into_physical_plan(ctx, extension_codec) + // Route through codec to enable interception at the root level + extension_codec.deserialize_physical_plan(&protobuf, ctx) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index aa02e63a5d0d0..25e1efc81a3ef 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -193,7 +193,7 @@ where { protos .into_iter() - .map(|p| parse_physical_expr(p, ctx, input_schema, codec)) + .map(|p| codec.deserialize_physical_expr(p, ctx, input_schema)) .collect::>>() } @@ -303,7 +303,7 @@ pub fn parse_physical_expr( ExprType::Case(e) => Arc::new(CaseExpr::try_new( e.expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec)) + .map(|e| codec.deserialize_physical_expr(e.as_ref(), ctx, input_schema)) .transpose()?, e.when_then_expr .iter() @@ -328,7 +328,7 @@ pub fn parse_physical_expr( .collect::>>()?, e.else_expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), ctx, input_schema, codec)) + .map(|e| codec.deserialize_physical_expr(e.as_ref(), ctx, input_schema)) .transpose()?, )?), ExprType::Cast(e) => Arc::new(CastExpr::new( @@ -403,7 +403,7 @@ pub fn parse_physical_expr( let inputs: Vec> = extension .inputs .iter() - .map(|e| parse_physical_expr(e, ctx, input_schema, codec)) + 
.map(|e| codec.deserialize_physical_expr(e, ctx, input_schema)) .collect::>()?; (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _ } @@ -419,7 +419,7 @@ fn parse_required_physical_expr( input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { - expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec)) + expr.map(|e| codec.deserialize_physical_expr(e, ctx, input_schema)) .transpose()? .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}")) } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 5c64dfbce3b89..6b6058f36ac4d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -478,12 +478,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .children() .into_iter() .cloned() - .map(|i| { - protobuf::PhysicalPlanNode::try_from_physical_plan( - i, - extension_codec, - ) - }) + .map(|i| extension_codec.serialize_physical_plan(i)) .collect::>()?; Ok(protobuf::PhysicalPlanNode { @@ -1376,7 +1371,7 @@ impl protobuf::PhysicalPlanNode { ) -> Result> { let mut inputs: Vec> = vec![]; for input in &union.inputs { - inputs.push(input.try_into_physical_plan(ctx, extension_codec)?); + inputs.push(extension_codec.deserialize_physical_plan(input, ctx)?); } UnionExec::try_new(inputs) } @@ -1390,7 +1385,7 @@ impl protobuf::PhysicalPlanNode { ) -> Result> { let mut inputs: Vec> = vec![]; for input in &interleave.inputs { - inputs.push(input.try_into_physical_plan(ctx, extension_codec)?); + inputs.push(extension_codec.deserialize_physical_plan(input, ctx)?); } Ok(Arc::new(InterleaveExec::try_new(inputs)?)) } @@ -1546,7 +1541,7 @@ impl protobuf::PhysicalPlanNode { let inputs: Vec> = extension .inputs .iter() - .map(|i| i.try_into_physical_plan(ctx, extension_codec)) + .map(|i| extension_codec.deserialize_physical_plan(i, ctx)) .collect::>()?; let extension_node = @@ -2047,10 +2042,7 @@ impl protobuf::PhysicalPlanNode { 
exec: &ProjectionExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let expr = exec .expr() .iter() @@ -2076,10 +2068,7 @@ impl protobuf::PhysicalPlanNode { exec: &AnalyzeExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new( protobuf::AnalyzeExecNode { @@ -2096,10 +2085,7 @@ impl protobuf::PhysicalPlanNode { exec: &FilterExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Filter(Box::new( protobuf::FilterExecNode { @@ -2121,10 +2107,7 @@ impl protobuf::PhysicalPlanNode { limit: &GlobalLimitExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - limit.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(limit.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new( @@ -2144,10 +2127,7 @@ impl protobuf::PhysicalPlanNode { limit: &LocalLimitExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - limit.input().to_owned(), - extension_codec, - )?; + let input = 
extension_codec.serialize_physical_plan(limit.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new( protobuf::LocalLimitExecNode { @@ -2162,14 +2142,8 @@ impl protobuf::PhysicalPlanNode { exec: &HashJoinExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let left = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.left().to_owned(), - extension_codec, - )?; - let right = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.right().to_owned(), - extension_codec, - )?; + let left = extension_codec.serialize_physical_plan(exec.left().to_owned())?; + let right = extension_codec.serialize_physical_plan(exec.right().to_owned())?; let on: Vec = exec .on() .iter() @@ -2238,14 +2212,8 @@ impl protobuf::PhysicalPlanNode { exec: &SymmetricHashJoinExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let left = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.left().to_owned(), - extension_codec, - )?; - let right = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.right().to_owned(), - extension_codec, - )?; + let left = extension_codec.serialize_physical_plan(exec.left().to_owned())?; + let right = extension_codec.serialize_physical_plan(exec.right().to_owned())?; let on = exec .on() .iter() @@ -2356,14 +2324,8 @@ impl protobuf::PhysicalPlanNode { exec: &SortMergeJoinExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let left = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.left().to_owned(), - extension_codec, - )?; - let right = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.right().to_owned(), - extension_codec, - )?; + let left = extension_codec.serialize_physical_plan(exec.left().to_owned())?; + let right = extension_codec.serialize_physical_plan(exec.right().to_owned())?; let on = exec .on() .iter() @@ -2440,14 +2402,8 @@ impl protobuf::PhysicalPlanNode { exec: &CrossJoinExec, extension_codec: &dyn 
PhysicalExtensionCodec, ) -> Result { - let left = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.left().to_owned(), - extension_codec, - )?; - let right = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.right().to_owned(), - extension_codec, - )?; + let left = extension_codec.serialize_physical_plan(exec.left().to_owned())?; + let right = extension_codec.serialize_physical_plan(exec.right().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new( protobuf::CrossJoinExecNode { @@ -2505,10 +2461,7 @@ impl protobuf::PhysicalPlanNode { } }; let input_schema = exec.input_schema(); - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let null_expr = exec .group_expr() @@ -2578,10 +2531,8 @@ impl protobuf::PhysicalPlanNode { coalesce_batches: &CoalesceBatchesExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - coalesce_batches.input().to_owned(), - extension_codec, - )?; + let input = extension_codec + .serialize_physical_plan(coalesce_batches.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new( protobuf::CoalesceBatchesExecNode { @@ -2748,10 +2699,7 @@ impl protobuf::PhysicalPlanNode { exec: &CoalescePartitionsExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Merge(Box::new( protobuf::CoalescePartitionsExecNode { @@ -2766,10 +2714,7 @@ impl protobuf::PhysicalPlanNode { exec: &RepartitionExec, extension_codec: &dyn 
PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let pb_partitioning = serialize_partitioning(exec.partitioning(), extension_codec)?; @@ -2788,10 +2733,7 @@ impl protobuf::PhysicalPlanNode { exec: &SortExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let expr = exec .expr() .iter() @@ -2830,10 +2772,7 @@ impl protobuf::PhysicalPlanNode { ) -> Result { let mut inputs: Vec = vec![]; for input in union.inputs() { - inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan( - input.to_owned(), - extension_codec, - )?); + inputs.push(extension_codec.serialize_physical_plan(input.to_owned())?); } Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode { @@ -2848,10 +2787,7 @@ impl protobuf::PhysicalPlanNode { ) -> Result { let mut inputs: Vec = vec![]; for input in interleave.inputs() { - inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan( - input.to_owned(), - extension_codec, - )?); + inputs.push(extension_codec.serialize_physical_plan(input.to_owned())?); } Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Interleave( @@ -2864,10 +2800,7 @@ impl protobuf::PhysicalPlanNode { exec: &SortPreservingMergeExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let expr = exec .expr() .iter() @@ -2900,14 +2833,8 @@ impl protobuf::PhysicalPlanNode { exec: &NestedLoopJoinExec, 
extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let left = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.left().to_owned(), - extension_codec, - )?; - let right = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.right().to_owned(), - extension_codec, - )?; + let left = extension_codec.serialize_physical_plan(exec.left().to_owned())?; + let right = extension_codec.serialize_physical_plan(exec.right().to_owned())?; let join_type: protobuf::JoinType = exec.join_type().to_owned().into(); let filter = exec @@ -2955,10 +2882,7 @@ impl protobuf::PhysicalPlanNode { exec: &WindowAggExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let window_expr = exec .window_expr() @@ -2988,10 +2912,7 @@ impl protobuf::PhysicalPlanNode { exec: &BoundedWindowAggExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; let window_expr = exec .window_expr() @@ -3038,10 +2959,7 @@ impl protobuf::PhysicalPlanNode { extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { let input: protobuf::PhysicalPlanNode = - protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + extension_codec.serialize_physical_plan(exec.input().to_owned())?; let sort_order = match exec.sort_order() { Some(requirements) => { let expr = requirements @@ -3114,10 +3032,7 @@ impl protobuf::PhysicalPlanNode { exec: &UnnestExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = 
extension_codec.serialize_physical_plan(exec.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new( @@ -3147,10 +3062,7 @@ impl protobuf::PhysicalPlanNode { exec: &CooperativeExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - exec.input().to_owned(), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(exec.input().to_owned())?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new( @@ -3280,10 +3192,7 @@ impl protobuf::PhysicalPlanNode { exec: &AsyncFuncExec, extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan( - Arc::clone(exec.input()), - extension_codec, - )?; + let input = extension_codec.serialize_physical_plan(Arc::clone(exec.input()))?; let mut async_exprs = vec![]; let mut async_expr_names = vec![]; @@ -3330,7 +3239,110 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { Self: Sized; } +/// Trait for custom serialization and deserialization of physical plans and expressions. +/// +/// This trait provides methods for handling: +/// 1. Extension/custom execution plans and expressions (via `try_decode`/`try_encode` methods) +/// 2. User-defined functions (UDFs, UDAFs, UDWFs) +/// 3. Interception of ALL plan/expression nodes during serialization/deserialization +/// +/// The `deserialize_physical_plan`, `serialize_physical_plan`, `deserialize_physical_expr`, +/// and `serialize_physical_expr` methods are called for **every** node in the plan/expression +/// tree, allowing implementations to intercept, cache, transform, or otherwise customize +/// the serialization process. 
+/// +/// # Example: Fully Default Implementation +/// +/// If you want standard behavior without custom logic, delegate to the provided helper functions: +/// +/// ```ignore +/// use datafusion_proto::physical_plan::{ +/// PhysicalExtensionCodec, default_deserialize_physical_plan, +/// default_serialize_physical_plan, default_deserialize_physical_expr, +/// default_serialize_physical_expr, +/// }; +/// +/// #[derive(Debug)] +/// struct MyCodec; +/// +/// impl PhysicalExtensionCodec for MyCodec { +/// // Required extension methods (for custom ExecutionPlan types) +/// fn try_decode( +/// &self, +/// buf: &[u8], +/// inputs: &[Arc], +/// ctx: &TaskContext, +/// ) -> Result> { +/// not_impl_err!("No custom ExecutionPlan types supported") +/// } +/// +/// fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { +/// not_impl_err!("No custom ExecutionPlan types supported") +/// } +/// +/// // Interception methods - delegate to defaults for standard behavior +/// fn deserialize_physical_plan( +/// &self, +/// proto: &protobuf::PhysicalPlanNode, +/// ctx: &TaskContext, +/// ) -> Result> { +/// default_deserialize_physical_plan(proto, ctx, self) +/// } +/// +/// fn serialize_physical_plan( +/// &self, +/// plan: Arc, +/// ) -> Result { +/// default_serialize_physical_plan(plan, self) +/// } +/// +/// fn deserialize_physical_expr( +/// &self, +/// proto: &protobuf::PhysicalExprNode, +/// ctx: &TaskContext, +/// input_schema: &arrow::datatypes::Schema, +/// ) -> Result> { +/// default_deserialize_physical_expr(proto, ctx, input_schema, self) +/// } +/// +/// fn serialize_physical_expr( +/// &self, +/// expr: &Arc, +/// ) -> Result { +/// default_serialize_physical_expr(expr, self) +/// } +/// } +/// ``` +/// +/// # Example: Caching Deserializer +/// +/// Custom implementations can intercept deserialization to add caching: +/// +/// ```ignore +/// impl PhysicalExtensionCodec for CachingCodec { +/// fn deserialize_physical_expr( +/// &self, +/// proto: 
&protobuf::PhysicalExprNode, +/// ctx: &TaskContext, +/// input_schema: &arrow::datatypes::Schema, +/// ) -> Result> { +/// // Check cache first +/// if let Some(cached) = self.expr_cache.get(proto) { +/// return Ok(Arc::clone(cached)); +/// } +/// // Deserialize and cache +/// let expr = default_deserialize_physical_expr(proto, ctx, input_schema, self)?; +/// self.expr_cache.insert(proto.clone(), Arc::clone(&expr)); +/// Ok(expr) +/// } +/// // ... other methods +/// } +/// ``` pub trait PhysicalExtensionCodec: Debug + Send + Sync { + /// Decode a custom/extension [`ExecutionPlan`] from bytes. + /// + /// This is called as a fallback when the standard deserialization + /// encounters an unknown plan type (marked as Extension in protobuf). fn try_decode( &self, buf: &[u8], @@ -3338,8 +3350,59 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { ctx: &TaskContext, ) -> Result>; + /// Encode a custom/extension [`ExecutionPlan`] to bytes. + /// + /// This is called as a fallback when the standard serialization + /// encounters an unknown plan type. fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; + /// Deserialize a physical plan node from protobuf. + /// + /// This method is called for **every** plan node during deserialization, + /// allowing implementations to intercept, cache, or transform nodes. + /// + /// For standard behavior, delegate to [`default_deserialize_physical_plan`]. + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result>; + + /// Serialize an execution plan to protobuf. + /// + /// This method is called for **every** plan node during serialization, + /// allowing implementations to intercept, cache, or transform nodes. + /// + /// For standard behavior, delegate to [`default_serialize_physical_plan`]. + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result; + + /// Deserialize a physical expression from protobuf. 
+ /// + /// This method is called for **every** expression node during deserialization, + /// allowing implementations to intercept, cache, or transform expressions. + /// + /// For standard behavior, delegate to [`default_deserialize_physical_expr`]. + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result>; + + /// Serialize a physical expression to protobuf. + /// + /// This method is called for **every** expression node during serialization, + /// allowing implementations to intercept, cache, or transform expressions. + /// + /// For standard behavior, delegate to [`default_serialize_physical_expr`]. + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result; + fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}") } @@ -3383,6 +3446,69 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { } } +/// Default implementation for deserializing a physical plan from protobuf. +/// +/// This function provides the standard deserialization logic. Custom +/// [`PhysicalExtensionCodec`] implementations can call this to get default +/// behavior while adding their own logic before or after. +/// +/// The deserialization recursively calls `codec.deserialize_physical_plan` +/// for child nodes, allowing the codec to intercept every node in the tree. +pub fn default_deserialize_physical_plan( + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, +) -> Result> { + proto.try_into_physical_plan(ctx, codec) +} + +/// Default implementation for serializing an execution plan to protobuf. +/// +/// This function provides the standard serialization logic. Custom +/// [`PhysicalExtensionCodec`] implementations can call this to get default +/// behavior while adding their own logic before or after. 
+/// +/// The serialization recursively calls `codec.serialize_physical_plan` +/// for child nodes, allowing the codec to intercept every node in the tree. +pub fn default_serialize_physical_plan( + plan: Arc, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + protobuf::PhysicalPlanNode::try_from_physical_plan(plan, codec) +} + +/// Default implementation for deserializing a physical expression from protobuf. +/// +/// This function provides the standard deserialization logic. Custom +/// [`PhysicalExtensionCodec`] implementations can call this to get default +/// behavior while adding their own logic before or after. +/// +/// The deserialization recursively calls `codec.deserialize_physical_expr` +/// for child expressions, allowing the codec to intercept every expression in the tree. +pub fn default_deserialize_physical_expr( + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + codec: &dyn PhysicalExtensionCodec, +) -> Result> { + from_proto::parse_physical_expr(proto, ctx, input_schema, codec) +} + +/// Default implementation for serializing a physical expression to protobuf. +/// +/// This function provides the standard serialization logic. Custom +/// [`PhysicalExtensionCodec`] implementations can call this to get default +/// behavior while adding their own logic before or after. +/// +/// The serialization recursively calls `codec.serialize_physical_expr` +/// for child expressions, allowing the codec to intercept every expression in the tree. 
+pub fn default_serialize_physical_expr( + expr: &Arc, + codec: &dyn PhysicalExtensionCodec, +) -> Result { + to_proto::serialize_physical_expr(expr, codec) +} + #[derive(Debug)] pub struct DefaultPhysicalExtensionCodec {} @@ -3403,6 +3529,37 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } + + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + default_deserialize_physical_plan(proto, ctx, self) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + default_deserialize_physical_expr(proto, ctx, input_schema, self) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + default_serialize_physical_expr(expr, self) + } } /// DataEncoderTuple captures the position of the encoder @@ -3515,6 +3672,37 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data)) } + + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + default_deserialize_physical_plan(proto, ctx, self) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + default_deserialize_physical_expr(proto, ctx, input_schema, self) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + default_serialize_physical_expr(expr, self) + } } fn 
into_physical_plan( @@ -3523,7 +3711,7 @@ fn into_physical_plan( extension_codec: &dyn PhysicalExtensionCodec, ) -> Result> { if let Some(field) = node { - field.try_into_physical_plan(ctx, extension_codec) + extension_codec.deserialize_physical_plan(field, ctx) } else { Err(proto_error("Missing required field in protobuf")) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b06dec592d5c3..29f5fb5fe01ba 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -211,7 +211,7 @@ where { values .into_iter() - .map(|value| serialize_physical_expr(value, codec)) + .map(|value| codec.serialize_physical_expr(value)) .collect() } @@ -271,8 +271,8 @@ pub fn serialize_physical_expr( }) } else if let Some(expr) = expr.downcast_ref::() { let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode { - l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)), - r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)), + l: Some(Box::new(codec.serialize_physical_expr(expr.left())?)), + r: Some(Box::new(codec.serialize_physical_expr(expr.right())?)), op: format!("{:?}", expr.op()), }); @@ -290,7 +290,7 @@ pub fn serialize_physical_expr( expr: expr .expr() .map(|exp| { - serialize_physical_expr(exp, codec).map(Box::new) + codec.serialize_physical_expr(exp).map(Box::new) }) .transpose()?, when_then_expr: expr @@ -305,7 +305,7 @@ pub fn serialize_physical_expr( >>()?, else_expr: expr .else_expr() - .map(|a| serialize_physical_expr(a, codec).map(Box::new)) + .map(|a| codec.serialize_physical_expr(a).map(Box::new)) .transpose()?, }, ), @@ -316,7 +316,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { - expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.arg())?)), 
}, ))), }) @@ -324,7 +324,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { - expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.arg())?)), }), )), }) @@ -332,7 +332,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { - expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.arg())?)), }), )), }) @@ -340,7 +340,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { - expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.expr())?)), list: serialize_physical_exprs(expr.list(), codec)?, negated: expr.negated(), }, @@ -350,7 +350,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { - expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.arg())?)), }, ))), }) @@ -364,7 +364,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { - expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(cast.expr())?)), arrow_type: Some(cast.cast_type().try_into()?), }, ))), @@ -373,7 +373,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( 
protobuf::PhysicalTryCastNode { - expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)), + expr: Some(Box::new(codec.serialize_physical_expr(cast.expr())?)), arrow_type: Some(cast.cast_type().try_into()?), }, ))), @@ -402,11 +402,10 @@ pub fn serialize_physical_expr( protobuf::PhysicalLikeExprNode { negated: expr.negated(), case_insensitive: expr.case_insensitive(), - expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)), - pattern: Some(Box::new(serialize_physical_expr( - expr.pattern(), - codec, - )?)), + expr: Some(Box::new(codec.serialize_physical_expr(expr.expr())?)), + pattern: Some(Box::new( + codec.serialize_physical_expr(expr.pattern())?, + )), }, ))), }) @@ -417,7 +416,7 @@ pub fn serialize_physical_expr( let inputs: Vec = value .children() .into_iter() - .map(|e| serialize_physical_expr(e, codec)) + .map(|e| codec.serialize_physical_expr(e)) .collect::>()?; Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( @@ -468,8 +467,8 @@ fn serialize_when_then_expr( codec: &dyn PhysicalExtensionCodec, ) -> Result { Ok(protobuf::PhysicalWhenThen { - when_expr: Some(serialize_physical_expr(when_expr, codec)?), - then_expr: Some(serialize_physical_expr(then_expr, codec)?), + when_expr: Some(codec.serialize_physical_expr(when_expr)?), + then_expr: Some(codec.serialize_physical_expr(then_expr)?), }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fa505e6f1520a..64990b7db335a 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1074,6 +1074,44 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { internal_err!("Not supported") } } + + fn deserialize_physical_plan( + &self, + proto: &datafusion_proto::protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + 
datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &datafusion_proto::protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + input_schema, + self, + ) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } let exec_plan = DataSourceExec::from_data_source(scan_config); @@ -1230,6 +1268,44 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { } Ok(()) } + + fn deserialize_physical_plan( + &self, + proto: &datafusion_proto::protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_plan( + proto, ctx, self, + ) + } + + fn serialize_physical_plan( + &self, + plan: Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &datafusion_proto::protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &arrow::datatypes::Schema, + ) -> Result> { + datafusion_proto::physical_plan::default_deserialize_physical_expr( + proto, + ctx, + input_schema, + self, + ) + } + + fn serialize_physical_expr( + &self, + expr: &Arc, + ) -> Result { + datafusion_proto::physical_plan::default_serialize_physical_expr(expr, self) + } } #[test] diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 159bd3e4e790e..7044263083e03 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -84,6 +84,101 @@ You can disable the new behavior 
by setting the [configuration setting]: https://datafusion.apache.org/user-guide/configs.html [not yet the default behavior]: https://github.com/apache/datafusion/issues/3463 +### Breaking Change: `PhysicalExtensionCodec` requires 4 new methods + +The `PhysicalExtensionCodec` trait now requires implementing 4 additional methods +that intercept all plan and expression serialization/deserialization. These methods +are called for **every** node in the plan/expression tree, enabling use cases like +caching, transformation, and custom metadata injection during serialization. + +**New required methods:** + +```rust,ignore +pub trait PhysicalExtensionCodec: Debug + Send + Sync { + // ... existing methods unchanged ... + + /// Deserialize a physical plan node from protobuf. + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result<Arc<dyn ExecutionPlan>>; + + /// Serialize an execution plan to protobuf. + fn serialize_physical_plan( + &self, + plan: Arc<dyn ExecutionPlan>, + ) -> Result<protobuf::PhysicalPlanNode>; + + /// Deserialize a physical expression from protobuf. + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &Schema, + ) -> Result<Arc<dyn PhysicalExpr>>; + + /// Serialize a physical expression to protobuf. + fn serialize_physical_expr( + &self, + expr: &Arc<dyn PhysicalExpr>, + ) -> Result<protobuf::PhysicalExprNode>; +} +``` + +**Migration:** For existing implementations that don't need custom interception logic, +delegate to the provided helper functions: + +```rust,ignore +use datafusion_proto::physical_plan::{ + PhysicalExtensionCodec, default_deserialize_physical_plan, + default_serialize_physical_plan, default_deserialize_physical_expr, + default_serialize_physical_expr, +}; + +impl PhysicalExtensionCodec for MyCodec { + // ... existing methods ... 
+ + fn deserialize_physical_plan( + &self, + proto: &protobuf::PhysicalPlanNode, + ctx: &TaskContext, + ) -> Result<Arc<dyn ExecutionPlan>> { + default_deserialize_physical_plan(proto, ctx, self) + } + + fn serialize_physical_plan( + &self, + plan: Arc<dyn ExecutionPlan>, + ) -> Result<protobuf::PhysicalPlanNode> { + default_serialize_physical_plan(plan, self) + } + + fn deserialize_physical_expr( + &self, + proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, + input_schema: &Schema, + ) -> Result<Arc<dyn PhysicalExpr>> { + default_deserialize_physical_expr(proto, ctx, input_schema, self) + } + + fn serialize_physical_expr( + &self, + expr: &Arc<dyn PhysicalExpr>, + ) -> Result<protobuf::PhysicalExprNode> { + default_serialize_physical_expr(expr, self) + } +} +``` + +**Use cases for custom implementations:** + +- Caching deserialized expressions to avoid redundant work +- Transforming nodes during serialization/deserialization +- Injecting custom metadata not in the protobuf schema +- Implementing decorator patterns around standard serialization + ### Statistics handling moved from `FileSource` to `FileScanConfig` Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats.