feat: update FFI TableProvider and ExecutionPlan to use FFI Session and TaskContext #19281
@paleolimbot @comphead @milenkovicm @renato2099 Almost done with the FFI epic! There is only one PR after these and it's just an import cleanup job. I'm leaving this in draft until I write up the upgrade guide, but I believe the code here is in decent shape. This pulls together all of the previous work.
    let default_ctx = SessionContext::new();
    let codec = DefaultLogicalExtensionCodec {};
Here is the main thrust of this work. In this place and others we no longer rely on creating a default SessionContext and codec.
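To illustrate why a freshly created default context is a problem, here is a minimal sketch using only the standard library. The `Registry` type, its methods, and the `double` function are hypothetical stand-ins and are not part of DataFusion. The idea is that a function registered only in the caller's session cannot be resolved by name against a default registry on the other side of the boundary.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a function registry (not the DataFusion API).
struct Registry {
    udfs: HashMap<String, fn(i64) -> i64>,
}

impl Registry {
    /// A "default" registry knows only built-in functions.
    fn with_builtins() -> Self {
        let mut udfs: HashMap<String, fn(i64) -> i64> = HashMap::new();
        udfs.insert("abs".to_string(), i64::abs);
        Registry { udfs }
    }

    /// Register a user-defined function under a name.
    fn register(&mut self, name: &str, f: fn(i64) -> i64) {
        self.udfs.insert(name.to_string(), f);
    }

    /// Resolve a serialized function reference (here just a name).
    fn resolve(&self, name: &str) -> Result<fn(i64) -> i64, String> {
        self.udfs
            .get(name)
            .copied()
            .ok_or_else(|| format!("unknown function: {name}"))
    }
}

/// A user-defined function that exists only in the caller's session.
fn double(x: i64) -> i64 {
    x * 2
}

fn main() {
    let default_registry = Registry::with_builtins();
    let mut session_registry = Registry::with_builtins();
    session_registry.register("double", double);

    // Decoding against a default registry cannot find the user's function.
    assert!(default_registry.resolve("double").is_err());

    // Decoding against the caller's own registry succeeds.
    let f = session_registry.resolve("double").unwrap();
    assert_eq!(f(21), 42);
}
```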
    let codec = DefaultPhysicalExtensionCodec {};
    let partitioning_data = rresult_return!(serialize_partitioning(
        properties.inner().output_partitioning(),
        &codec
    ));
    let output_partitioning = partitioning_data.encode_to_vec();

    ROk(output_partitioning.into())
Here is one of the big wins on the physical side - no longer using the protobuf serialization.
This is cool, although my reading is that any ExecutionPlan passed via FFI will not benefit from physical optimization in the same way it would have if serialized/deserialized via protobuf. It seems like these are mostly for TableProviders, where the plan being passed around would normally not participate in that anyway, but I thought I would point that out.
Thank you for noting that, @paleolimbot! That was not obvious to me.
timsaucer
left a comment
- Address comments from in-person review
    supports_filters_pushdown_internal(provider.inner(), &filters_serialized)
        .map_err(|e| e.to_string().into())
        .into()
    ) -> RResult<RVec<FFI_TableProviderFilterPushDown>, RString> {
Switch to FFI_Result
Does this need updating?
        .with_config(config)
        .build();
    let ctx = SessionContext::new_with_state(session);
    let foreign_session = rresult_return!(ForeignSession::try_from(&session));
ForeignSession is not cheap to create, so use a different workaround to avoid creating one when it is not necessary.
You could possibly push this type of optimization into the ForeignSession itself, using a OnceLock or something similar so that the expensive initialization only happens if one of the methods is actually called. If I'm remembering that PR correctly, the main issue there is that some of the session trait items don't return a Result, so there's no opportunity for failure.
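The lazy-initialization idea can be sketched with `std::sync::OnceLock` from the standard library. Everything else below (`LazySession`, `ExpensiveState`, `build_state`, and the fallback value 8192) is hypothetical and only illustrates the pattern; it is not how ForeignSession is implemented. The sketch also shows the problem raised in the comment: an accessor that does not return a `Result` has no way to report an initialization failure, so it has to panic or fall back to some default.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical stand-in for state that is expensive to build.
struct ExpensiveState {
    batch_size: usize,
}

/// Hypothetical fallible constructor for the expensive state.
fn build_state(batch_size: usize) -> Result<ExpensiveState, String> {
    if batch_size == 0 {
        return Err("batch size must be non-zero".to_string());
    }
    Ok(ExpensiveState { batch_size })
}

/// Hypothetical lazy wrapper: construction is cheap, initialization is deferred.
struct LazySession {
    requested_batch_size: usize,
    init_calls: AtomicUsize,
    state: OnceLock<Result<ExpensiveState, String>>,
}

impl LazySession {
    fn new(requested_batch_size: usize) -> Self {
        Self {
            requested_batch_size,
            init_calls: AtomicUsize::new(0),
            state: OnceLock::new(),
        }
    }

    /// Runs the expensive initialization at most once and caches the outcome,
    /// including a failure.
    fn state(&self) -> Result<&ExpensiveState, &str> {
        self.state
            .get_or_init(|| {
                self.init_calls.fetch_add(1, Ordering::SeqCst);
                build_state(self.requested_batch_size)
            })
            .as_ref()
            .map_err(|e| e.as_str())
    }

    /// Mirrors a trait method without a Result: an initialization error
    /// cannot be returned, so this sketch falls back to an arbitrary default.
    fn batch_size_or_default(&self) -> usize {
        self.state().map(|s| s.batch_size).unwrap_or(8192)
    }

    /// How many times the expensive initialization actually ran.
    fn init_calls(&self) -> usize {
        self.init_calls.load(Ordering::SeqCst)
    }
}

fn main() {
    let session = LazySession::new(1024);
    assert_eq!(session.init_calls(), 0); // nothing built yet
    assert_eq!(session.batch_size_or_default(), 1024);
    assert_eq!(session.batch_size_or_default(), 1024);
    assert_eq!(session.init_calls(), 1); // built exactly once
}
```

Storing the `Result` inside the `OnceLock` keeps the sketch on stable Rust; whether silently falling back is acceptable for a real session is a design question this sketch does not answer.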
        .with_config(config)
        .build();
    let ctx = SessionContext::new_with_state(session);
    let foreign_session = rresult_return!(ForeignSession::try_from(&session));
Same as other comment
paleolimbot
left a comment
Such a satisfying amount of hack removal! This looks reasonable to me.
It seems like we are leaning on existing tests to cover the functionality here...I imagine it is tricky to test the situation where the structs you've listed in the upgrade guide are actually provided by a separate build and are actually passing user-defined functions that aren't in the default SessionContext as filters (perhaps datafusion-python is where those high level tests will live?)
    // TODO Extend FFI to get the registry and codex
    let default_ctx = SessionContext::new();
    let task_context = default_ctx.task_ctx();
    let codex = DefaultPhysicalExtensionCodec {};
Just for my own learning: before, we were setting a specific codec here. Do we need to worry about it? Should we be using the one that comes from FFI?
Which issue does this PR close?
Addresses part of #18671 but does not close it.
Rationale for this change
This is the major change to address the requirements of #18671. This PR combines all of the previous PRs in the issue and uses them in FFI_TableProvider and FFI_ExecutionPlan. With this change the only remaining thing to close the issue is to remove the core crate. That is a large PR that mostly just changes import paths and will be a follow up.
What changes are included in this PR?
- FFI_PhysicalExpr, FFI_Session, FFI_TaskContext, and FFI_LogicalExtensionCodec.
- SessionContext within the FFI crate
Are these changes tested?
Unit tests are added. Coverage report:

Are there any user-facing changes?
Yes.
There is one major change to using the FFI crate that downstream users will need to implement. Now when creating a table provider, catalog provider, etc., you need to provide a TaskContextProvider and an optional LogicalExtensionCodec. The upgrade guide has been updated.
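As a rough picture of the shape of this change, the sketch below uses only the standard library. `ContextSource`, `Codec`, `ExportedTable`, and the other names are hypothetical stand-ins, loosely analogous to a task context provider and an extension codec; the real traits and constructor signatures are the ones documented in the upgrade guide and are not reproduced here. The point is that the caller hands over its own session and, optionally, its own codec, instead of the wrapper creating defaults internally.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for per-task state (not the DataFusion type).
struct TaskState {
    session_id: String,
}

/// Hypothetical stand-in for something that can supply task state.
trait ContextSource {
    fn task_state(&self) -> Arc<TaskState>;
}

/// Hypothetical stand-in for an extension codec.
trait Codec {
    fn name(&self) -> &'static str;
}

struct DefaultCodec;
impl Codec for DefaultCodec {
    fn name(&self) -> &'static str {
        "default"
    }
}

struct CustomCodec;
impl Codec for CustomCodec {
    fn name(&self) -> &'static str {
        "custom"
    }
}

/// The caller's own session, which knows its configuration and functions.
struct MySession {
    id: String,
}

impl ContextSource for MySession {
    fn task_state(&self) -> Arc<TaskState> {
        Arc::new(TaskState { session_id: self.id.clone() })
    }
}

/// Hypothetical exported wrapper: it keeps what the caller supplied.
struct ExportedTable {
    source: Arc<dyn ContextSource>,
    codec: Arc<dyn Codec>,
}

impl ExportedTable {
    /// The context source is required; the codec is optional and falls back
    /// to a default only when the caller passes `None`.
    fn new(source: Arc<dyn ContextSource>, codec: Option<Arc<dyn Codec>>) -> Self {
        let codec = codec.unwrap_or_else(|| Arc::new(DefaultCodec) as Arc<dyn Codec>);
        Self { source, codec }
    }

    fn describe(&self) -> String {
        format!(
            "session={} codec={}",
            self.source.task_state().session_id,
            self.codec.name()
        )
    }
}

fn main() {
    let session: Arc<dyn ContextSource> =
        Arc::new(MySession { id: "user-session".to_string() });
    let plain = ExportedTable::new(Arc::clone(&session), None);
    let custom = ExportedTable::new(session, Some(Arc::new(CustomCodec) as Arc<dyn Codec>));
    println!("{}", plain.describe());
    println!("{}", custom.describe());
}
```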