Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use abi_stable::{export_root_module, prefix_type::PrefixTypeTrait};
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{common::record_batch, datasource::MemTable};
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use datafusion_ffi::table_provider::FFI_TableProvider;
use ffi_module_interface::{TableProviderModule, TableProviderModuleRef};

Expand All @@ -34,7 +35,9 @@ fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {

/// Here we only wish to create a simple table provider as an example.
/// We create an in-memory table and convert it to it's FFI counterpart.
extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
extern "C" fn construct_simple_table_provider(
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableProvider {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float64, true),
Expand All @@ -50,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {

let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();

FFI_TableProvider::new(Arc::new(table_provider), true, None)
FFI_TableProvider::new_with_ffi_codec(Arc::new(table_provider), true, None, codec)
}

#[export_root_module]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use abi_stable::{
sabi_types::VersionStrings,
StableAbi,
};
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use datafusion_ffi::table_provider::FFI_TableProvider;

#[repr(C)]
Expand All @@ -34,7 +35,8 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
/// how a user may wish to separate these concerns.
pub struct TableProviderModule {
/// Constructs the table provider
pub create_table: extern "C" fn() -> FFI_TableProvider,
pub create_table:
extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProvider,
}

impl RootModule for TableProviderModuleRef {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ publish = false
[dependencies]
abi_stable = "0.11.3"
datafusion = { workspace = true }
datafusion-ffi = { workspace = true }
ffi_module_interface = { path = "../ffi_module_interface" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
11 changes: 8 additions & 3 deletions datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use datafusion::{

use abi_stable::library::{development_utils::compute_library_path, RootModule};
use datafusion::datasource::TableProvider;
use datafusion::execution::TaskContextProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use ffi_module_interface::TableProviderModuleRef;

#[tokio::main]
Expand All @@ -39,21 +41,24 @@ async fn main() -> Result<()> {
TableProviderModuleRef::load_from_directory(&library_path)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let ctx = Arc::new(SessionContext::new());
let codec = FFI_LogicalExtensionCodec::new_default(
&(Arc::clone(&ctx) as Arc<dyn TaskContextProvider>),
);

// By calling the code below, the table provided will be created within
// the module's code.
let ffi_table_provider =
table_provider_module
.create_table()
.ok_or(DataFusionError::NotImplemented(
"External table provider failed to implement create_table".to_string(),
))?();
))?(codec);

// In order to access the table provider within this executable, we need to
// turn it into a `TableProvider`.
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_table_provider).into();

let ctx = SessionContext::new();

// Display the data to show the full cycle works.
ctx.register_table("external_table", foreign_table_provider)?;
let df = ctx.table("external_table").await?;
Expand Down
94 changes: 72 additions & 22 deletions datafusion/ffi/src/catalog_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,24 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, ffi::c_void, sync::Arc};

use abi_stable::{
std_types::{ROption, RResult, RString, RVec},
StableAbi,
use std::any::Any;
use std::ffi::c_void;
use std::sync::Arc;

use abi_stable::std_types::{ROption, RResult, RString, RVec};
use abi_stable::StableAbi;
use datafusion_catalog::{CatalogProvider, SchemaProvider};
use datafusion_common::error::Result;
use datafusion_proto::logical_plan::{
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use tokio::runtime::Handle;

use crate::{
df_result, rresult_return,
schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
};

use crate::execution::FFI_TaskContextProvider;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use crate::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
use crate::util::FFIResult;
use datafusion::error::Result;
use crate::{df_result, rresult_return};

/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
#[repr(C)]
Expand Down Expand Up @@ -58,6 +60,8 @@ pub struct FFI_CatalogProvider {
)
-> FFIResult<ROption<FFI_SchemaProvider>>,

pub logical_codec: FFI_LogicalExtensionCodec,

/// Used to create a clone on the provider of the execution plan. This should
/// only need to be called by the receiver of the plan.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
Expand Down Expand Up @@ -111,7 +115,13 @@ unsafe extern "C" fn schema_fn_wrapper(
) -> ROption<FFI_SchemaProvider> {
let maybe_schema = provider.inner().schema(name.as_str());
maybe_schema
.map(|schema| FFI_SchemaProvider::new(schema, provider.runtime()))
.map(|schema| {
FFI_SchemaProvider::new_with_ffi_codec(
schema,
provider.runtime(),
provider.logical_codec.clone(),
)
})
.into()
}

Expand All @@ -121,12 +131,18 @@ unsafe extern "C" fn register_schema_fn_wrapper(
schema: &FFI_SchemaProvider,
) -> FFIResult<ROption<FFI_SchemaProvider>> {
let runtime = provider.runtime();
let provider = provider.inner();
let inner_provider = provider.inner();
let schema: Arc<dyn SchemaProvider + Send> = schema.into();

let returned_schema =
rresult_return!(provider.register_schema(name.as_str(), schema))
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
rresult_return!(inner_provider.register_schema(name.as_str(), schema))
.map(|schema| {
FFI_SchemaProvider::new_with_ffi_codec(
schema,
runtime,
provider.logical_codec.clone(),
)
})
.into();

RResult::ROk(returned_schema)
Expand All @@ -138,14 +154,20 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
cascade: bool,
) -> FFIResult<ROption<FFI_SchemaProvider>> {
let runtime = provider.runtime();
let provider = provider.inner();
let inner_provider = provider.inner();

let maybe_schema =
rresult_return!(provider.deregister_schema(name.as_str(), cascade));
rresult_return!(inner_provider.deregister_schema(name.as_str(), cascade));

RResult::ROk(
maybe_schema
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
.map(|schema| {
FFI_SchemaProvider::new_with_ffi_codec(
schema,
runtime,
provider.logical_codec.clone(),
)
})
.into(),
)
}
Expand Down Expand Up @@ -173,6 +195,7 @@ unsafe extern "C" fn clone_fn_wrapper(
schema: schema_fn_wrapper,
register_schema: register_schema_fn_wrapper,
deregister_schema: deregister_schema_fn_wrapper,
logical_codec: provider.logical_codec.clone(),
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
Expand All @@ -192,6 +215,24 @@ impl FFI_CatalogProvider {
pub fn new(
provider: Arc<dyn CatalogProvider + Send>,
runtime: Option<Handle>,
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
) -> Self {
let task_ctx_provider = task_ctx_provider.into();
let logical_codec =
logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
let logical_codec = FFI_LogicalExtensionCodec::new(
logical_codec,
runtime.clone(),
task_ctx_provider.clone(),
);
Self::new_with_ffi_codec(provider, runtime, logical_codec)
}

pub fn new_with_ffi_codec(
provider: Arc<dyn CatalogProvider + Send>,
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
let private_data = Box::new(ProviderPrivateData { provider, runtime });

Expand All @@ -200,6 +241,7 @@ impl FFI_CatalogProvider {
schema: schema_fn_wrapper,
register_schema: register_schema_fn_wrapper,
deregister_schema: deregister_schema_fn_wrapper,
logical_codec,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
Expand Down Expand Up @@ -269,7 +311,11 @@ impl CatalogProvider for ForeignCatalogProvider {
unsafe {
let schema = match schema.as_any().downcast_ref::<ForeignSchemaProvider>() {
Some(s) => &s.0,
None => &FFI_SchemaProvider::new(schema, None),
None => &FFI_SchemaProvider::new_with_ffi_codec(
schema,
None,
self.0.logical_codec.clone(),
),
};
let returned_schema: Option<FFI_SchemaProvider> =
df_result!((self.0.register_schema)(&self.0, name.into(), schema))?
Expand Down Expand Up @@ -312,8 +358,10 @@ mod tests {
.register_schema("prior_schema", prior_schema)
.unwrap()
.is_none());
let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None);
let mut ffi_catalog =
FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None);
ffi_catalog.library_marker_id = crate::mock_foreign_marker_id;

let foreign_catalog: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();
Expand Down Expand Up @@ -356,7 +404,9 @@ mod tests {
fn test_ffi_catalog_provider_local_bypass() {
let catalog = Arc::new(MemoryCatalogProvider::new());

let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None);
let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
let mut ffi_catalog =
FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None);

// Verify local libraries can be downcast to their original
let foreign_catalog: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();
Expand Down
Loading