diff --git a/CHANGELOG.md b/CHANGELOG.md index 194dcbd9..dcd381a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- New `datacontract import powerbi` command: import a Power BI semantic model into an ODCS data contract + ### Added - `datacontract test` now logs the Data Contract CLI version and whether it ran as a local CLI or through the FastAPI server (including the request URL) as part of the test result logs diff --git a/README.md b/README.md index b9f1f17d..b587d7e0 100644 --- a/README.md +++ b/README.md @@ -1794,6 +1794,7 @@ For more information about the Excel template structure, visit the [ODCS Excel T │ spark Import a data contract from a Spark schema. │ │ iceberg Import a data contract from an Iceberg schema. │ │ excel Import a data contract from an Excel file. │ +│ powerbi Import a data contract from a Power BI .pbit file. │ ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ Example: datacontract import sql --source ddl.sql --dialect postgres --output datacontract.yaml diff --git a/datacontract/command_import.py b/datacontract/command_import.py index af0e6381..a51d64f9 100644 --- a/datacontract/command_import.py +++ b/datacontract/command_import.py @@ -425,3 +425,21 @@ def import_excel( enable_debug_logging(debug) result = DataContract.import_from_source(format="excel", source=source, schema=schema, owner=owner, id=id) _write_result(result, output) + + +@import_app.command( + name="powerbi", + epilog="Example: datacontract import powerbi --source datacontract.pbit --output datacontract.yaml", +) +def import_powerbi( + source: Annotated[Optional[str], typer.Option(help="Path to the Power BI .pbit file.")] = None, + output: output_option = None, + schema: schema_option = None, + owner: owner_option = None, + id: id_option = None, + debug: debug_option = None, +): + """Import a data contract from a Power 
BI .pbit file.""" + enable_debug_logging(debug) + result = DataContract.import_from_source(format="powerbi", source=source, schema=schema, owner=owner, id=id) + _write_result(result, output) diff --git a/datacontract/imports/importer.py b/datacontract/imports/importer.py index 24961fb8..68d0d190 100644 --- a/datacontract/imports/importer.py +++ b/datacontract/imports/importer.py @@ -38,6 +38,7 @@ class ImportFormat(str, Enum): csv = "csv" protobuf = "protobuf" excel = "excel" + powerbi = "powerbi" @classmethod def get_supported_formats(cls): diff --git a/datacontract/imports/importer_factory.py b/datacontract/imports/importer_factory.py index 6566f18b..93d691ad 100644 --- a/datacontract/imports/importer_factory.py +++ b/datacontract/imports/importer_factory.py @@ -119,7 +119,11 @@ def load_module_class(module_path, class_name): module_path="datacontract.imports.excel_importer", class_name="ExcelImporter", ) - +importer_factory.register_lazy_importer( + name=ImportFormat.powerbi, + module_path="datacontract.imports.powerbi_importer", + class_name="PowerBIImporter", +) importer_factory.register_lazy_importer( name=ImportFormat.json, diff --git a/datacontract/imports/powerbi_importer.py b/datacontract/imports/powerbi_importer.py new file mode 100644 index 00000000..765845ba --- /dev/null +++ b/datacontract/imports/powerbi_importer.py @@ -0,0 +1,478 @@ +from __future__ import annotations + +import json +import zipfile +from pathlib import Path +from typing import Any, Dict, List, Optional + +from open_data_contract_standard.model import ( + CustomProperty, + OpenDataContractStandard, + Relationship, + SchemaObject, + SchemaProperty, +) + +from datacontract.imports.importer import Importer +from datacontract.imports.odcs_helper import create_odcs, create_property, create_schema_object, create_server +from datacontract.model.exceptions import DataContractException + +# --------------------------------------------------------------------------- +# Power BI data type → 
# ---------------------------------------------------------------------------
# Power BI data type -> (ODCS logical type, optional format)
# ---------------------------------------------------------------------------
# Keys are lower-case because lookups are case-insensitive: the BIM spelling
# "dateTime" is matched by the "datetime" entry.
_PBI_TYPE_MAP: Dict[str, tuple[str, Optional[str]]] = {
    "string": ("string", None),
    "int64": ("integer", None),
    "double": ("number", None),
    "decimal": ("number", None),
    "boolean": ("boolean", None),
    "datetime": ("timestamp", None),
    "date": ("date", None),
    "time": ("time", None),
    "binary": ("string", "binary"),
    "duration": ("string", None),
    "unknown": ("string", None),
    "variant": ("object", None),
}

# Fallback used for a missing, non-string, or unrecognised data type.
_PBI_DEFAULT_TYPE: tuple[str, Optional[str]] = ("string", None)

# Single-pass character rewrite applied by ``_make_id``.
_ID_TRANSLATION = str.maketrans({" ": "_", "(": "_", ")": "_", "%": "percent"})


def _map_pbi_type(data_type: Optional[str]) -> tuple[str, Optional[str]]:
    """Map a Power BI data type string to ``(logicalType, format)``.

    The lookup is case-insensitive. ``None``, any non-string value, and any
    type name that is not in ``_PBI_TYPE_MAP`` map to ``("string", None)``.
    """
    # Guard against JSON values that are not strings (e.g. a number): calling
    # ``.lower()`` on them would raise AttributeError.
    if not isinstance(data_type, str):
        return _PBI_DEFAULT_TYPE
    return _PBI_TYPE_MAP.get(data_type.lower(), _PBI_DEFAULT_TYPE)


def _make_id(name: str) -> str:
    """Build a stable ID from a name and append ``_id``.

    Spaces and parentheses become underscores and ``%`` becomes ``percent``;
    every other character is kept unchanged.

    Examples:
        "Sales"        → "Sales_id"
        "Order Date"   → "Order_Date_id"
        "Sales MoM %"  → "Sales_MoM_percent_id"
    """
    return name.translate(_ID_TRANSLATION) + "_id"
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------


def import_powerbi_from_file(source_path: str) -> OpenDataContractStandard:
    """Import a Power BI semantic model from a ``.pbit``, ``.bim``, or ``.json`` file.

    The loader is selected by the (case-insensitive) file extension, and the
    file stem is passed on as the model name.

    Args:
        source_path: Path to the file to import.

    Returns:
        The contract produced by ``_build_odcs`` for the parsed model.

    Raises:
        DataContractException: If the path does not exist, the extension is
            unsupported, the file cannot be read or parsed, or the parsed
            JSON document is not an object.
    """
    path = Path(source_path)
    if not path.exists():
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"File not found: {source_path}",
            engine="datacontract",
        )

    suffix = path.suffix.lower()
    if suffix == ".pbit":
        bim = _load_bim_from_pbit(path)
    elif suffix in (".bim", ".json"):
        bim = _load_bim_from_json(path)
    else:
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=(f"Unsupported file extension '{suffix}'. Supported formats: .pbit, .bim, .json (model.bim)"),
            engine="datacontract",
        )

    # Valid JSON is not necessarily a model: a top-level list or scalar would
    # otherwise fail in the builder with an AttributeError on ``.get``.
    if not isinstance(bim, dict):
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"Expected a JSON object at the top level of '{path.name}', got {type(bim).__name__}.",
            engine="datacontract",
        )

    return _build_odcs(bim, model_name=path.stem)


# ---------------------------------------------------------------------------
# BIM loaders
# ---------------------------------------------------------------------------


def _load_bim_from_pbit(pbit_path: Path) -> Dict[str, Any]:
    """Extract and parse the ``DataModelSchema`` JSON from a .pbit ZIP archive.

    Power BI Desktop embeds the tabular model as a UTF-16 LE JSON file called
    ``DataModelSchema`` inside the .pbit ZIP archive. This entry has been
    present since mid-2021; older files may not include it.

    Args:
        pbit_path: Path to the ``.pbit`` file.

    Returns:
        The parsed JSON document.

    Raises:
        DataContractException: If the file cannot be read, is not a ZIP
            archive, has no ``DataModelSchema`` entry, or that entry is not
            valid UTF-16 LE encoded JSON.
    """
    entry_name = "DataModelSchema"
    try:
        with zipfile.ZipFile(pbit_path, "r") as zf:
            if entry_name not in zf.namelist():
                raise DataContractException(
                    type="import",
                    name="powerbi import",
                    reason=(
                        f"'DataModelSchema' was not found inside '{pbit_path.name}'. "
                        "This entry is present in Power BI Desktop files saved since mid-2021. "
                        "Try re-saving the file from an up-to-date Power BI Desktop, or export "
                        "the model as a .bim file using pbi-tools (https://pbi.tools) and "
                        "import that instead."
                    ),
                    engine="datacontract",
                )
            raw = zf.read(entry_name)
    except zipfile.BadZipFile as exc:
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"'{pbit_path.name}' is not a valid .pbit / ZIP file: {exc}",
            engine="datacontract",
            original_exception=exc,
        ) from exc
    except OSError as exc:
        # e.g. the path is a directory or is not readable
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"Failed to read '{pbit_path}': {exc}",
            engine="datacontract",
            original_exception=exc,
        ) from exc

    try:
        # DataModelSchema is UTF-16 LE, optionally with a BOM. Decoding lives
        # inside the ``try`` so that truncated or differently encoded content
        # is reported like any other parse failure.
        text = raw.decode("utf-16-le").lstrip("\ufeff")
        return json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"Failed to parse DataModelSchema JSON: {exc}",
            engine="datacontract",
            original_exception=exc,
        ) from exc


def _load_bim_from_json(bim_path: Path) -> Dict[str, Any]:
    """Load a ``model.bim`` / ``.json`` file as BIM JSON (UTF-8 with optional BOM).

    Args:
        bim_path: Path to the JSON file.

    Returns:
        The parsed JSON document.

    Raises:
        DataContractException: If the file cannot be opened, is not valid
            UTF-8, or does not contain valid JSON.
    """
    try:
        with open(bim_path, encoding="utf-8-sig") as f:
            return json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise DataContractException(
            type="import",
            name="powerbi import",
            reason=f"Failed to read BIM file '{bim_path}': {exc}",
            engine="datacontract",
            original_exception=exc,
        ) from exc
# ---------------------------------------------------------------------------
# Table → SchemaObject
# ---------------------------------------------------------------------------


def _map_table(table: Dict[str, Any]) -> Optional[SchemaObject]:
    """Convert one BIM table into a ``SchemaObject``.

    Returns ``None`` for tables without a name and for tables whose name
    starts with ``LocalDateTable_`` or ``DateTableTemplate_``. Columns,
    measures and hierarchies are all emitted as properties, in that order;
    a hidden table is tagged ``hidden``.
    """
    table_name = table.get("name", "")
    if not table_name or table_name.startswith(("LocalDateTable_", "DateTableTemplate_")):
        return None

    members: List[SchemaProperty] = []

    # Columns — the engine's internal rowNumber columns are left out
    visible_columns = (c for c in table.get("columns", []) if c.get("columnType") != "rowNumber")
    members.extend(filter(None, map(_map_column, visible_columns)))

    # Measures
    members.extend(filter(None, map(_map_measure, table.get("measures", []))))

    # Hierarchies
    members.extend(filter(None, map(_map_hierarchy, table.get("hierarchies", []))))

    schema_obj = create_schema_object(
        name=table_name,
        physical_type=_table_physical_type(table),
        description=table.get("description") or None,
        tags=["hidden"] if table.get("isHidden", False) else None,
        properties=members or None,
    )
    schema_obj.id = _make_id(table_name)

    return schema_obj


def _table_physical_type(table: Dict[str, Any]) -> str:
    """Return ``"calculated table"`` when the first partition's source type is ``calculated``, else ``"table"``."""
    partitions = table.get("partitions", [])
    if partitions:
        first_source = partitions[0].get("source", {})
        if first_source.get("type", "") == "calculated":
            return "calculated table"
    return "table"
# ---------------------------------------------------------------------------
# Measure → SchemaProperty
# ---------------------------------------------------------------------------


def _map_measure(measure: Dict[str, Any]) -> Optional[SchemaProperty]:
    """Convert one BIM measure into a ``SchemaProperty`` with physical type ``measure``.

    Returns ``None`` when the measure has no name. The DAX expression is
    stored in ``transformLogic``; ``isHidden`` and ``displayFolder`` are
    carried over as custom properties when set.
    """
    measure_name = measure.get("name", "")
    if not measure_name:
        return None

    # Power BI sometimes stores multi-line DAX as a list of strings
    dax = measure.get("expression", "")
    if isinstance(dax, list):
        dax = "\n".join(dax)

    extras: Dict[str, Any] = {}
    if measure.get("isHidden"):
        extras["isHidden"] = True
    folder = measure.get("displayFolder")
    if folder:
        extras["displayFolder"] = folder

    measure_prop = create_property(
        name=measure_name,
        logical_type=_infer_measure_type(measure.get("formatString", ""), dax),
        physical_type="measure",
        description=measure.get("description") or None,
        custom_properties=extras or None,
    )
    measure_prop.id = _make_id(measure_name)
    measure_prop.transformLogic = dax.strip() if dax else None

    return measure_prop
# ---------------------------------------------------------------------------
# Hierarchy → SchemaProperty
# ---------------------------------------------------------------------------


def _map_hierarchy(hierarchy: Dict[str, Any]) -> Optional[SchemaProperty]:
    """Convert one BIM hierarchy into an ``object`` property with one child per level.

    Levels are ordered by their ``ordinal``; levels without a name are
    dropped. A level's source column is recorded in the ``columnRef`` custom
    property. Returns ``None`` when the hierarchy has no name.
    """
    name = hierarchy.get("name", "")
    if not name:
        return None

    description = hierarchy.get("description") or None
    # ``or`` also covers explicit JSON nulls, which would otherwise break
    # ``sorted`` (None is neither iterable nor comparable with int).
    levels = sorted(hierarchy.get("levels") or [], key=lambda lv: lv.get("ordinal") or 0)

    level_props = [
        create_property(
            name=lv["name"],
            logical_type="string",
            physical_type="hierarchy level",
            custom_properties={"columnRef": lv["column"]} if lv.get("column") else None,
        )
        for lv in levels
        if lv.get("name")
    ]

    return create_property(
        name=name,
        logical_type="object",
        physical_type="hierarchy",
        description=description,
        properties=level_props or None,
    )


# ---------------------------------------------------------------------------
# Relationships
# ---------------------------------------------------------------------------


def _apply_bim_relationships(
    bim_relationships: List[Dict[str, Any]],
    table_name_to_obj: Dict[str, SchemaObject],
    table_name_to_col_props: Dict[str, Dict[str, SchemaProperty]],
) -> None:
    """Attach BIM relationships as ODCS Relationship objects on the 'from' SchemaObject.

    ``from`` and ``to`` are written as ``<schema name>.<property name>``.
    The relationship is placed on the *from* side of the join, and its
    cardinality (default ``many-to-one``) is stored as a custom property;
    inactive relationships additionally get ``active = False``.

    A relationship is skipped when either table is an auto-generated date
    table, when either table is not present in ``table_name_to_obj``, or
    when a column name is missing.

    Args:
        bim_relationships: The ``relationships`` array of the BIM model.
        table_name_to_obj: Imported schema objects keyed by BIM table name.
        table_name_to_col_props: Per table, the imported properties keyed by
            property name.
    """
    auto_date_prefixes = ("LocalDateTable_", "DateTableTemplate_")

    for rel in bim_relationships:
        # ``or ""`` (instead of a .get default) also covers explicit JSON nulls.
        from_table = rel.get("fromTable") or ""
        from_col_name = rel.get("fromColumn") or ""
        to_table = rel.get("toTable") or ""
        to_col_name = rel.get("toColumn") or ""

        # Either end may be an auto date table, so both prefixes are checked
        # on both sides.
        if from_table.startswith(auto_date_prefixes) or to_table.startswith(auto_date_prefixes):
            continue

        from_obj = table_name_to_obj.get(from_table)
        to_obj = table_name_to_obj.get(to_table)
        if from_obj is None or to_obj is None:
            continue

        # Without a column on both ends the reference would be dangling ("Table.").
        if not from_col_name or not to_col_name:
            continue

        # Fall back to the BIM names when the imported object carries no name.
        from_schema_name = from_obj.name or from_table
        to_schema_name = to_obj.name or to_table

        from_prop = table_name_to_col_props.get(from_table, {}).get(from_col_name)
        from_prop_name = (from_prop.name if from_prop is not None else None) or from_col_name

        to_prop = table_name_to_col_props.get(to_table, {}).get(to_col_name)
        to_prop_name = (to_prop.name if to_prop is not None else None) or to_col_name

        from_card = rel.get("fromCardinality", "many")
        to_card = rel.get("toCardinality", "one")
        is_active = rel.get("isActive", True)

        custom_props = [CustomProperty(property="cardinality", value=f"{from_card}-to-{to_card}")]
        if not is_active:
            custom_props.append(CustomProperty(property="active", value=False))

        odcs_rel = Relationship(
            type="foreignKey",
            # ``from`` is a Python keyword, so it has to be passed via dict unpacking.
            **{"from": f"{from_schema_name}.{from_prop_name}"},
            to=f"{to_schema_name}.{to_prop_name}",
            customProperties=custom_props,
        )

        if from_obj.relationships is None:
            from_obj.relationships = []
        from_obj.relationships.append(odcs_rel)
"data" + }, + { + "name": "OrderDate", + "dataType": "dateTime", + "isNullable": true, + "description": "Date the order was placed", + "columnType": "data", + "formatString": "yyyy-MM-dd" + }, + { + "name": "CustomerKey", + "dataType": "int64", + "isNullable": false, + "columnType": "data" + }, + { + "name": "ProductKey", + "dataType": "int64", + "isNullable": false, + "columnType": "data" + }, + { + "name": "SalesAmount", + "dataType": "decimal", + "isNullable": true, + "description": "Net sales amount in USD", + "columnType": "data", + "summarizeBy": "sum", + "formatString": "#,##0.00" + }, + { + "name": "Quantity", + "dataType": "int64", + "isNullable": true, + "columnType": "data", + "summarizeBy": "sum" + }, + { + "name": "UnitPrice", + "dataType": "decimal", + "isNullable": true, + "columnType": "data", + "summarizeBy": "none" + }, + { + "name": "Discount", + "dataType": "double", + "isNullable": true, + "columnType": "data", + "summarizeBy": "none", + "isHidden": true + }, + { + "name": "GrossMargin", + "dataType": "decimal", + "isNullable": true, + "description": "Gross margin calculated from sales and cost", + "columnType": "calculated", + "expression": "Sales[SalesAmount] - Sales[CostAmount]", + "formatString": "#,##0.00", + "displayFolder": "Financials" + }, + { + "name": "RowNumber", + "columnType": "rowNumber", + "dataType": "int64", + "isHidden": true + } + ], + "measures": [ + { + "name": "Total Sales", + "description": "Sum of all sales amounts", + "expression": "SUM(Sales[SalesAmount])", + "formatString": "#,##0.00", + "displayFolder": "KPIs" + }, + { + "name": "Total Quantity", + "description": "Total units sold", + "expression": "SUM(Sales[Quantity])", + "formatString": "#,##0", + "displayFolder": "KPIs" + }, + { + "name": "Average Order Value", + "description": "Average revenue per order", + "expression": "DIVIDE([Total Sales], DISTINCTCOUNT(Sales[OrderID]), 0)", + "formatString": "#,##0.00", + "displayFolder": "KPIs" + }, + { + "name": "YTD 
Sales", + "description": "Year-to-date sales using time intelligence", + "expression": "TOTALYTD([Total Sales], 'Date'[Date])", + "formatString": "#,##0.00", + "displayFolder": "Time Intelligence" + }, + { + "name": "Sales MoM %", + "description": "Month-over-month sales growth rate", + "expression": "VAR _prev = CALCULATE([Total Sales], DATEADD('Date'[Date], -1, MONTH))\nRETURN DIVIDE([Total Sales] - _prev, _prev, BLANK())", + "formatString": "0.00%", + "displayFolder": "Time Intelligence" + }, + { + "name": "Is Top Customer Flag", + "description": "Flag indicating whether the current filter context is a top-10 customer", + "expression": "IF(RANKX(ALL(Sales[CustomerKey]), [Total Sales]) <= 10, TRUE(), FALSE())", + "formatString": "\"True\";\"True\";\"False\"", + "isHidden": true + } + ], + "partitions": [ + { + "name": "Sales-Partition", + "source": { + "type": "m", + "expression": "let\n Source = Sql.Database(\"server\", \"database\")\nin\n Source" + } + } + ] + }, + { + "name": "Date", + "description": "Calendar date dimension", + "columns": [ + { + "name": "Date", + "dataType": "dateTime", + "isNullable": false, + "description": "Full date value", + "columnType": "data", + "formatString": "yyyy-MM-dd" + }, + { + "name": "Year", + "dataType": "int64", + "isNullable": false, + "description": "Calendar year", + "columnType": "data" + }, + { + "name": "Quarter", + "dataType": "string", + "isNullable": false, + "description": "Calendar quarter label (e.g. 
Q1)", + "columnType": "data" + }, + { + "name": "Month", + "dataType": "int64", + "isNullable": false, + "description": "Calendar month number (1-12)", + "columnType": "data" + }, + { + "name": "MonthName", + "dataType": "string", + "isNullable": false, + "description": "Full month name", + "columnType": "data" + }, + { + "name": "IsWeekend", + "dataType": "boolean", + "isNullable": false, + "columnType": "data" + } + ], + "hierarchies": [ + { + "name": "Calendar", + "description": "Standard calendar drill-down hierarchy", + "levels": [ + {"ordinal": 0, "name": "Year", "column": "Year"}, + {"ordinal": 1, "name": "Quarter", "column": "Quarter"}, + {"ordinal": 2, "name": "Month", "column": "MonthName"}, + {"ordinal": 3, "name": "Date", "column": "Date"} + ] + } + ], + "partitions": [ + { + "name": "Date-Partition", + "source": { + "type": "calculated", + "expression": "CALENDAR(DATE(2020,1,1), DATE(2030,12,31))" + } + } + ] + }, + { + "name": "DateTableTemplate", + "isHidden": true, + "columns": [ + { + "name": "Date", + "dataType": "dateTime", + "isNullable": false, + "columnType": "data" + } + ], + "partitions": [ + { + "name": "DateTableTemplate-Partition", + "source": { + "type": "calculated", + "expression": "CALENDAR(DATE(2020,1,1), DATE(2020,12,31))" + } + } + ] + } + ], + "relationships": [ + { + "name": "Sales_Date", + "fromTable": "Sales", + "fromColumn": "OrderDate", + "toTable": "Date", + "toColumn": "Date", + "fromCardinality": "many", + "toCardinality": "one", + "isActive": true + } + ], + "roles": [] + } +} diff --git a/tests/test_import_powerbi.py b/tests/test_import_powerbi.py new file mode 100644 index 00000000..1636c90b --- /dev/null +++ b/tests/test_import_powerbi.py @@ -0,0 +1,464 @@ +"""Tests for the Power BI .pbit / .bim importer.""" + +import io +import json +import zipfile + +import pytest +import yaml + +from datacontract.imports.powerbi_importer import import_powerbi_from_file + +BIM_FIXTURE = "fixtures/powerbi/model.bim" + + +# 
--------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_pbit(bim_dict: dict, encoding: str = "utf-16-le") -> bytes: + """Create an in-memory .pbit (ZIP) containing a DataModelSchema entry.""" + raw = json.dumps(bim_dict).encode(encoding) + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("DataModelSchema", raw) + return buf.getvalue() + + +def _write_pbit(tmp_path, bim_dict: dict) -> str: + path = tmp_path / "model.pbit" + path.write_bytes(_make_pbit(bim_dict)) + return str(path) + + +# --------------------------------------------------------------------------- +# BIM file import +# --------------------------------------------------------------------------- + + +def test_import_bim_returns_odcs(): + result = import_powerbi_from_file(BIM_FIXTURE) + assert result is not None + assert result.kind == "DataContract" + assert result.apiVersion == "v3.1.0" + + +def test_import_bim_model_name(): + result = import_powerbi_from_file(BIM_FIXTURE) + assert result.name == "model" + assert result.id == "model" + + +def test_import_bim_server(): + result = import_powerbi_from_file(BIM_FIXTURE) + assert len(result.servers) == 1 + server = result.servers[0] + assert server.type == "custom" + assert server.server == "powerbi" + + +def test_import_bim_tables_sorted(): + result = import_powerbi_from_file(BIM_FIXTURE) + names = [s.name for s in result.schema_] + assert names == sorted(names, key=str.lower) + + +def test_import_bim_expected_tables(): + result = import_powerbi_from_file(BIM_FIXTURE) + names = {s.name for s in result.schema_} + # DateTableTemplate is hidden but should still be imported with hidden tag + assert "Sales" in names + assert "Date" in names + assert "DateTableTemplate" in names + + +def test_import_bim_table_description(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if 
def test_import_bim_hidden_table_tagged():
    """A table flagged ``isHidden`` is imported and carries the ``hidden`` tag."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    hidden = next(s for s in result.schema_ if s.name == "DateTableTemplate")
    assert hidden.tags is not None
    assert "hidden" in hidden.tags


def test_import_bim_local_data_table_excluded(tmp_path):
    """Tables whose name starts with LocalDateTable_ must be skipped entirely."""
    with open(BIM_FIXTURE, encoding="utf-8-sig") as f:
        bim_dict = json.load(f)

    bim_dict["model"]["tables"].append(
        {
            "name": "LocalDateTable_abc123",
            "columns": [{"name": "Col1", "dataType": "string", "columnType": "data"}],
            "partitions": [{"name": "p", "source": {"type": "m", "expression": ""}}],
        }
    )

    bim_path = tmp_path / "model.bim"
    bim_path.write_text(json.dumps(bim_dict), encoding="utf-8")
    result = import_powerbi_from_file(str(bim_path))

    names = {s.name for s in result.schema_}
    assert "LocalDateTable_abc123" not in names


def test_import_bim_local_data_table_relationship_excluded(tmp_path):
    """Relationships referencing a LocalDateTable_ table must be skipped."""
    with open(BIM_FIXTURE, encoding="utf-8-sig") as f:
        bim_dict = json.load(f)

    bim_dict["model"]["tables"].append(
        {
            "name": "LocalDateTable_xyz",
            "columns": [{"name": "Key", "dataType": "int64", "columnType": "data"}],
            "partitions": [{"name": "p", "source": {"type": "m", "expression": ""}}],
        }
    )
    bim_dict["model"]["relationships"].append(
        {
            "name": "Sales_LocalDateTable",
            "fromTable": "LocalDateTable_xyz",
            "fromColumn": "Key",
            "toTable": "Sales",
            "toColumn": "OrderID",
            "fromCardinality": "many",
            "toCardinality": "one",
        }
    )

    bim_path = tmp_path / "model.bim"
    bim_path.write_text(json.dumps(bim_dict), encoding="utf-8")
    result = import_powerbi_from_file(str(bim_path))

    # Relationships are attached to the *from* table, so looking only at
    # Sales would pass even if the relationship leaked elsewhere. Check both
    # ends of every relationship on every imported table.
    for schema_obj in result.schema_:
        for rel in schema_obj.relationships or []:
            assert "LocalDateTable_xyz" not in (rel.from_ or "")
            assert "LocalDateTable_xyz" not in (rel.to or "")


# ---------------------------------------------------------------------------
# Columns
# ---------------------------------------------------------------------------


def test_import_bim_rowNumber_column_excluded():
    """The engine-internal ``rowNumber`` column is not imported."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    sales = next(s for s in result.schema_ if s.name == "Sales")
    col_names = [p.name for p in sales.properties]
    assert "RowNumber" not in col_names


def test_import_bim_column_types():
    """Power BI data types are mapped to the expected ODCS logical types."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    sales = next(s for s in result.schema_ if s.name == "Sales")
    props = {p.name: p for p in sales.properties}

    assert props["OrderID"].logicalType == "integer"
    assert props["OrderDate"].logicalType == "timestamp"
    assert props["SalesAmount"].logicalType == "number"
    assert props["Discount"].logicalType == "number"


def test_import_bim_required_column():
    """A column with ``isNullable: false`` is marked as required."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    sales = next(s for s in result.schema_ if s.name == "Sales")
    order_id = next(p for p in sales.properties if p.name == "OrderID")
    assert order_id.required is True


def test_import_bim_nullable_column_not_required():
    """A nullable column leaves ``required`` unset rather than ``False``."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    sales = next(s for s in result.schema_ if s.name == "Sales")
    amount = next(p for p in sales.properties if p.name == "SalesAmount")
    assert amount.required is None


def test_import_bim_column_description():
    """Column descriptions are carried over unchanged."""
    result = import_powerbi_from_file(BIM_FIXTURE)
    sales = next(s for s in result.schema_ if s.name == "Sales")
    order_id = next(p for p in sales.properties if p.name == "OrderID")
    assert order_id.description == "Unique order identifier"
p.name == "Discount") + cp_keys = {cp.property for cp in (discount.customProperties or [])} + assert "isHidden" in cp_keys + + +# --------------------------------------------------------------------------- +# Calculated columns +# --------------------------------------------------------------------------- + + +def test_import_bim_calculated_column_physical_type(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + margin = next(p for p in sales.properties if p.name == "GrossMargin") + assert margin.physicalType == "calculated column" + + +def test_import_bim_calculated_column_dax_expression(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + margin = next(p for p in sales.properties if p.name == "GrossMargin") + assert margin.transformLogic is not None + assert "SalesAmount" in margin.transformLogic + + +# --------------------------------------------------------------------------- +# Measures +# --------------------------------------------------------------------------- + + +def test_import_bim_measures_present(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + measure_names = {p.name for p in sales.properties if p.physicalType == "measure"} + assert "Total Sales" in measure_names + assert "YTD Sales" in measure_names + assert "Sales MoM %" in measure_names + + +def test_import_bim_measure_physical_type(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + total_sales = next(p for p in sales.properties if p.name == "Total Sales") + assert total_sales.physicalType == "measure" + + +def test_import_bim_measure_dax_expression(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + total_sales = next(p for p in sales.properties if p.name == "Total Sales") + 
assert total_sales.transformLogic is not None + assert total_sales.transformLogic == "SUM(Sales[SalesAmount])" + + +def test_import_bim_measure_description(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + total_sales = next(p for p in sales.properties if p.name == "Total Sales") + assert total_sales.description == "Sum of all sales amounts" + + +def test_import_bim_measure_display_folder(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + ytd = next(p for p in sales.properties if p.name == "YTD Sales") + folder_cp = next((cp for cp in (ytd.customProperties or []) if cp.property == "displayFolder"), None) + assert folder_cp is not None + assert folder_cp.value == "Time Intelligence" + + +def test_import_bim_measure_multiline_dax(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + mom = next(p for p in sales.properties if p.name == "Sales MoM %") + assert mom.transformLogic is not None + assert "DATEADD" in mom.transformLogic + + +def test_import_bim_measure_inferred_number_type(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + total_sales = next(p for p in sales.properties if p.name == "Total Sales") + assert total_sales.logicalType == "number" + + +def test_import_bim_hidden_measure_custom_property(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + flag = next(p for p in sales.properties if p.name == "Is Top Customer Flag") + cp_keys = {cp.property for cp in (flag.customProperties or [])} + assert "isHidden" in cp_keys + + +# --------------------------------------------------------------------------- +# Hierarchies +# --------------------------------------------------------------------------- + + +def test_import_bim_hierarchy_present(): + 
result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + hier = next((p for p in date_table.properties if p.name == "Calendar"), None) + assert hier is not None + + +def test_import_bim_hierarchy_physical_type(): + result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + hier = next(p for p in date_table.properties if p.name == "Calendar") + assert hier.physicalType == "hierarchy" + assert hier.logicalType == "object" + + +def test_import_bim_hierarchy_levels(): + result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + hier = next(p for p in date_table.properties if p.name == "Calendar") + level_names = [lv.name for lv in (hier.properties or [])] + assert level_names == ["Year", "Quarter", "Month", "Date"] + + +def test_import_bim_hierarchy_level_column_ref(): + result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + hier = next(p for p in date_table.properties if p.name == "Calendar") + year_level = next(lv for lv in hier.properties if lv.name == "Year") + col_ref_cp = next((cp for cp in (year_level.customProperties or []) if cp.property == "columnRef"), None) + assert col_ref_cp is not None + assert col_ref_cp.value == "Year" + + +def test_import_bim_hierarchy_description(): + result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + hier = next(p for p in date_table.properties if p.name == "Calendar") + assert hier.description == "Standard calendar drill-down hierarchy" + + +# --------------------------------------------------------------------------- +# Relationships +# --------------------------------------------------------------------------- + + +def test_import_bim_relationship_on_sales_table(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s 
in result.schema_ if s.name == "Sales") + assert sales.relationships is not None + assert len(sales.relationships) == 1 + rel = sales.relationships[0] + assert rel.type == "foreignKey" + assert "OrderDate" in rel.from_ + assert "Date" in rel.to + + +def test_import_bim_no_relationship_on_date_table(): + result = import_powerbi_from_file(BIM_FIXTURE) + date_table = next(s for s in result.schema_ if s.name == "Date") + assert not date_table.relationships + + +# --------------------------------------------------------------------------- +# IDs +# --------------------------------------------------------------------------- + + +def test_import_bim_table_id(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + assert sales.id == "Sales_id" + + +def test_import_bim_column_id(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + order_id = next(p for p in sales.properties if p.name == "OrderID") + assert order_id.id == "OrderID_id" + + +def test_import_bim_measure_id(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + total_sales = next(p for p in sales.properties if p.name == "Total Sales") + assert total_sales.id == "Total_Sales_id" + + +def test_import_bim_relationship_ids_use_name_convention(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + rel = sales.relationships[0] + assert rel.from_ == "Sales.OrderDate" + assert rel.to == "Date.Date" + + +# --------------------------------------------------------------------------- +# Physical types +# --------------------------------------------------------------------------- + + +def test_import_bim_table_physical_type(): + result = import_powerbi_from_file(BIM_FIXTURE) + sales = next(s for s in result.schema_ if s.name == "Sales") + assert sales.physicalType == "table" + + 
def test_import_bim_calculated_table_physical_type():
    """The "Date" schema entry must report the "calculated table" physical type."""
    contract = import_powerbi_from_file(BIM_FIXTURE)
    date_candidates = (entry for entry in contract.schema_ if entry.name == "Date")
    date_entry = next(date_candidates)
    assert date_entry.physicalType == "calculated table"


# ---------------------------------------------------------------------------
# .pbit (ZIP) loading
# ---------------------------------------------------------------------------


def test_import_pbit_from_zip(tmp_path):
    """Import the file produced by ``_write_pbit`` from the BIM fixture's JSON.

    The resulting contract must exist and expose both the "Sales" and the
    "Date" schema entries.
    """
    # The fixture is decoded as "utf-8-sig" before being parsed as JSON.
    with open(BIM_FIXTURE, encoding="utf-8-sig") as fixture_file:
        model = json.load(fixture_file)

    contract = import_powerbi_from_file(_write_pbit(tmp_path, model))

    assert contract is not None
    schema_names = {entry.name for entry in contract.schema_}
    assert "Sales" in schema_names
    assert "Date" in schema_names


def test_import_pbit_missing_data_model_schema(tmp_path):
    """A .pbit archive with no DataModelSchema member raises DataContractException."""
    from datacontract.model.exceptions import DataContractException

    # Build an in-memory ZIP whose only member is "Report/Layout".
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.writestr("Report/Layout", "{}")
    target = tmp_path / "no_schema.pbit"
    target.write_bytes(archive.getvalue())

    with pytest.raises(DataContractException, match="DataModelSchema"):
        import_powerbi_from_file(str(target))


def test_import_bad_extension_raises(tmp_path):
    """A ``.csv`` source is rejected with an "Unsupported file extension" error."""
    from datacontract.model.exceptions import DataContractException

    csv_path = tmp_path / "model.csv"
    csv_path.write_text("col1,col2")

    with pytest.raises(DataContractException, match="Unsupported file extension"):
        import_powerbi_from_file(str(csv_path))


def test_import_missing_file_raises():
    """A path that does not exist is rejected with a "File not found" error."""
    from datacontract.model.exceptions import DataContractException

    missing_path = "nonexistent/path/model.bim"
    with pytest.raises(DataContractException, match="File not found"):
        import_powerbi_from_file(missing_path)


# ---------------------------------------------------------------------------
# YAML round-trip
# ---------------------------------------------------------------------------


def test_import_bim_yaml_is_valid():
    """The contract's YAML output parses and carries the expected header fields.

    Checks ``kind``, ``apiVersion`` and that at least one ``schema`` entry
    is named "Sales".
    """
    contract = import_powerbi_from_file(BIM_FIXTURE)
    document = yaml.safe_load(contract.to_yaml())
    assert document["kind"] == "DataContract"
    assert document["apiVersion"] == "v3.1.0"
    assert any(entry["name"] == "Sales" for entry in document["schema"])