From ed4963fafb4f3a945c4daa8948a50fcd1a8b1daa Mon Sep 17 00:00:00 2001 From: Pedro Castro Date: Sun, 5 Apr 2026 20:41:01 -0300 Subject: [PATCH] initial --- python-package/basedosdados/core/base.py | 88 +++++----- .../basedosdados/download/download.py | 13 +- .../basedosdados/upload/connection.py | 7 +- python-package/basedosdados/upload/dataset.py | 86 +++++----- .../basedosdados/upload/datatypes.py | 10 +- python-package/basedosdados/upload/storage.py | 151 ++++++++++-------- python-package/basedosdados/upload/table.py | 146 ++++++++--------- python-package/tests/test_dataset.py | 6 +- python-package/tests/test_storage.py | 31 ++-- python-package/tests/test_table.py | 9 +- 10 files changed, 274 insertions(+), 273 deletions(-) diff --git a/python-package/basedosdados/core/base.py b/python-package/basedosdados/core/base.py index 34839c654..a85a0fa6b 100644 --- a/python-package/basedosdados/core/base.py +++ b/python-package/basedosdados/core/base.py @@ -7,10 +7,10 @@ import shutil import sys import warnings -from functools import lru_cache +from functools import cached_property from os import getenv from pathlib import Path -from typing import Dict, List, Union +from typing import Dict, List, Literal, Optional, TypedDict, Union import googleapiclient.discovery import tomlkit @@ -24,6 +24,14 @@ warnings.filterwarnings("ignore") +class Client(TypedDict): + bigquery_prod: bigquery.Client + bigquery_connection_prod: bigquery_connection_v1.ConnectionServiceClient + bigquery_staging: bigquery.Client + bigquery_connection_staging: bigquery_connection_v1.ConnectionServiceClient + storage_staging: storage.Client + + class Base: """ Base class for all datasets @@ -35,7 +43,7 @@ def __init__( bucket_name=None, billing_project_id=None, overwrite_cli_config=False, - folder="staging", + mode="staging", ): """ Initialize the class @@ -52,12 +60,14 @@ def __init__( self.config = self._load_config() self._config_log(config.verbose) self.bucket_name = bucket_name or self.config["bucket_name"] - self.folder = folder + self.mode = mode self.billing_project_id = ( billing_project_id or self.config["gcloud-projects"]["staging"]["name"] ) - self.uri = f"gs://{self.bucket_name}/{folder}" + "/{dataset}/{table}/*" + self.uri = ( + f"gs://{self.bucket_name}/{self.mode}" + "/{dataset}/{table}/*" + ) self._backend = Backend(self.config.get("api", {}).get("url", None)) @property @@ -103,33 +113,32 @@ def _load_credentials(self, mode: str): ], ) - @property - @lru_cache(256) - def client(self): + @cached_property + def client(self) -> Client: """ Client for BigQuery """ - return dict( - bigquery_prod=bigquery.Client( + return { + "bigquery_prod": bigquery.Client( credentials=self._load_credentials("prod"), project=self.config["gcloud-projects"]["prod"]["name"], ), - bigquery_connection_prod=bigquery_connection_v1.ConnectionServiceClient( + "bigquery_connection_prod": bigquery_connection_v1.ConnectionServiceClient( credentials=self._load_credentials("prod") ), - bigquery_staging=bigquery.Client( + "bigquery_staging": bigquery.Client( credentials=self._load_credentials("staging"), project=self.config["gcloud-projects"]["staging"]["name"], ), - bigquery_connection_staging=bigquery_connection_v1.ConnectionServiceClient( + "bigquery_connection_staging": bigquery_connection_v1.ConnectionServiceClient( credentials=self._load_credentials("staging") ), - storage_staging=storage.Client( + "storage_staging": storage.Client( credentials=self._load_credentials("staging"), project=self.config["gcloud-projects"]["staging"]["name"], ), - ) + } @staticmethod def _input_validator(context, default="", with_lower=True): @@ -379,34 +388,31 @@ def _load_config(self): ) @staticmethod - def _check_folder(folder: str = "staging"): + def _check_mode(mode: str) -> Optional[Literal[True]]: """ - Checks if the folder is valid + Checks if the mode is valid """ - if folder is not None and isinstance(folder, str): - return + if isinstance(mode, str) and len(mode.strip()) > 0: + return True - raise Exception( - "This folder does not accept values ​​equal to None and different from string." - "We recommend the following names for the folder:" - "'staging', 'raw', 'header', 'auxiliary_files', 'architecture' or organization_name" - ) + msg = f"Mode {mode} is not supported. We recommend the following names for the folder: 'staging', 'raw', 'header', 'auxiliary_files', 'architecture' or organization_name" + raise Exception(msg) - def _get_project_id(self, project_gcp: str) -> str: + def _get_project_id(self, mode: str) -> str: """ Get the project ID. """ - return self.config["gcloud-projects"][project_gcp]["name"] + return self.config["gcloud-projects"][mode]["name"] - def _get_project_number(self, project_gcp: str) -> str: + def _get_project_number(self, mode: str) -> str: """ Get the project number from project ID. """ - credentials = self._load_credentials(project_gcp) + credentials = self._load_credentials(mode) crm_service = googleapiclient.discovery.build( "cloudresourcemanager", "v1", credentials=credentials ) - project_id = self._get_project_id(project_gcp) + project_id = self._get_project_id(mode) return ( crm_service.projects() @@ -415,19 +421,19 @@ def _get_project_number(self, project_gcp: str) -> str: ) def _get_project_iam_policy( - self, project_gcp: str + self, mode: str ) -> Dict[str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]]: """ Get the project IAM policy. """ - credentials = self._load_credentials(project_gcp) + credentials = self._load_credentials(mode) service = googleapiclient.discovery.build( "cloudresourcemanager", "v1", credentials=credentials ) policy = ( service.projects() .getIamPolicy( - resource=self._get_project_id(project_gcp), + resource=self._get_project_id(mode), body={"options": {"requestedPolicyVersion": 1}}, ) .execute() @@ -439,24 +445,24 @@ def _set_project_iam_policy( policy: Dict[ str, Union[str, int, List[Dict[str, Union[str, List[str]]]]] ], - project_gcp: str, + mode: str, ): """ Set the project IAM policy. """ - credentials = self._load_credentials(project_gcp) + credentials = self._load_credentials(mode) service = googleapiclient.discovery.build( "cloudresourcemanager", "v1", credentials=credentials ) service.projects().setIamPolicy( - resource=self._get_project_id(project_gcp), body={"policy": policy} + resource=self._get_project_id(mode), body={"policy": policy} ).execute() - def _grant_role(self, role: str, member: str, project_gcp: str): + def _grant_role(self, role: str, member: str, mode: str): """ Grant a role to a member. """ - policy = self._get_project_iam_policy(project_gcp) + policy = self._get_project_iam_policy(mode) try: binding = next(b for b in policy["bindings"] if b["role"] == role) except StopIteration: @@ -464,13 +470,13 @@ def _grant_role(self, role: str, member: str, project_gcp: str): policy["bindings"].append(binding) if member not in binding["members"]: binding["members"].append(member) - self._set_project_iam_policy(policy, project_gcp) + self._set_project_iam_policy(policy, mode) - def _revoke_role(self, role: str, member: str, project_gcp: str): + def _revoke_role(self, role: str, member: str, mode: str): """ Revoke a role from a member. """ - policy = self._get_project_iam_policy(project_gcp) + policy = self._get_project_iam_policy(mode) try: binding = next(b for b in policy["bindings"] if b["role"] == role) except StopIteration: @@ -478,4 +484,4 @@ def _revoke_role(self, role: str, member: str, project_gcp: str): else: if member in binding["members"]: binding["members"].remove(member) - self._set_project_iam_policy(policy, project_gcp) + self._set_project_iam_policy(policy, mode) diff --git a/python-package/basedosdados/download/download.py b/python-package/basedosdados/download/download.py index 2bbef41b5..722fd2d84 100644 --- a/python-package/basedosdados/download/download.py +++ b/python-package/basedosdados/download/download.py @@ -370,7 +370,6 @@ def _download_blob_from_bucket( client: _GoogleClient, bucket_name: str, savepath: Path, - user_project: str = "basedosdados-dev", ) -> None: """ Download a blob from a bucket to the path specified. @@ -383,7 +382,9 @@ def _download_blob_from_bucket( Returns: None """ - bucket = client["storage"].bucket(bucket_name, user_project=user_project) + bucket = client["storage"].bucket( + bucket_name, user_project=client["storage"].project + ) for blob in bucket.list_blobs(): filepath = savepath / (blob.name.split("-")[-1] + ".csv.gz") blob.download_to_filename(filepath) @@ -392,7 +393,6 @@ def _download_blob_from_bucket( def _create_bucket( client: _GoogleClient, bucket_name: str, - user_project: str = "basedosdados-dev", ) -> None: """ Create a new bucket in a specific location with standard storage class. @@ -405,21 +405,22 @@ def _create_bucket( None """ storage_client = client["storage"] - bucket = storage_client.bucket(bucket_name, user_project=user_project) + bucket = storage_client.bucket( + bucket_name, user_project=client["storage"].project + ) # standard storage class are adequate for data # stored for only brief periods of time bucket.storage_class = "STANDARD" storage_client.create_bucket( - bucket, location="US", user_project=user_project + bucket, location="US", user_project=client["storage"].project ) def _delete_bucket( client: _GoogleClient, bucket_name: str, - user_project: str = "basedosdados-dev", ) -> None: """Forceably deletes a bucket. diff --git a/python-package/basedosdados/upload/connection.py b/python-package/basedosdados/upload/connection.py index 5a6a92cff..11b0ea264 100644 --- a/python-package/basedosdados/upload/connection.py +++ b/python-package/basedosdados/upload/connection.py @@ -27,7 +27,7 @@ def __init__( self, name: str, location: str = None, - project_gcp: str = "staging", + mode: str = "staging", friendly_name: str = None, description: str = None, **kwargs, @@ -35,9 +35,10 @@ def __init__( super().__init__(**kwargs) self._name = name self._location = location or "US" + self._mode = mode self._friendly_name = friendly_name self._description = description - self._project = self.config["gcloud-projects"][project_gcp]["name"] + self._project = self.config["gcloud-projects"][self._mode]["name"] self._parent = f"projects/{self._project}/locations/{self._location}" @property @@ -54,7 +55,7 @@ def connection(self) -> Union[BQConnection, None]: """ Returns connection object. """ - client = self.client[f"bigquery_connection_{self._project}"] + client = self.client[f"bigquery_connection_{self._mode}"] request = GetConnectionRequest( name=f"{self._parent}/connections/{self._name}" ) diff --git a/python-package/basedosdados/upload/dataset.py b/python-package/basedosdados/upload/dataset.py index ce4625ba6..13b4e3fc1 100644 --- a/python-package/basedosdados/upload/dataset.py +++ b/python-package/basedosdados/upload/dataset.py @@ -36,7 +36,7 @@ def dataset_config(self) -> dict[str, Any]: """ return self.backend.get_dataset_config(self.dataset_id) - def _loop_modes(self, project_gcp: str = "all"): + def _loop_modes(self, mode: str = "all"): """ Loop modes. """ @@ -44,30 +44,28 @@ def _loop_modes(self, project_gcp: str = "all"): def dataset_tag(m): return f"_{m}" if m == "staging" else "" - _project_gcp = ( - ["prod", "staging"] if project_gcp == "all" else [project_gcp] - ) + mode_ = ["prod", "staging"] if mode == "all" else [mode] return ( { "client": self.client[f"bigquery_{m}"], "id": f"{self.client[f'bigquery_{m}'].project}.{self.dataset_id}{dataset_tag(m)}", "mode": m, } - for m in _project_gcp + for m in mode_ ) def _setup_dataset_object( self, dataset_id: str, location: Optional[str] = None, - project_gcp: str = "staging", + mode: str = "staging", ) -> bigquery.Dataset: """ Setup dataset object. """ dataset = bigquery.Dataset(dataset_id) - if project_gcp == "staging": + if mode == "staging": dataset_path = dataset_id.replace("_staging", "") description = f"staging dataset for `{dataset_path}`" labels = {"staging": True} @@ -91,21 +89,20 @@ def _setup_dataset_object( return dataset def publicize( - self, project_gcp: str = "staging", dataset_is_public: bool = True + self, mode: str = "all", dataset_is_public: bool = True ) -> None: """ Changes IAM configuration to turn BigQuery dataset public. Args: - project_gcp: Which dataset to create [`prod`|`staging`]. + mode: Which dataset to create [`prod`|`staging`|`all`]. dataset_is_public: Control if prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public. """ - for m in self._loop_modes(project_gcp): + for m in self._loop_modes(mode): dataset = m["client"].get_dataset(m["id"]) - entries = dataset.access_entries # TODO https://github.com/basedosdados/sdk/pull/1020 # TODO if staging dataset is private, the prod view can't acess it: if dataset_is_public and "staging" not in dataset.dataset_id: @@ -143,33 +140,29 @@ def publicize( dataset.access_entries = entries m["client"].update_dataset(dataset, ["access_entries"]) logger.success( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.dataset_id, - project_gcp=m["mode"], + mode=m["mode"], object="Dataset", action="publicized", ) - def exists(self, project_gcp: str = "staging") -> bool: + def exists(self, mode: str = "staging") -> bool: """ Check if dataset exists. """ ref_dataset_id = ( - self.dataset_id - if project_gcp == "prod" - else self.dataset_id + "_staging" + self.dataset_id if mode == "prod" else self.dataset_id + "_staging" ) try: - ref = self.client[f"bigquery_{project_gcp}"].get_dataset( - ref_dataset_id - ) + ref = self.client[f"bigquery_{mode}"].get_dataset(ref_dataset_id) except Exception: ref = None return bool(ref) def create( self, - project_gcp: str = "staging", + mode: str = "all", if_exists: str = "raise", dataset_is_public: bool = True, location: Optional[str] = None, @@ -179,11 +172,13 @@ def create( It can create two datasets: - * `` (project_gcp = `prod`) - * `_staging` (project_gcp = `staging`) + * `` (mode = `prod`) + * `_staging` (mode = `staging`) + + If `mode` is `all`, it creates both. Args: - project_gcp: Which dataset to create [`prod`|`staging`]. + mode: Which dataset to create [`prod`|`staging`|`all`]. if_exists: What to do if dataset exists * `raise`: Raises Conflict exception * `replace`: Drop all tables and replace dataset @@ -200,41 +195,36 @@ def create( """ # Set dataset_id to the ID of the dataset to create. - for m in self._loop_modes(project_gcp): + for m in self._loop_modes(mode): if if_exists == "replace": - self.delete(project_gcp=m["mode"]) + self.delete(mode=m["mode"]) elif if_exists == "update": - self.update(project_gcp=m["mode"]) + self.update(mode=m["mode"]) continue # Send the dataset to the API for creation, with an explicit timeout. # Raises google.api_core.exceptions.Conflict if the Dataset already # exists within the project. - try: - if not self.exists(project_gcp=m["mode"]): + if not self.exists(mode=m["mode"]): # Construct a full Dataset object to send to the API. dataset_obj = self._setup_dataset_object( - dataset_id=m["id"], - location=location, - project_gcp=m["mode"], + dataset_id=m["id"], location=location, mode=m["mode"] ) m["client"].create_dataset( dataset_obj ) # Make an API request. logger.success( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.dataset_id, - project_gcp=m["mode"], + mode=m["mode"], object="Dataset", action="created", ) # Make prod dataset public self.publicize( - dataset_is_public=dataset_is_public, - project_gcp=m["mode"], + dataset_is_public=dataset_is_public, mode=m["mode"] ) - except Conflict as e: if if_exists == "pass": continue @@ -242,55 +232,55 @@ def create( f"Dataset {self.dataset_id} already exists" ) from e - def delete(self, project_gcp: str = "staging") -> None: + def delete(self, mode: str = "all") -> None: """ Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete. Args: - project_gcp: Which dataset to delete [`prod`|`staging`] + mode: Which dataset to delete [`prod`|`staging`|`all`] """ - for m in self._loop_modes(project_gcp): + for m in self._loop_modes(mode): m["client"].delete_dataset( m["id"], delete_contents=True, not_found_ok=True ) logger.info( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.dataset_id, - project_gcp=m["mode"], + mode=m["mode"], object="Dataset", action="deleted", ) def update( - self, project_gcp: str = "staging", location: Optional[str] = None + self, mode: str = "all", location: Optional[str] = None ) -> None: """ Update dataset description. Toggle mode to choose which dataset to update. Args: - project_gcp: Which dataset to update [`prod`|`staging`] + mode: Which dataset to update [`prod`|`staging`|`all`] location: Location of dataset data. List of possible region names: [BigQuery locations](https://cloud.google.com/bigquery/docs/locations) """ - for m in self._loop_modes(project_gcp): + for m in self._loop_modes(mode): # Send the dataset to the API to update, with an explicit timeout. # Raises google.api_core.exceptions.Conflict if the Dataset already # exists within the project. m["client"].update_dataset( self._setup_dataset_object( - m["id"], location=location, project_gcp=m["mode"] + m["id"], location=location, mode=m["mode"] ), fields=["description"], ) # Make an API request. logger.success( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.dataset_id, - project_gcp=m["mode"], + mode=m["mode"], object="Dataset", action="updated", ) diff --git a/python-package/basedosdados/upload/datatypes.py b/python-package/basedosdados/upload/datatypes.py index ff565171e..b1b468461 100644 --- a/python-package/basedosdados/upload/datatypes.py +++ b/python-package/basedosdados/upload/datatypes.py @@ -32,8 +32,7 @@ def __init__( csv_skip_leading_rows=1, csv_delimiter=",", csv_allow_jagged_rows=False, - project_gcp="staging", - folder="staging", + mode="staging", bucket_name=None, partitioned=False, biglake_connection_id=None, @@ -44,10 +43,9 @@ def __init__( self.csv_delimiter = csv_delimiter self.csv_skip_leading_rows = csv_skip_leading_rows self.csv_allow_jagged_rows = csv_allow_jagged_rows - self.folder = folder - self.project = project_gcp + self.mode = mode self.uri = ( - f"gs://{bucket_name}/{folder}/{self.dataset_id}/{table_id}/*" + f"gs://{bucket_name}/{self.mode}/{self.dataset_id}/{table_id}/*" ) self.partitioned = partitioned self.biglake_connection_id = biglake_connection_id @@ -112,8 +110,6 @@ def external_config(self): "Base dos Dados just supports csv and parquet files" ) _external_config.source_uris = self.uri - - # breakpoint() if self.partitioned: _external_config.hive_partitioning = self.partition() diff --git a/python-package/basedosdados/upload/storage.py b/python-package/basedosdados/upload/storage.py index 29cd7e37f..832a0e0d1 100644 --- a/python-package/basedosdados/upload/storage.py +++ b/python-package/basedosdados/upload/storage.py @@ -78,7 +78,7 @@ def _resolve_partitions(partitions: Union[str, dict[str, str]]) -> str: def _build_blob_name( self, filename: str, - folder: str, + mode: str, partitions: Optional[Union[str, dict[str, str]]] = None, ) -> str: """ @@ -86,7 +86,8 @@ def _build_blob_name( """ # table folder - blob_name = f"{folder}/{self.dataset_id}/{self.table_id}/" + blob_name = f"{mode}/{self.dataset_id}/{self.table_id}/" + # add partition folder if partitions is not None: blob_name += self._resolve_partitions(partitions) @@ -135,14 +136,14 @@ def init(self, replace: bool = False, very_sure: bool = False) -> None: def upload( self, path: Union[str, Path], - folder: str = "staging", + mode: str = "all", partitions: Optional[Union[str, dict[str, str]]] = None, if_exists: str = "raise", chunk_size: Optional[int] = None, **upload_args, ) -> None: """ - Upload to storage at `///`. + Upload to storage at `///`. You can: @@ -156,19 +157,20 @@ def upload( *Remember all files must follow a single schema.* Otherwise, things might fail in the future. - folder: + Modes: * `raw` : raw files from datasource * `staging` : pre-treated files ready to upload to BigQuery * `header`: header of the tables * `auxiliary_files`: auxiliary files from each table * `architecture`: architecture sheet of the tables - * `name organization`: If you are working on any external project + * `all`: if no treatment is needed, use `all`. + * `organization_name`: Name of organization Args: path: Where to find the file or folder to upload to storage. - folder: Folder of which dataset to update - [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`name organization`] + mode: Folder of which dataset to update + [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`all`|`organization_name`] partitions: If adding a single file, use this to add it to a specific partition. Can be a string or dict. if_exists: What to do if data exists. @@ -206,59 +208,67 @@ def upload( paths = [path] parts = [partitions or None] - self._check_folder(folder) + self._check_mode(mode) - for filepath, part in tqdm( - list(zip(paths, parts)), desc="Uploading files" - ): - blob_name = self._build_blob_name(filepath.name, folder, part) + _mode = ( + ["raw", "staging", "header", "auxiliary_files", "architecture"] + if mode == "all" + else [mode] + ) + for m in _mode: + for filepath, part in tqdm( + list(zip(paths, parts)), desc="Uploading files" + ): + blob_name = self._build_blob_name(filepath.name, m, part) - blob = self.bucket.blob(blob_name, chunk_size=chunk_size) - if not blob.exists() or if_exists == "replace": - upload_args["timeout"] = upload_args.get("timeout", None) + blob = self.bucket.blob(blob_name, chunk_size=chunk_size) - blob.upload_from_filename(str(filepath), **upload_args) + if not blob.exists() or if_exists == "replace": + upload_args["timeout"] = upload_args.get("timeout", None) - elif if_exists == "pass": - pass + blob.upload_from_filename(str(filepath), **upload_args) - else: - raise BaseDosDadosException( - f"Data already exists at {self.bucket_name}/{blob_name}. " - "If you are using Storage.upload then set if_exists to " - "'replace' to overwrite data \n" - "If you are using Table.create then set if_storage_data_exists " - "to 'replace' to overwrite data." - ) + elif if_exists == "pass": + pass - logger.success( - "{object} {filename} in folder {folder} was {action}!", - filename=filepath.name, - folder=folder, - object="File", - action="uploaded", - ) + else: + raise BaseDosDadosException( + f"Data already exists at {self.bucket_name}/{blob_name}. " + "If you are using Storage.upload then set if_exists to " + "'replace' to overwrite data \n" + "If you are using Table.create then set if_storage_data_exists " + "to 'replace' to overwrite data." + ) + + logger.success( + " {object} {filename}_{mode} was {action}!", + filename=filepath.name, + mode=m, + object="File", + action="uploaded", + ) def download( self, filename: str = "*", savepath: Union[Path, str] = Path("."), partitions: Optional[Union[str, dict[str, str]]] = None, - folder: str = "staging", + mode: str = "staging", if_not_exists: str = "raise", ) -> None: """ Download files from Google Storage from path - `folder/dataset_id/table_id/partitions/filename` and replicate folder + `mode/dataset_id/table_id/partitions/filename` and replicate folder hierarchy on save. - folders: + Modes: * `raw` : raw files from datasource * `staging` : pre-treated files ready to upload to BigQuery * `header`: header of the tables * `auxiliary_files`: auxiliary files from each table * `architecture`: architecture sheet of the tables + * `organization_name`: Name of organization You can use the `partitions` argument to choose files from a partition. @@ -269,19 +279,19 @@ def download( a directory. partitions: If downloading a single file, use this to specify the partition path from which to download. Can be a string `=/=` or dict `dict(key=value, key2=value2)`. - folder: Folder of which dataset to update. - [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`] + mode: Folder of which dataset to update. + [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`organization_name`] if_not_exists: What to do if data not found. * `raise`: Raises FileNotFoundError. * `pass`: Do nothing and exit the function. Raises: - FileNotFoundError: If the given path `////` could not be found or there are + FileNotFoundError: If the given path `////` could not be found or there are no files to download. """ # Prefix to locate files within the bucket - prefix = f"{folder}/{self.dataset_id}/{self.table_id}/" + prefix = f"{mode}/{self.dataset_id}/{self.table_id}/" # Add specific partition to search prefix if partitions: @@ -317,9 +327,9 @@ def download( blob.download_to_filename(filename=save_file_path) logger.success( - " {object} {object_id}_{folder} was {action} at: {path}!", + " {object} {object_id}_{mode} was {action} at: {path}!", object_id=self.dataset_id, - folder=folder, + mode=mode, object="File", action="downloaded", path={str(savepath)}, @@ -328,41 +338,50 @@ def download( def delete_file( self, filename: str, - folder: str, + mode: str, partitions: Optional[Union[str, dict[str, str]]] = None, not_found_ok: bool = False, ) -> None: """ - Delete file from path `/////`. + Delete file from path `/////`. Args: filename: Name of the file to be deleted. mode: Folder of which dataset to update - [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`all`] + [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`all`|`organization_name`] partitions: Hive structured partition as a string `=/=` or dict `dict(key=value, key2=value2)`. not_found_ok: What to do if file not found. """ - self._check_folder(folder) + self._check_mode(mode) - blob = self.bucket.blob( - self._build_blob_name(filename, folder, partitions) + mode_ = ( + ["raw", "staging", "header", "auxiliary_files", "architecture"] + if mode == "all" + else [mode] ) - if blob.exists() or not blob.exists() and not not_found_ok: - blob.delete() + for m in mode_: + blob = self.bucket.blob( + self._build_blob_name(filename, m, partitions) + ) + + if blob.exists() or not blob.exists() and not not_found_ok: + blob.delete() + else: + return logger.success( - " {object} {filename}_{folder} was {action}!", + " {object} {filename}_{mode} was {action}!", filename=filename, - folder=folder, + mode=mode_, object="File", action="deleted", ) def delete_table( self, - folder: str = "staging", + mode: str = "staging", bucket_name: Optional[str] = None, not_found_ok: bool = False, ) -> None: @@ -371,7 +390,7 @@ def delete_table( Args: mode: Folder of which dataset to update - [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`] + [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`organization_name`] bucket_name: The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object. @@ -381,12 +400,12 @@ def delete_table( FileNotFoundError: If the requested table could not be found. """ - prefix = f"{folder}/{self.dataset_id}/{self.table_id}/" + prefix = f"{mode}/{self.dataset_id}/{self.table_id}/" if bucket_name is not None: table_blobs = list( self.client["storage_staging"] - .bucket(f"{bucket_name}", user_project=self.billing_project_id) + .bucket(bucket_name, user_project=self.billing_project_id) .list_blobs(prefix=prefix) ) @@ -423,9 +442,9 @@ def delete_table( counter += 1 traceback.print_exc(file=sys.stderr) logger.success( - " {object} {object_id}_{folder} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.table_id, - folder=folder, + mode=mode, object="Table", action="deleted", ) @@ -434,7 +453,7 @@ def copy_table( self, source_bucket_name: str = "basedosdados", destination_bucket_name: Optional[str] = None, - folder: str = "staging", + mode: str = "staging", new_table_id: Optional[str] = None, ) -> None: """ @@ -448,8 +467,8 @@ def copy_table( to. If None, defaults to the bucket initialized when instantiating the Storage object. You can check it with the `Storage().bucket` property. - folder: Folder of which dataset to update - [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`] + mode: Folder of which dataset to update + [`raw`|`staging`|`header`|`auxiliary_files`|`architecture`|`organization_name`] new_table_id: New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object. """ @@ -457,9 +476,9 @@ def copy_table( source_table_ref = list( self.client["storage_staging"] .bucket(source_bucket_name, user_project=self.billing_project_id) - .list_blobs(prefix=f"{folder}/{self.dataset_id}/{self.table_id}/") + .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/") ) - # breakpoint() + if not source_table_ref: raise FileNotFoundError( f"Could not find the requested table {self.dataset_id}.{self.table_id}" @@ -506,10 +525,10 @@ def copy_table( time.sleep(5) traceback.print_exc(file=sys.stderr) logger.success( - " {object} {object_id}_{folder} was {action} to {new_object_id}_{folder}!", + " {object} {object_id}_{mode} was {action} to {new_object_id}_{mode}!", object_id=self.table_id, new_object_id=new_table_id if new_table_id else self.table_id, - folder=folder, + mode=mode, object="Table", action="copied", ) diff --git a/python-package/basedosdados/upload/table.py b/python-package/basedosdados/upload/table.py index 908d8a29c..d7cc73d28 100644 --- a/python-package/basedosdados/upload/table.py +++ b/python-package/basedosdados/upload/table.py @@ -64,13 +64,13 @@ def table_config(self) -> dict[str, Any]: # return self._load_yaml(self.table_folder / "table_config.yaml") return self.backend.get_table_config(self.dataset_id, self.table_id) - def _get_table_obj(self, project_gcp: str = "staging"): + def _get_table_obj(self, mode: str): """ Get table object from BigQuery. """ - return self.client[f"bigquery_{project_gcp}"].get_table( - self.table_full_name[project_gcp] + return self.client[f"bigquery_{mode}"].get_table( + self.table_full_name[mode] ) def _is_partitioned( @@ -78,13 +78,13 @@ def _is_partitioned( data_sample_path: Optional[Union[str, Path]] = None, source_format: str = "csv", csv_delimiter: str = ",", - project_gcp: str = "staging", ) -> bool: if data_sample_path is not None: table_columns = self._get_columns_from_data( data_sample_path=data_sample_path, source_format=source_format, csv_delimiter=csv_delimiter, + mode="staging", ) else: table_columns = self._get_columns_metadata_from_api() @@ -119,7 +119,6 @@ def _load_schema_from_json( def _load_staging_schema_from_data( self, data_sample_path: Optional[Union[str, Path]] = None, - project_gcp: str = "staging", source_format: str = "csv", csv_delimiter: str = ",", ) -> list[SchemaField]: @@ -127,7 +126,7 @@ def _load_staging_schema_from_data( Generate schema from columns metadata in data sample. """ - if self.table_exists(project_gcp=project_gcp): + if self.table_exists(mode="staging"): logger.warning( " {object} {object_id} allready exists, replacing schema!", object_id=self.table_id, @@ -138,29 +137,28 @@ def _load_staging_schema_from_data( data_sample_path=data_sample_path, source_format=source_format, csv_delimiter=csv_delimiter, + mode="staging", ) return self._load_schema_from_json( columns=table_columns.get("columns") ) - def _load_schema_from_bq( - self, project_gcp: str = "staging" - ) -> list[SchemaField]: + def _load_schema_from_bq(self, mode: str = "staging") -> list[SchemaField]: """Load schema from table config Args: - project_gcp: Which dataset to create [`prod`|`staging`]. + mode: Which dataset to create [`prod`|`staging`]. """ - table_columns = self._get_columns_from_bq(project_gcp=project_gcp) + table_columns = self._get_columns_from_bq() columns = table_columns.get("partition_columns") + table_columns.get( "columns" ) return self._load_schema_from_json(columns=columns) def _load_schema_from_api( - self, project_gcp: str = "staging" + self, mode: str = "staging" ) -> list[SchemaField]: """Load schema from table config @@ -168,7 +166,7 @@ def _load_schema_from_api( mode: Which dataset to create [`prod`|`staging`]. """ - if self.table_exists(project_gcp=project_gcp): + if self.table_exists(mode=mode): logger.warning( " {object} {object_id} allready exists, replacing schema!", object_id=self.table_id, @@ -187,6 +185,7 @@ def _get_columns_from_data( data_sample_path: Optional[Union[str, Path]] = None, source_format: str = "csv", csv_delimiter: str = ",", + mode: str = "staging", ) -> dict[str, list[dict[str, str]]]: """ Get the partition columns from the structure of `data_sample_path`. @@ -271,7 +270,7 @@ def _parser_blobs_to_partition_dict(self) -> Optional[dict[Any, Any]]: Extract the partition information from the blobs. """ - if not self.table_exists(project_gcp="staging"): + if not self.table_exists(mode="staging"): return blobs = ( self.client["storage_staging"] @@ -294,16 +293,16 @@ def _parser_blobs_to_partition_dict(self) -> Optional[dict[Any, Any]]: return partitions_dict def _get_columns_from_bq( - self, project_gcp: str = "staging" + self, mode: str = "staging" ) -> dict[str, list[dict[str, str]]]: """ Get columns and partition columns from BigQuery. """ - if not self.table_exists(project_gcp=project_gcp): - msg = f"Table {self.dataset_id}.{self.table_id} does not exist in {project_gcp}, please create first!" + if not self.table_exists(mode=mode): + msg = f"Table {self.dataset_id}.{self.table_id} does not exist in {mode}, please create first!" raise BaseDosDadosException(msg) else: - schema = self._get_table_obj(project_gcp=project_gcp).schema + schema = self._get_table_obj(mode=mode).schema partition_dict = self._parser_blobs_to_partition_dict() @@ -333,11 +332,11 @@ def _get_columns_from_bq( ], } # pyright: ignore[reportReturnType] - def _get_cross_columns_from_bq_api(self, project_gcp: str = "staging"): + def _get_cross_columns_from_bq_api(self): """ Get cross columns from BigQuery API. """ - bq = self._get_columns_from_bq(project_gcp=project_gcp) + bq = self._get_columns_from_bq(mode="staging") bq_columns = bq.get("partition_columns") + bq.get("columns") api = self._get_columns_metadata_from_api() @@ -412,16 +411,16 @@ def _make_publish_sql(self) -> str: return publish_txt - def table_exists(self, project_gcp: str = "staging") -> bool: + def table_exists(self, mode: str) -> bool: """ Check if table exists in BigQuery. Args: - project_gcp: Which dataset to check [`prod`|`staging`]. + mode: Which dataset to check [`prod`|`staging`]. """ try: - ref = self._get_table_obj(project_gcp=project_gcp) + ref = self._get_table_obj(mode=mode) except google.api_core.exceptions.NotFound: ref = None @@ -436,7 +435,7 @@ def _get_biglake_connection( Get or create BigLake connection and set permissions if needed. """ connection = Connection( - name="biglake", location=location, project_gcp="staging" + name="biglake", location=location, mode="staging" ) if not connection.exists: try: @@ -479,7 +478,7 @@ def _get_biglake_connection( return connection - def _get_table_description(self, project: str = "staging") -> str: + def _get_table_description(self, mode: str = "staging") -> str: """ Get table description to BigQuery table. @@ -487,7 +486,7 @@ def _get_table_description(self, project: str = "staging") -> str: mode: Which dataset to check [`prod`|`staging`]. """ table_path = self.table_full_name["prod"] - if project == "staging": + if mode == "staging": description = f"staging table for `{table_path}`" else: try: @@ -514,8 +513,6 @@ def create( location: Optional[str] = None, chunk_size: Optional[int] = None, biglake_table: bool = False, - folder: str = "staging", - project_gcp: str = "staging", set_biglake_connection_permissions: bool = True, ) -> None: """ @@ -528,8 +525,9 @@ def create( The new table is located at `_staging.` in BigQuery. - Data can be found in Storage at `////*` and is used to build the table. - If + Data can be found in Storage at + `/staging///*` and is used to build + the table. The following data types are supported: @@ -562,8 +560,7 @@ def create( exists on your bucket: * `raise`: Raises a Conflict exception * `replace`: Replaces the table - * `pass`: Does nothing - + * `pass`: Do nothing if_dataset_exists: Determines what to do if the dataset already exists: * `raise`: Raises a Conflict exception @@ -585,11 +582,6 @@ def create( other BigQuery native table. See [BigLake intro](https://cloud.google.com/bigquery/docs/biglake-intro) for more on BigLake. - Project: Sets Project in gcloud. Default = `staging` - Staging = basedosdados-dev - Prod = basedosdados - Folder: Bucket folder name. Default `////*` - If you want to upload data to the ´basedosdados-consultoria´ bucket, we advise you to follow the following standardization: `////*` set_biglake_connection_permissions: If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS. @@ -597,11 +589,14 @@ def create( if path is None: # Look if table data already exists at Storage - - data = self.client["storage_staging"].list_blobs( - self.bucket_name, - prefix=f"{folder}/{self.dataset_id}/{self.table_id}", + data = ( + self.client["storage_staging"] + .bucket(self.bucket_name, user_project=self.billing_project_id) + .list_blobs( + prefix=f"staging/{self.dataset_id}/{self.table_id}", + ) ) + # Raise: Cannot create table without external data if not data: raise BaseDosDadosException( @@ -623,7 +618,7 @@ def create( bucket_name=self.bucket_name, ).upload( path=path, - folder=folder, + mode="staging", if_exists=if_storage_data_exists, chunk_size=chunk_size, ) @@ -636,7 +631,7 @@ def create( dataset_obj.create( if_exists=if_dataset_exists, - project_gcp=project_gcp, + mode="all", location=location, dataset_is_public=dataset_is_public, ) @@ -648,9 +643,9 @@ def create( ) biglake_connection_id = biglake_connection.connection_id - table = bigquery.Table(self.table_full_name[project_gcp]) + table = bigquery.Table(self.table_full_name["staging"]) - table.description = self._get_table_description(project=[project_gcp]) + table.description = self._get_table_description(mode="staging") table.external_data_configuration = Datatype( dataset_id=self.dataset_id, @@ -664,9 +659,8 @@ def create( csv_skip_leading_rows=csv_skip_leading_rows, csv_delimiter=csv_delimiter, csv_allow_jagged_rows=csv_allow_jagged_rows, + mode="staging", bucket_name=self.bucket_name, - project_gcp=project_gcp, - folder=folder, partitioned=self._is_partitioned( data_sample_path=path, source_format=source_format, @@ -690,7 +684,7 @@ def create( table_ref = None with contextlib.suppress(google.api_core.exceptions.NotFound): table_ref = self.client["bigquery_staging"].get_table( - self.table_full_name[project_gcp] + self.table_full_name["staging"] ) if isinstance(table_ref, google.cloud.bigquery.table.Table): @@ -702,10 +696,8 @@ def create( "Table already exists, choose replace if you want to overwrite it" ) - if if_table_exists == "replace" and self.table_exists( - project_gcp=project_gcp - ): - self.delete(project_gcp=project_gcp) + if if_table_exists == "replace" and self.table_exists(mode="staging"): + self.delete(mode="staging") try: self.client["bigquery_staging"].create_table(table) @@ -727,9 +719,9 @@ def create( ) from exc logger.success( - "{object} {object_id} was {action} in {project_gcp}!", + "{object} {object_id} was {action} in {mode}!", object_id=self.table_id, - project_gcp="staging", + mode="staging", object="Table", action="created", ) @@ -737,27 +729,29 @@ def create( def update( self, - project_gcp: str = "prod", + mode: str = "prod", custom_schema: Optional[list[dict[str, str]]] = None, ) -> None: """ Updates BigQuery schema and description. Args: - project_gcp: Table of which table to update [`prod`]. + mode: Table of which table to update [`prod`]. not_found_ok: What to do if table is not found. """ - table = self._get_table_obj(project_gcp="prod") + self._check_mode(mode) + + table = self._get_table_obj(mode) table.description = self._get_table_description() # when mode is staging the table schema already exists - if project_gcp == "prod" and custom_schema is None: + if mode == "prod" and custom_schema is None: table.schema = self._load_schema_from_json( columns=self._get_cross_columns_from_bq_api() ) - if project_gcp == "prod" and custom_schema is not None: + if mode == "prod" and custom_schema is not None: table.schema = self._load_schema_from_json(custom_schema) fields = ["description", "schema"] @@ -765,9 +759,9 @@ def update( self.client["bigquery_prod"].update_table(table, fields=fields) logger.success( - " {object} {object_id} was {action} in {project_gcp}!", + " {object} {object_id} was {action} in {mode}!", object_id=self.table_id, - project_gcp=project_gcp, + mode=mode, object="Table", action="updated", ) @@ -798,15 +792,15 @@ def publish( """ # TODO: review this method. Check if all required fields are filled - if if_exists == "replace" and self.table_exists(project_gcp="prod"): - self.delete(project_gcp="prod") + if if_exists == "replace" and self.table_exists(mode="prod"): + self.delete(mode="prod") publish_sql = self._make_publish_sql() # create view using API metadata if custom_publish_sql is None: self.client["bigquery_prod"].query(publish_sql).result() - self.update(project_gcp="prod") + self.update(mode="prod") # create view using custon query if custom_publish_sql is not None: @@ -822,7 +816,7 @@ def publish( action="published", ) - def delete(self, project_gcp: str = "staging") -> None: + def delete(self, mode: str = "all") -> None: """ Deletes table in BigQuery. @@ -830,26 +824,26 @@ def delete(self, project_gcp: str = "staging") -> None: mode: Table of which table to delete [`prod`|`staging`]. """ - if project_gcp == "prod": - for m, n in self.table_full_name[project_gcp].items(): - self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True) + self._check_mode(mode) + if mode == "all": + for m, n in self.table_full_name[mode].items(): + self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True) logger.info( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.table_id, - project_gcp=m, + mode=m, object="Table", action="deleted", ) else: - self.client[f"bigquery_{project_gcp}"].delete_table( - self.table_full_name[project_gcp], not_found_ok=True + self.client[f"bigquery_{mode}"].delete_table( + self.table_full_name[mode], not_found_ok=True ) - logger.info( - " {object} {object_id}_{project_gcp} was {action}!", + " {object} {object_id}_{mode} was {action}!", object_id=self.table_id, - project_gcp=project_gcp, + mode=mode, object="Table", action="deleted", ) @@ -893,7 +887,7 @@ def append( self.table_id, ).upload( filepath, - folder="staging", + mode="staging", partitions=partitions, if_exists=if_exists, chunk_size=chunk_size, diff --git a/python-package/tests/test_dataset.py b/python-package/tests/test_dataset.py index 7b9204e3e..060016169 100644 --- a/python-package/tests/test_dataset.py +++ b/python-package/tests/test_dataset.py @@ -58,9 +58,9 @@ def test_loop_modes(): """ dataset = Dataset(dataset_id=dataset_id) - assert len(list(dataset._loop_modes(project_gcp="staging"))) == 1 - assert "staging" in next(dataset._loop_modes(project_gcp="staging"))["id"] - assert len(list(dataset._loop_modes(project_gcp="prod"))) == 1 + assert len(list(dataset._loop_modes(mode="staging"))) == 1 + assert "staging" in next(dataset._loop_modes(mode="staging"))["id"] + assert len(list(dataset._loop_modes(mode="prod"))) == 1 @pytest.mark.order6 diff --git a/python-package/tests/test_storage.py b/python-package/tests/test_storage.py index 9c9f471eb..8affea551 100644 --- a/python-package/tests/test_storage.py +++ b/python-package/tests/test_storage.py @@ -20,28 +20,28 @@ def test_upload_with_errors(): """ Test the upload method raise errors """ - storage.delete_file(csv_path, folder="staging", not_found_ok=True) - storage.upload(csv_path, folder="staging", if_exists="pass") + storage.delete_file(csv_path, mode="staging", not_found_ok=True) + storage.upload(csv_path, mode="staging", if_exists="pass") with pytest.raises(Exception): - storage.upload(csv_path, folder="staging") - storage.upload(csv_path, folder="staging", if_exists="replace") + storage.upload(csv_path, mode="staging") + storage.upload(csv_path, mode="staging", if_exists="replace") storage.upload( csv_path, - folder="staging", + mode="staging", if_exists="replace", partitions="key1=value1/key2=value2", ) storage.upload( csv_path, - folder="staging", + mode="staging", if_exists="replace", partitions={"key1": "value1", "key2": "value1"}, ) with pytest.raises(Exception): storage.upload( csv_path, - folder="staging", + mode="staging", if_exists="replace", partitions=["key1", "value1", "key2", "value1"], ) @@ -63,7 +63,7 @@ def test_download_filename(): """ storage.download( - filename="municipio.csv", savepath=SAVEPATH, folder="staging" + filename="municipio.csv", savepath=SAVEPATH, mode="staging" ) assert ( Path(SAVEPATH) / "staging" / dataset_id / table_id / "municipio.csv" @@ -78,7 +78,7 @@ def test_download_partitions(): storage.download( savepath=SAVEPATH, - folder="staging", + mode="staging", partitions="key1=value1/key2=value1/", ) @@ -94,7 +94,7 @@ def test_download_partitions(): storage.download( savepath=SAVEPATH, - folder="staging", + mode="staging", partitions={"key1": "value1", "key2": "value2"}, ) @@ -112,10 +112,10 @@ def test_download_partitions(): @pytest.mark.order2 def test_download_default(): """ - Test the download method with the default folder + Test the download method with the default mode """ - storage.download(savepath=SAVEPATH, folder="staging") + storage.download(savepath=SAVEPATH, mode="staging") assert ( Path(SAVEPATH) / "staging" / dataset_id / table_id / "municipio.csv" @@ -134,7 +134,7 @@ def test_copy_table(): source_bucket_name="basedosdados-dev", destination_bucket_name="basedosdados-dev", new_table_id=new_table_id, - folder="staging", + mode="staging", ) savepath = SAVEPATH / "storage_test_copy_table" @@ -180,8 +180,3 @@ def test_delete_table(): with pytest.raises(FileNotFoundError): storage.delete_table() - - -# test_upload_with_errors() - -# test_copy_table() diff --git a/python-package/tests/test_table.py b/python-package/tests/test_table.py index 6445864fe..e50594bc5 100644 --- a/python-package/tests/test_table.py +++ b/python-package/tests/test_table.py @@ -207,10 +207,11 @@ def test_get_columns_from_data(): """ Test if get the columns from data """ - out = table._get_columns_from_data(csv_path) + out = table._get_columns_from_data(csv_path, mode="staging") out2 = table._get_columns_from_data( data_sample_path="tests/sample_data/table", source_format="csv", + mode="staging", ) assert isinstance(out, dict) @@ -241,9 +242,7 @@ def test_create_no_path_error(): Teste if error is raised when no path is provided """ - Dataset(dataset_id=dataset_id).create( - project_gcp="staging", if_exists="pass" - ) + Dataset(dataset_id=dataset_id).create(mode="staging", if_exists="pass") with pytest.raises(FileNotFoundError): table.create("dev-api", if_storage_data_exists="raise") @@ -308,7 +307,7 @@ def test_delete_all(): """ Test delete method with all modes """ - table.delete("staging") + table.delete("all") assert not table.table_exists("staging") assert not table.table_exists("prod")