-
Notifications
You must be signed in to change notification settings - Fork 581
Some tweaks to draft table APIs #11880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
385f282
3012770
812c9e2
b1c44a1
409302e
f65c37b
0218496
30147c5
6cb1c17
b71c2e1
1dec6c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,13 +1,15 @@ | ||||||||
| from __future__ import annotations | ||||||||
|
|
||||||||
| import tempfile | ||||||||
| from pathlib import Path | ||||||||
| from typing import TYPE_CHECKING, Any | ||||||||
|
|
||||||||
| import datafusion | ||||||||
| from rerun import catalog as _catalog | ||||||||
|
|
||||||||
| if TYPE_CHECKING: | ||||||||
| from datetime import datetime | ||||||||
|
|
||||||||
| import datafusion | ||||||||
| import pyarrow as pa | ||||||||
|
|
||||||||
|
|
||||||||
|
|
@@ -16,6 +18,7 @@ class CatalogClient: | |||||||
|
|
||||||||
| def __init__(self, address: str, token: str | None = None) -> None: | ||||||||
| self._inner = _catalog.CatalogClient(address, token) | ||||||||
| self.tmpdirs = [] | ||||||||
|
|
||||||||
| def __repr__(self) -> str: | ||||||||
| return repr(self._inner) | ||||||||
|
|
@@ -80,8 +83,12 @@ def register_table(self, name: str, url: str) -> TableEntry: | |||||||
| """Registers a foreign Lance table as a new table entry.""" | ||||||||
| return TableEntry(self._inner.register_table(name, url)) | ||||||||
|
|
||||||||
| def create_table_entry(self, name: str, schema, url: str) -> TableEntry: | ||||||||
| def create_table(self, name: str, schema, url: str | None = None) -> TableEntry: | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: we tried to establish the convention that: |
||||||||
| """Create and register a new table.""" | ||||||||
| if url is None: | ||||||||
| tmpdir = tempfile.TemporaryDirectory() | ||||||||
| self.tmpdirs.append(tmpdir) | ||||||||
| url = Path(tmpdir.name).as_uri() | ||||||||
|
Comment on lines
+86
to
+89
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I love the utter nastiness we are allowed in here 😅 |
||||||||
| return TableEntry(self._inner.create_table_entry(name, schema, url)) | ||||||||
|
|
||||||||
| def write_table(self, name: str, batches, insert_mode) -> None: | ||||||||
|
|
@@ -101,6 +108,14 @@ def ctx(self) -> datafusion.SessionContext: | |||||||
| """Returns a DataFusion session context for querying the catalog.""" | ||||||||
| return self._inner.ctx | ||||||||
|
|
||||||||
| def __del__(self) -> None: | ||||||||
| # Safety net: avoid warning if GC happens late | ||||||||
| try: | ||||||||
| for tmpdir in self.tmpdirs: | ||||||||
| tmpdir.cleanup() | ||||||||
| except Exception: | ||||||||
| pass | ||||||||
|
|
||||||||
|
|
||||||||
| class Entry: | ||||||||
| """An entry in the catalog.""" | ||||||||
|
|
@@ -290,28 +305,33 @@ def do_maintenance( | |||||||
| ) | ||||||||
|
|
||||||||
|
|
||||||||
| class TableEntry(Entry): | ||||||||
| class TableEntry(Entry, datafusion.DataFrame): | ||||||||
|
||||||||
| class TableEntry(Entry, datafusion.DataFrame): | |
| # TODO(RR-2939): can we do better than multiple inheritance? | |
| class TableEntry(Entry, datafusion.DataFrame): |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,47 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| from rerun import server as _server | ||
|
|
||
| from .catalog import CatalogClient | ||
|
|
||
| if TYPE_CHECKING: | ||
| from os import PathLike | ||
| from types import TracebackType | ||
|
|
||
|
|
||
| class Server: | ||
| __init__ = _server.Server.__init__ | ||
| address = _server.Server.address | ||
| is_running = _server.Server.is_running | ||
| shutdown = _server.Server.shutdown | ||
| __enter__ = _server.Server.__enter__ | ||
| __exit__ = _server.Server.__exit__ | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| address: str = "0.0.0.0", | ||
| port: int | None = None, | ||
| datasets: dict[str, PathLike[str]] | None = None, | ||
| tables: dict[str, PathLike[str]] | None = None, | ||
| ) -> None: | ||
| self._internal = _server.Server( | ||
| address=address, | ||
| port=port, | ||
| datasets=datasets, | ||
| tables=tables, | ||
| ) | ||
|
|
||
| def __enter__(self) -> Server: | ||
| self._internal.__enter__() | ||
| return self | ||
|
|
||
| def __exit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc_value: BaseException | None, | ||
| traceback: TracebackType | None, | ||
| ) -> None: | ||
| self._internal.__exit__(exc_type, exc_value, traceback) | ||
|
|
||
| def client(self) -> CatalogClient: | ||
| return CatalogClient(address=self.address()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import datafusion | ||
| import pyarrow as pa | ||
| import rerun_draft as rr | ||
| from inline_snapshot import snapshot as inline_snapshot | ||
|
|
||
|
|
||
| def test_table_api() -> None: | ||
| with rr.server.Server() as server: | ||
| client = server.client() | ||
|
|
||
| table = client.create_table( | ||
| "my_table", | ||
| pa.schema([ | ||
| ("rerun_segment_id", pa.string()), | ||
| ("operator", pa.string()), | ||
| ]), | ||
| ) | ||
|
|
||
| assert isinstance(table, datafusion.DataFrame) | ||
|
|
||
| assert str(table.schema()) == inline_snapshot("""\ | ||
| rerun_segment_id: string | ||
| operator: string | ||
| -- schema metadata -- | ||
| sorbet:version: '0.1.1'\ | ||
| """) | ||
|
|
||
| assert str(table.collect()) == inline_snapshot("[]") | ||
|
|
||
| table.append( | ||
| rerun_segment_id=["segment_001", "segment_002", "segment_003"], | ||
| operator=["alice", "bob", "carol"], | ||
| ) | ||
|
|
||
| assert str(table.select("rerun_segment_id", "operator")) == inline_snapshot("""\ | ||
| ┌─────────────────────┬─────────────────────┐ | ||
| │ rerun_segment_id ┆ operator │ | ||
| │ --- ┆ --- │ | ||
| │ type: nullable Utf8 ┆ type: nullable Utf8 │ | ||
| ╞═════════════════════╪═════════════════════╡ | ||
| │ segment_001 ┆ alice │ | ||
| ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ | ||
| │ segment_002 ┆ bob │ | ||
| ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ | ||
| │ segment_003 ┆ carol │ | ||
| └─────────────────────┴─────────────────────┘\ | ||
| """) |
Uh oh!
There was an error while loading. Please reload this page.