From 36df2e48426adededa1343bcd5aa0f72e5bbb1b6 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 25 Nov 2025 22:07:13 +0100 Subject: [PATCH 1/8] support core+ index and profile_qc search entry --- argopy/stores/index/extensions.py | 31 ++++++++++ .../index/implementations/pyarrow/index.py | 60 +++++++++---------- .../implementations/pyarrow/search_engine.py | 45 ++++++++++++++ argopy/stores/index/spec.py | 39 +++++++++++- argopy/utils/checkers.py | 11 ++++ docs/advanced-tools/stores/argoindex.rst | 30 +++++++--- 6 files changed, 177 insertions(+), 39 deletions(-) diff --git a/argopy/stores/index/extensions.py b/argopy/stores/index/extensions.py index 4480a0373..314da2537 100644 --- a/argopy/stores/index/extensions.py +++ b/argopy/stores/index/extensions.py @@ -480,6 +480,37 @@ def composer(profiler_type): self._obj.search_type.update(namer(profiler_label)) return search_filter + @abstractmethod + def profile_qc(self, param): + """Search index for parameter profile QCs with a specific value + + Parameters + ---------- + PARAMs: dict + A dictionary with parameters as keys, and profile QC as a string or a list of strings + logical: str, default='and' + Indicate to search for all (``and``) or any (``or``) of the parameters profile QC. This operator applies + between each parameter. + + Returns + ------- + :class:`ArgoIndex` + + Examples + -------- + .. 
code-block:: python + + from argopy import ArgoIndex + idx = ArgoIndex(index_file='core+') + + idx.query.profile_qc({'TEMP': 'A'}) + idx.query.profile_qc({'PSAL': 'A'}) + idx.query.profile_qc({'DOXY': ['A', 'B']}) + idx.query.profile_qc({'PSAL': 'A', 'DOXY': 'A'}, logical='or') + + """ + raise NotImplementedError("Not implemented") + def compose(self, query: dict, nrows=None): """Compose query with multiple search methods diff --git a/argopy/stores/index/implementations/pyarrow/index.py b/argopy/stores/index/implementations/pyarrow/index.py index 2a4b4d74e..d43d56876 100644 --- a/argopy/stores/index/implementations/pyarrow/index.py +++ b/argopy/stores/index/implementations/pyarrow/index.py @@ -461,11 +461,11 @@ def to_indexfile(self, file): str """ - def convert_a_date(row): - try: - return row.strftime("%Y%m%d%H%M%S") - except Exception: - return "" + # def convert_a_date(row): + # try: + # return row.strftime("%Y%m%d%H%M%S") + # except Exception: + # return "" s = self.search @@ -473,31 +473,31 @@ def convert_a_date(row): if "longitude_360" in s.column_names: s = s.drop_columns("longitude_360") - if self.convention not in [ - "ar_index_global_meta", - ]: - new_date = pa.array(self.search["date"].to_pandas().apply(convert_a_date)) - - new_date_update = pa.array( - self.search["date_update"].to_pandas().apply(convert_a_date) - ) - - if self.convention not in [ - "ar_index_global_meta", - ]: - s = s.set_column(1, "date", new_date) - - if self.convention == "ar_index_global_prof": - s = s.set_column(7, "date_update", new_date_update) - elif self.convention in [ - "argo_bio-profile_index", - "argo_synthetic-profile_index", - ]: - s = s.set_column(9, "date_update", new_date_update) - elif self.convention in ["argo_aux-profile_index"]: - s = s.set_column(8, "date_update", new_date_update) - elif self.convention in ["ar_index_global_meta"]: - s = s.set_column(3, "date_update", new_date_update) + # if self.convention not in [ + # "ar_index_global_meta", + # ]: + # 
new_date = pa.array(self.search["date"].to_pandas().apply(convert_a_date)) + # + # new_date_update = pa.array( + # self.search["date_update"].to_pandas().apply(convert_a_date) + # ) + # + # if self.convention not in [ + # "ar_index_global_meta", + # ]: + # s = s.set_column(1, "date", new_date) + # + # if self.convention in ["ar_index_global_prof", "argo_profile_detailled_index"]: + # s = s.set_column(7, "date_update", new_date_update) + # elif self.convention in [ + # "argo_bio-profile_index", + # "argo_synthetic-profile_index", + # ]: + # s = s.set_column(9, "date_update", new_date_update) + # elif self.convention in ["argo_aux-profile_index"]: + # s = s.set_column(8, "date_update", new_date_update) + # elif self.convention in ["ar_index_global_meta"]: + # s = s.set_column(3, "date_update", new_date_update) write_options = csv.WriteOptions( delimiter=",", include_header=False, quoting_style="none" diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py index 2079a8d0b..e979df1a9 100644 --- a/argopy/stores/index/implementations/pyarrow/search_engine.py +++ b/argopy/stores/index/implementations/pyarrow/search_engine.py @@ -472,3 +472,48 @@ def composer(profiler_type): else: self._obj.search_type.update(namer(profiler_type)) return search_filter + + def profile_qc(self, PARAMs: dict, logical="and", nrows=None, composed=False): + def checker(PARAMs): + if "profile_temp_qc" not in self._obj.convention_columns: + raise InvalidDatasetStructure("Cannot search for profile QC in this index)") + # Validate PARAMs + [ + PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs + ] + if not np.all( + [v in ['', ' ', '1', 'A', 'B', 'C', 'D', 'E', 'F'] for vals in PARAMs.values() for v in vals] + ): + raise ValueError("Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'") + log.debug("Argo index searching for profile QC: %s ..." 
% PARAMs) + return PARAMs + + def namer(PARAMs, logical): + return {"PROFQC": (PARAMs, logical)} + + def composer(PARAMs, logical): + filt = [] + # def filt_profile_qc(this_param, this_qc): + # return pa.compute.is_in( + # self._obj.index[f"profile_{this_param.lower()}_qc"], pa.array(this_qc) + # ) + + for param in PARAMs: + qcflag = PARAMs[param] + filt.append(pa.compute.is_in( + self._obj.index[f"profile_{param.lower()}_qc"], pa.array(qcflag) + )) + + return self._obj._reduce_a_filter_list(filt, op=logical) + + PARAMs = checker(PARAMs) + self._obj.load(nrows=self._obj._nrows_index) + search_filter = composer(PARAMs, logical) + if not composed: + self._obj.search_type = namer(PARAMs, logical) + self._obj.search_filter = search_filter + self._obj.run(nrows=nrows) + return self._obj + else: + self._obj.search_type.update(namer(PARAMs, logical)) + return search_filter \ No newline at end of file diff --git a/argopy/stores/index/spec.py b/argopy/stores/index/spec.py index 6e5b966e7..78c9b8e7c 100644 --- a/argopy/stores/index/spec.py +++ b/argopy/stores/index/spec.py @@ -61,6 +61,8 @@ class ArgoIndexStoreProto(ABC): "aux", "ar_index_global_meta", "meta", + "argo_profile_detailled_index", + "core+", ] """List of supported conventions""" @@ -102,6 +104,7 @@ def __init__( - ``bgc-s`` or ``argo_synthetic-profile_index.txt`` - ``aux`` or ``etc/argo-index/argo_aux-profile_index.txt`` - ``meta`` or ``ar_index_global_meta.txt`` + - ``core+`` or ``argo_profile_detailled_index.txt`` - a local absolute path toward a file following an Argo index convention. When using a local file, you need to set the ``convention`` followed by the file. convention: str, default: None @@ -117,6 +120,7 @@ def __init__( - ``bgc-s`` or ``argo_synthetic-profile_index`` - ``aux`` or ``argo_aux-profile_index`` - ``meta`` or ``ar_index_global_meta`` + - ``core+`` or ``argo_profile_detailled_index`` cache : bool, default: False Use cache or not. 
@@ -141,6 +145,8 @@ def __init__( index_file = "etc/argo-index/argo_aux-profile_index.txt" elif index_file in ["meta"]: index_file = "ar_index_global_meta.txt" + elif index_file in ["core+"]: + index_file = "etc/argo-index/argo_profile_detailled_index.txt" self.index_file = index_file # Default number of commented lines to skip at the beginning of csv index files @@ -237,6 +243,8 @@ def __init__( convention = "argo_aux-profile_index" elif convention in ["meta"]: convention = "ar_index_global_meta" + elif convention in ["core+"]: + convention = "argo_profile_detailled_index" self._convention = convention # Check if the index file exists @@ -322,7 +330,6 @@ def cname(self) -> str: Return 'full' if a search was not yet performed on the :class:`ArgoIndex` instance - This method uses the BOX, WMO, CYC keys of the index instance ``search_type`` property """ cname = "full" C = [] @@ -399,6 +406,12 @@ def cname(self) -> str: LOG = 'or' cname = ("_%s_" % LOG).join(PLABEL) + elif "PROFQC" == key: + PROFQC, LOG = self.search_type["PROFQC"] + cname = ("_%s_" % LOG).join( + ["%s_%s" % (p, "".join(PROFQC[p])) for p in PROFQC] + ) + C.append(cname) return "_and_".join(C) @@ -513,6 +526,8 @@ def convention_title(self): title = "Aux-Profile directory file of the Argo GDAC" elif self.convention in ["ar_index_global_meta", "meta"]: title = "Metadata directory file of the Argo GDAC" + elif self.convention in ["argo_profile_detailled_index", "core+"]: + title = "Detailed Profile directory file of the Argo GDAC" return title @property @@ -529,6 +544,12 @@ def convention_columns(self) -> List[str]: 'parameters', 'date_update'] elif self.convention in ["ar_index_global_meta"]: columns = ['file', 'profiler_type', 'institution', 'date_update'] + elif self.convention in ["argo_profile_detailled_index"]: + columns = ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution', 'date_update', + 'profile_temp_qc', 'profile_psal_qc','profile_doxy_qc', + 
'ad_psal_adjustment_mean','ad_psal_adjustment_deviation', + 'gdac_date_creation','gdac_date_update','n_levels', + ] return columns @@ -989,6 +1010,22 @@ def _insert_header(self, originalfile): # FTP root number 2 : ftp://usgodae.org/pub/outgoing/argo/dac # GDAC node : CORIOLIS file,profiler_type,institution,date_update +""" % pd.to_datetime( + "now", utc=True + ).strftime( + "%Y%m%d%H%M%S" + ) + + elif self.convention == "argo_profile_detailled_index": + header = """# Title : Profile directory file of the Argo Global Data Assembly Center +# Description : The directory file describes all individual profile files of the argo GDAC ftp site +# Project : ARGO +# Format version : 2.2 +# Date of update : %s +# FTP root number 1 : ftp://ftp.ifremer.fr/ifremer/argo/dac +# FTP root number 2 : ftp://usgodae.usgodae.org/pub/outgoing/argo/dac +# GDAC node : CORIOLIS +file,date,latitude,longitude,ocean,profiler_type,institution,date_update,profile_temp_qc,profile_psal_qc,profile_doxy_qc,ad_psal_adjustment_mean,ad_psal_adjustment_deviation,gdac_date_creation,gdac_date_update,n_levels """ % pd.to_datetime( "now", utc=True ).strftime( diff --git a/argopy/utils/checkers.py b/argopy/utils/checkers.py index 908e66791..517c77225 100644 --- a/argopy/utils/checkers.py +++ b/argopy/utils/checkers.py @@ -470,6 +470,10 @@ def check_index_cols(column_names: list, convention: str = "ar_index_global_prof Metadata directory file of the Argo Global Data Assembly Center file,profiler_type,institution,date_update + argo_profile_detailled_index.txt: Detailed index of profile files + The directory file describes all individual profile files of the argo GDAC ftp site + file,date,latitude,longitude,ocean,profiler_type,institution,date_update,profile_temp_qc,profile_psal_qc,profile_doxy_qc,ad_psal_adjustment_mean,ad_psal_adjustment_deviation,gdac_date_creation,gdac_date_update,n_levels + """ # Default for 'ar_index_global_prof' ref = [ @@ -522,6 +526,13 @@ def check_index_cols(column_names: list, 
convention: str = "ar_index_global_prof "date_update", ] + if convention == "argo_profile_detailled_index": + ref = ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution', 'date_update', + 'profile_temp_qc', 'profile_psal_qc','profile_doxy_qc', + 'ad_psal_adjustment_mean','ad_psal_adjustment_deviation', + 'gdac_date_creation','gdac_date_update','n_levels', + ] + if not is_list_equal(column_names, ref): log.debug( "Expected (convention=%s): %s, got: %s" diff --git a/docs/advanced-tools/stores/argoindex.rst b/docs/advanced-tools/stores/argoindex.rst index ee3275079..dba4b3e29 100644 --- a/docs/advanced-tools/stores/argoindex.rst +++ b/docs/advanced-tools/stores/argoindex.rst @@ -25,33 +25,47 @@ The table below summarize the **argopy** support status of all Argo index files: * - - Index file + - File pattern - Supported - * - Profile + * - Individual Profile - ar_index_global_prof.txt + - //profiles/*_.nc - ✅ - * - Synthetic-Profile + * - Detailed Individual Profile + - //profiles/*_.nc + - argo_profile_detailled_index.txt + - ❌ + * - Individual Synthetic-Profile + - //profiles/S*_.nc - argo_synthetic-profile_index.txt - ✅ - * - Bio-Profile + * - Individual Bio-Profile + - //profiles/B*_.nc - argo_bio-profile_index.txt - ✅ + * - Auxiliary Profile + - //profiles/B*__aux.nc + - etc/argo-index/argo_aux-profile_index.txt + - ✅ * - Metadata + - //_meta.nc - ar_index_global_meta.txt - ✅ - * - Auxiliary - - etc/argo-index/argo_aux-profile_index.txt - - ✅ * - Trajectory + - //_*traj.nc - ar_index_global_traj.txt - ❌ * - Bio-Trajectory + - //_B*traj.nc - argo_bio-traj_index.txt - ❌ * - Technical + - //_tech.nc - ar_index_global_tech.txt - ❌ - * - Greylist - - ar_greylist.txt + * - Detailed Synthetic-Profile + - //_Sprof.nc + - argo_synthetic-profile_detailled_index.txt - ❌ Index files support can be added on demand. `Click here to raise an issue if you'd like to access other index files `_. 
From 3418e377c59b3b0ec84b77f95e2d6af3b0b1eb33 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 25 Nov 2025 22:34:08 +0100 Subject: [PATCH 2/8] Update search_engine.py --- .../index/implementations/pyarrow/search_engine.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py index e979df1a9..351223fd4 100644 --- a/argopy/stores/index/implementations/pyarrow/search_engine.py +++ b/argopy/stores/index/implementations/pyarrow/search_engine.py @@ -493,15 +493,11 @@ def namer(PARAMs, logical): def composer(PARAMs, logical): filt = [] - # def filt_profile_qc(this_param, this_qc): - # return pa.compute.is_in( - # self._obj.index[f"profile_{this_param.lower()}_qc"], pa.array(this_qc) - # ) for param in PARAMs: - qcflag = PARAMs[param] + qcflags = PARAMs[param] filt.append(pa.compute.is_in( - self._obj.index[f"profile_{param.lower()}_qc"], pa.array(qcflag) + self._obj.index[f"profile_{param.lower()}_qc"], pa.array(qcflags) )) return self._obj._reduce_a_filter_list(filt, op=logical) @@ -516,4 +512,4 @@ def composer(PARAMs, logical): return self._obj else: self._obj.search_type.update(namer(PARAMs, logical)) - return search_filter \ No newline at end of file + return search_filter From 14c69a07243a21850e465c94db2aa10bc79b59bb Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 25 Nov 2025 22:34:14 +0100 Subject: [PATCH 3/8] Update search_engine.py --- .../implementations/pandas/search_engine.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py index 00e7d8ce2..315f0ac18 100644 --- a/argopy/stores/index/implementations/pandas/search_engine.py +++ b/argopy/stores/index/implementations/pandas/search_engine.py @@ -477,3 +477,42 @@ def composer(profiler_type): else: 
self._obj.search_type.update(namer(profiler_type)) return search_filter + + def profile_qc(self, PARAMs: dict, logical="and", nrows=None, composed=False): + def checker(PARAMs): + if "profile_temp_qc" not in self._obj.convention_columns: + raise InvalidDatasetStructure("Cannot search for profile QC in this index)") + # Validate PARAMs + [ + PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs + ] + if not np.all( + [v in ['', ' ', '1', 'A', 'B', 'C', 'D', 'E', 'F'] for vals in PARAMs.values() for v in vals] + ): + raise ValueError("Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'") + log.debug("Argo index searching for profile QC: %s ..." % PARAMs) + return PARAMs + + def namer(PARAMs, logical): + return {"PROFQC": (PARAMs, logical)} + + def composer(PARAMs, logical): + filt = [] + + for param in PARAMs: + qcflags = PARAMs[param] + filt.append(self._obj.index[f"profile_{param.lower()}_qc"].isin(qcflags)) + + return self._obj._reduce_a_filter_list(filt, op=logical) + + PARAMs = checker(PARAMs) + self._obj.load(nrows=self._obj._nrows_index) + search_filter = composer(PARAMs, logical) + if not composed: + self._obj.search_type = namer(PARAMs, logical) + self._obj.search_filter = search_filter + self._obj.run(nrows=nrows) + return self._obj + else: + self._obj.search_type.update(namer(PARAMs, logical)) + return search_filter From ff31faede2ea4b4d0db2cf9d16102128956a8ae2 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 19 May 2026 12:15:13 +0200 Subject: [PATCH 4/8] Update search_engine.py --- .../implementations/pandas/search_engine.py | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py index 50f41cda6..3b2a046c0 100644 --- a/argopy/stores/index/implementations/pandas/search_engine.py +++ b/argopy/stores/index/implementations/pandas/search_engine.py @@ -492,45 +492,6 @@ def 
composer(profiler_type): self._obj.search_type.update(namer(profiler_type)) return search_filter - def profile_qc(self, PARAMs: dict, logical="and", nrows=None, composed=False): - def checker(PARAMs): - if "profile_temp_qc" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for profile QC in this index)") - # Validate PARAMs - [ - PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs - ] - if not np.all( - [v in ['', ' ', '1', 'A', 'B', 'C', 'D', 'E', 'F'] for vals in PARAMs.values() for v in vals] - ): - raise ValueError("Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'") - log.debug("Argo index searching for profile QC: %s ..." % PARAMs) - return PARAMs - - def namer(PARAMs, logical): - return {"PROFQC": (PARAMs, logical)} - - def composer(PARAMs, logical): - filt = [] - - for param in PARAMs: - qcflags = PARAMs[param] - filt.append(self._obj.index[f"profile_{param.lower()}_qc"].isin(qcflags)) - - return self._obj._reduce_a_filter_list(filt, op=logical) - - PARAMs = checker(PARAMs) - self._obj.load(nrows=self._obj._nrows_index) - search_filter = composer(PARAMs, logical) - if not composed: - self._obj.search_type = namer(PARAMs, logical) - self._obj.search_filter = search_filter - self._obj.run(nrows=nrows) - return self._obj - else: - self._obj.search_type.update(namer(PARAMs, logical)) - return search_filter - @search_s3 def institution_code(self, institution_code: List[str], nrows=None, composed=False): def checker(institution_code): @@ -603,3 +564,42 @@ def composer(DACs): else: self._obj.search_type.update(namer(dac)) return search_filter + + def profile_qc(self, PARAMs: dict, logical="and", nrows=None, composed=False): + def checker(PARAMs): + if "profile_temp_qc" not in self._obj.convention_columns: + raise InvalidDatasetStructure("Cannot search for profile QC in this index)") + # Validate PARAMs + [ + PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs + ] + if not np.all( + [v in ['', ' ', '1', 'A', 'B', 
'C', 'D', 'E', 'F'] for vals in PARAMs.values() for v in vals] + ): + raise ValueError("Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'") + log.debug("Argo index searching for profile QC: %s ..." % PARAMs) + return PARAMs + + def namer(PARAMs, logical): + return {"PROFQC": (PARAMs, logical)} + + def composer(PARAMs, logical): + filt = [] + + for param in PARAMs: + qcflags = PARAMs[param] + filt.append(self._obj.index[f"profile_{param.lower()}_qc"].isin(qcflags)) + + return self._obj._reduce_a_filter_list(filt, op=logical) + + PARAMs = checker(PARAMs) + self._obj.load(nrows=self._obj._nrows_index) + search_filter = composer(PARAMs, logical) + if not composed: + self._obj.search_type = namer(PARAMs, logical) + self._obj.search_filter = search_filter + self._obj.run(nrows=nrows) + return self._obj + else: + self._obj.search_type.update(namer(PARAMs, logical)) + return search_filter From a3e91bc7fa8f07399f8ded70b4d2bcdab265e6f8 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 19 May 2026 12:18:46 +0200 Subject: [PATCH 5/8] Update search_engine.py --- argopy/stores/index/implementations/pandas/search_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py index 3b2a046c0..34f90ff2a 100644 --- a/argopy/stores/index/implementations/pandas/search_engine.py +++ b/argopy/stores/index/implementations/pandas/search_engine.py @@ -48,7 +48,7 @@ def compute_params(param: str, obj): class SearchEngine(ArgoIndexSearchEngine): @search_s3 - def wmo(self, WMOs, nrows=None, composed=False) -> indexstore: + def wmo(self, WMOs, nrows=None, composed=False): def checker(WMOs): WMOs = check_wmo(WMOs) # Check and return a valid list of WMOs log.debug( @@ -79,7 +79,7 @@ def composer(obj, WMOs): return search_filter @search_s3 - def cyc(self, CYCs, nrows=None, composed=False) -> indexstore: + def cyc(self, CYCs, nrows=None, 
composed=False): def checker(CYCs): if self._obj.convention in ["ar_index_global_meta"]: raise InvalidDatasetStructure( From 63013adcdef2c8334855600a5cf33c3b1a356f92 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 19 May 2026 12:40:40 +0200 Subject: [PATCH 6/8] Update extensions.py --- argopy/stores/index/extensions.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/argopy/stores/index/extensions.py b/argopy/stores/index/extensions.py index a79a5bbee..7140db9d3 100644 --- a/argopy/stores/index/extensions.py +++ b/argopy/stores/index/extensions.py @@ -456,7 +456,7 @@ def parameter_data_mode(self): def profiler_type(self): """Search index for profiler types - The list of valid types is given by IDs of `Argo reference table 8 `_. + The list of valid types is given in `Argo reference table 8 / ARGO_WMO_INST_TYPE `_. Parameters ---------- @@ -479,8 +479,8 @@ def profiler_type(self): .. code-block:: python :caption: List valid types - from argopy import ArgoNVSReferenceTables - valid_types = ArgoNVSReferenceTables().tbl(8)['altLabel'] + from argopy import ArgoReferenceTable + valid_types : list[str] = ArgoReferenceTable('ARGO_WMO_INST_TYPE').keys() See Also -------- @@ -491,12 +491,12 @@ def profiler_type(self): def profiler_label(self, profiler_label: str, nrows=None, composed=False): """Search index for profiler types with a given string in their long name - Will search for string occurrences in the preferred label of `Argo reference table 8 `_. + Will search for string occurrences in the preferred label of `Argo reference table 8/ARGO_WMO_INST_TYPE `_. Parameters ---------- profiler_label: str, list(str) - The string (not exact) to be found in profiler preferred labels. + The string (not necessarily exact) to be found in profiler preferred labels. Returns ------- @@ -514,8 +514,9 @@ def profiler_label(self, profiler_label: str, nrows=None, composed=False): .. 
code-block:: python :caption: List valid labels - from argopy import ArgoNVSReferenceTables - valid_labels = ArgoNVSReferenceTables().tbl(8)['prefLabel'] + from argopy import ArgoReferenceTable + df = ar.ArgoReferenceTable('ARGO_WMO_INST_TYPE').to_dataframe() + valid_labels : list[str] = list(df['long_name'].to_dict().values()) See Also -------- @@ -587,7 +588,7 @@ def profile_qc(self, param): def institution_code(self, institution_code, nrows=None, composed=False): """Search index for institution codes - The list of valid codes is given by IDs of `Argo reference table 4 `_. + The list of valid codes is given in `Argo reference table 4/DATA_CENTRE_CODES `_. Parameters ---------- @@ -611,8 +612,8 @@ def institution_code(self, institution_code, nrows=None, composed=False): .. code-block:: python :caption: List valid codes - from argopy import ArgoNVSReferenceTables - valid_codes = ArgoNVSReferenceTables().tbl(4)['altLabel'] + from argopy import ArgoReferenceTable + valid_codes : list[str] = ArgoReferenceTable('DATA_CENTRE_CODES').keys() See Also -------- @@ -624,12 +625,12 @@ def institution_code(self, institution_code, nrows=None, composed=False): def institution_name(self, institution_name: str, nrows=None, composed=False): """Search index for institutions with a given string in their long name - Will search for string occurrences in the preferred label of `Argo reference table 4 `_. + Will search for string occurrences in the preferred label of `Argo reference table 4/DATA_CENTRE_CODES `_. Parameters ---------- institution_name: str, list(str) - The string (not exact) to be found in institution preferred labels. + The string (not necessarily exact) to be found in institution preferred labels. Returns ------- @@ -648,8 +649,9 @@ def institution_name(self, institution_name: str, nrows=None, composed=False): .. 
code-block:: python :caption: List valid names - from argopy import ArgoNVSReferenceTables - valid_names = ArgoNVSReferenceTables().tbl(4)['prefLabel'] + from argopy import ArgoReferenceTable + df = ar.ArgoReferenceTable('DATA_CENTRE_CODES').to_dataframe() + valid_names : list[str] = list(df['long_name'].to_dict().values()) See Also -------- From 19f5850405b3a0e8a15900bf76ed62092960f40d Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 19 May 2026 13:52:46 +0200 Subject: [PATCH 7/8] Update index.py --- .../index/implementations/pyarrow/index.py | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/argopy/stores/index/implementations/pyarrow/index.py b/argopy/stores/index/implementations/pyarrow/index.py index b157c21f9..896c7bb29 100644 --- a/argopy/stores/index/implementations/pyarrow/index.py +++ b/argopy/stores/index/implementations/pyarrow/index.py @@ -507,11 +507,11 @@ def to_indexfile(self, file): str """ - # def convert_a_date(row): - # try: - # return row.strftime("%Y%m%d%H%M%S") - # except Exception: - # return "" + def convert_a_date(row): + try: + return row.strftime("%Y%m%d%H%M%S") + except Exception: + return "" s = self.search @@ -519,31 +519,31 @@ def to_indexfile(self, file): if "longitude_360" in s.column_names: s = s.drop_columns("longitude_360") - # if self.convention not in [ - # "ar_index_global_meta", - # ]: - # new_date = pa.array(self.search["date"].to_pandas().apply(convert_a_date)) - # - # new_date_update = pa.array( - # self.search["date_update"].to_pandas().apply(convert_a_date) - # ) - # - # if self.convention not in [ - # "ar_index_global_meta", - # ]: - # s = s.set_column(1, "date", new_date) - # - # if self.convention in ["ar_index_global_prof", "argo_profile_detailled_index"]: - # s = s.set_column(7, "date_update", new_date_update) - # elif self.convention in [ - # "argo_bio-profile_index", - # "argo_synthetic-profile_index", - # ]: - # s = s.set_column(9, "date_update", new_date_update) - # 
elif self.convention in ["argo_aux-profile_index"]: - # s = s.set_column(8, "date_update", new_date_update) - # elif self.convention in ["ar_index_global_meta"]: - # s = s.set_column(3, "date_update", new_date_update) + if self.convention not in [ + "ar_index_global_meta", + ]: + new_date = pa.array(self.search["date"].to_pandas().apply(convert_a_date)) + + new_date_update = pa.array( + self.search["date_update"].to_pandas().apply(convert_a_date) + ) + + if self.convention not in [ + "ar_index_global_meta", + ]: + s = s.set_column(1, "date", new_date) + + if self.convention in ["ar_index_global_prof", "argo_profile_detailled_index"]: + s = s.set_column(7, "date_update", new_date_update) + elif self.convention in [ + "argo_bio-profile_index", + "argo_synthetic-profile_index", + ]: + s = s.set_column(9, "date_update", new_date_update) + elif self.convention in ["argo_aux-profile_index"]: + s = s.set_column(8, "date_update", new_date_update) + elif self.convention in ["ar_index_global_meta"]: + s = s.set_column(3, "date_update", new_date_update) write_options = csv.WriteOptions( delimiter=",", include_header=False, quoting_style="none" From ee162354a814d33b60911ef307c33712713ea991 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Sun, 31 May 2026 21:13:25 +0200 Subject: [PATCH 8/8] Add support for psal adjustment and n-levels search queries --- argopy/stores/index/extensions.py | 18 ++ .../implementations/pandas/search_engine.py | 58 +++++- .../implementations/pyarrow/search_engine.py | 165 +++++++++++++++--- argopy/stores/index/spec.py | 19 ++ 4 files changed, 237 insertions(+), 23 deletions(-) diff --git a/argopy/stores/index/extensions.py b/argopy/stores/index/extensions.py index 7140db9d3..5cf13e212 100644 --- a/argopy/stores/index/extensions.py +++ b/argopy/stores/index/extensions.py @@ -721,6 +721,24 @@ def dac(self, dac, nrows=None, composed=False): """ raise NotImplementedError("Not implemented") + @abstractmethod + def psal_adj(self): + """Search (detailed) 
index for salinity adjustment values + + Defined for for delayed mode or adjusted mode profiles only. + + - Mean of psal_adjusted – psal on the deepest 500 meters with good psal_adjusted_qc (equal to 1) + - Standard deviation of psal_adjusted – psal on the deepest 500 meters with good psal_adjusted_qc (equal to 1) + + """ + raise NotImplementedError("Not implemented") + + @abstractmethod + def n_levels(self): + """Search index profiles using the maximum number of pressure levels contained in a profile + """ + raise NotImplementedError("Not implemented") + def compose(self, query: dict, nrows=None): """Compose query with multiple search methods diff --git a/argopy/stores/index/implementations/pandas/search_engine.py b/argopy/stores/index/implementations/pandas/search_engine.py index 34f90ff2a..f4c70f077 100644 --- a/argopy/stores/index/implementations/pandas/search_engine.py +++ b/argopy/stores/index/implementations/pandas/search_engine.py @@ -2,7 +2,7 @@ import logging import pandas as pd import numpy as np -from typing import List +from typing import List, Literal, Optional from functools import lru_cache from argopy.options import OPTIONS @@ -603,3 +603,59 @@ def composer(PARAMs, logical): else: self._obj.search_type.update(namer(PARAMs, logical)) return search_filter + + def psal_adj( + self, + where: Literal["mean", "dev"] = "mean", + ge: Optional[float] = 0.0, + le: Optional[float] = None, + nrows=None, + composed=False, + ): + def checker(where: str, ge: Optional[float], le: Optional[float])-> [str, Optional[float], Optional[float]]: + if where.lower() not in ['mean', 'dev']: + raise ValueError(f"'{where}': The 'where' argument must be 'mean' or 'dev'.") + if "ad_psal_adjustment_mean" not in self._obj.convention_columns: + raise InvalidDatasetStructure( + "Cannot search for salinity adjustment mean in this index)" + ) + if "ad_psal_adjustment_deviation" not in self._obj.convention_columns: + raise InvalidDatasetStructure( + "Cannot search for salinity 
adjustment deviation in this index)" + ) + + bounds = [where.lower(), ge, le] + + if bounds[0] == 'dev' and bounds[2] is not None and bounds[2] < 0: + raise ValueError(f"Deviation lower limit must be zero or positive") + + return bounds + + def namer(bounds): + return {f"PSAL_ADJ_{bounds[0].upper()}": bounds[1:]} + + def composer(obj, bounds): + filt = [] + pname: str = ( + "ad_psal_adjustment_mean" + if bounds[0] == "mean" + else "ad_psal_adjustment_deviation" + ) + if bounds[1] is not None: + filt.append(obj.index[pname].ge(bounds[1])) + if bounds[2] is not None: + filt.append(obj.index[pname].le(bounds[2])) + + return obj._reduce_a_filter_list(filt, op="and") + + bounds = checker(where, ge, le) + self._obj.load(nrows=self._obj._nrows_index) + search_filter = composer(self._obj, bounds) + if not composed: + self._obj.search_type = namer(bounds) + self._obj.search_filter = search_filter + self._obj.run(nrows=nrows) + return self._obj + else: + self._obj.search_type.update(namer(bounds)) + return search_filter diff --git a/argopy/stores/index/implementations/pyarrow/search_engine.py b/argopy/stores/index/implementations/pyarrow/search_engine.py index 9d8f28826..53894ff1e 100644 --- a/argopy/stores/index/implementations/pyarrow/search_engine.py +++ b/argopy/stores/index/implementations/pyarrow/search_engine.py @@ -1,7 +1,7 @@ import logging import pandas as pd import numpy as np -from typing import List +from typing import List, Literal, Optional, Tuple from functools import lru_cache @@ -169,8 +169,6 @@ def checker(BOX, **kwargs): log.debug("Argo index searching for date in BOX=%s ..." 
% BOX) return ("date", BOX) # Return key to use for time axis - key, BOX = checker(BOX, **kwargs) - def namer(BOX): return {"DATE": BOX[4:6]} @@ -190,6 +188,7 @@ def composer(BOX, key): ) return self._obj._reduce_a_filter_list(filt, op="and") + key, BOX = checker(BOX, **kwargs) self._obj.load(nrows=self._obj._nrows_index) search_filter = composer(BOX, key) if not composed: @@ -212,8 +211,6 @@ def checker(BOX, **kwargs): log.debug("Argo index searching for latitude in BOX=%s ..." % BOX) return BOX - BOX = checker(BOX, **kwargs) - def namer(BOX): return {"LAT": BOX[2:4]} @@ -223,6 +220,7 @@ def composer(BOX): filt.append(pa.compute.less_equal(self._obj.index["latitude"], BOX[3])) return self._obj._reduce_a_filter_list(filt, op="and") + BOX = checker(BOX, **kwargs) self._obj.load(nrows=self._obj._nrows_index) search_filter = composer(BOX) if not composed: @@ -245,25 +243,40 @@ def checker(BOX, **kwargs): log.debug("Argo index searching for longitude in BOX=%s ..." % BOX) return BOX - BOX = checker(BOX, **kwargs) - def namer(BOX): return {"LON": BOX[0:2]} def composer(BOX): filt = [] - if OPTIONS['longitude_convention'] == '360': + if OPTIONS["longitude_convention"] == "360": if BOX[0] is not None: - filt.append(pc.greater_equal(self._obj.index["longitude_360"], conv_lon(BOX[0], '360'))) + filt.append( + pc.greater_equal( + self._obj.index["longitude_360"], conv_lon(BOX[0], "360") + ) + ) if BOX[1] is not None: - filt.append(pc.less_equal(self._obj.index["longitude_360"], conv_lon(BOX[1], '360'))) - elif OPTIONS['longitude_convention'] == '180': + filt.append( + pc.less_equal( + self._obj.index["longitude_360"], conv_lon(BOX[1], "360") + ) + ) + elif OPTIONS["longitude_convention"] == "180": if BOX[0] is not None: - filt.append(pc.greater_equal(self._obj.index["longitude"], conv_lon(BOX[0], '180'))) + filt.append( + pc.greater_equal( + self._obj.index["longitude"], conv_lon(BOX[0], "180") + ) + ) if BOX[1] is not None: - 
filt.append(pc.less_equal(self._obj.index["longitude"], conv_lon(BOX[1], '180'))) + filt.append( + pc.less_equal( + self._obj.index["longitude"], conv_lon(BOX[1], "180") + ) + ) return self._obj._reduce_a_filter_list(filt, op="and") + BOX = checker(BOX, **kwargs) self._obj.load(nrows=self._obj._nrows_index) search_filter = composer(BOX) if not composed: @@ -617,15 +630,21 @@ def composer(DACs): def profile_qc(self, PARAMs: dict, logical="and", nrows=None, composed=False): def checker(PARAMs): if "profile_temp_qc" not in self._obj.convention_columns: - raise InvalidDatasetStructure("Cannot search for profile QC in this index)") + raise InvalidDatasetStructure( + "Cannot search for profile QC in this index)" + ) # Validate PARAMs - [ - PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs - ] + [PARAMs.update({p: to_list(PARAMs[p])}) for p in PARAMs] if not np.all( - [v in ['', ' ', '1', 'A', 'B', 'C', 'D', 'E', 'F'] for vals in PARAMs.values() for v in vals] + [ + v in ["", " ", "1", "A", "B", "C", "D", "E", "F"] + for vals in PARAMs.values() + for v in vals + ] ): - raise ValueError("Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'") + raise ValueError( + "Profile QC must be a value in '', 'A', 'B', 'C', 'D', 'E', 'F'" + ) log.debug("Argo index searching for profile QC: %s ..." 
def psal_adj(
    self,
    where: Literal["mean", "std"] = "mean",
    ge: Optional[float] = 0.0,
    le: Optional[float] = None,
    nrows=None,
    composed=False,
):
    """Search index for profiles using salinity adjustment values (pyarrow backend)

    Rows are selected when the ``ad_psal_adjustment_mean`` (``where='mean'``) or the
    ``ad_psal_adjustment_deviation`` (``where='std'``) index column falls within the
    inclusive interval ``[ge, le]``.

    Parameters
    ----------
    where: {'mean', 'std'}, default='mean'
        Statistic to filter on. Case-insensitive.
    ge: float, optional, default=0.0
        Inclusive lower limit. Use ``None`` for no lower limit.
    le: float, optional, default=None
        Inclusive upper limit. Use ``None`` for no upper limit.
    nrows: optional
        Forwarded to ``run`` when ``composed`` is False.
    composed: bool, default=False
        If False, register the search on the index object, run it and return the index
        object. If True, only update ``search_type`` and return the search filter, so
        that the caller can combine it with other filters.

    Returns
    -------
    The index object (``self._obj``) if ``composed`` is False, the search filter otherwise.

    Raises
    ------
    ValueError
        If ``where`` is not 'mean' or 'std', or if a standard deviation limit is negative.
    InvalidDatasetStructure
        If the index has no salinity adjustment columns.
    """

    def checker(where: str, ge: Optional[float], le: Optional[float]) -> list:
        # Returns the normalised search definition: [statistic, lower limit, upper limit]
        if where.lower() not in ["mean", "std"]:
            raise ValueError(
                f"'{where}': The 'where' argument must be 'mean' or 'std'."
            )
        # Both columns are required, whatever the statistic requested
        if "ad_psal_adjustment_mean" not in self._obj.convention_columns:
            raise InvalidDatasetStructure(
                "Cannot search for salinity adjustment mean in this index"
            )
        if "ad_psal_adjustment_deviation" not in self._obj.convention_columns:
            raise InvalidDatasetStructure(
                "Cannot search for salinity adjustment standard deviation in this index"
            )

        # Kept as a list: this object ends up (sliced) in ``search_type``
        bounds = [where.lower(), ge, le]

        # A standard deviation is never negative: reject negative limits on either
        # side, and report the limit that is actually at fault.
        if bounds[0] == "std":
            if bounds[1] is not None and bounds[1] < 0:
                raise ValueError(
                    "Standard deviation lower limit must be zero or positive"
                )
            if bounds[2] is not None and bounds[2] < 0:
                raise ValueError(
                    "Standard deviation upper limit must be zero or positive"
                )

        return bounds

    def namer(bounds):
        return {f"PSAL_ADJ_{bounds[0].upper()}": bounds[1:]}

    def composer(obj, bounds):
        filt = []
        pname: str = (
            "ad_psal_adjustment_mean"
            if bounds[0] == "mean"
            else "ad_psal_adjustment_deviation"
        )
        if bounds[1] is not None:
            filt.append(pc.greater_equal(obj.index[pname], bounds[1]))
        if bounds[2] is not None:
            filt.append(pc.less_equal(obj.index[pname], bounds[2]))
        return obj._reduce_a_filter_list(filt, op="and")

    # Validate arguments before loading the index, so that bad input fails fast
    bounds = checker(where, ge, le)
    self._obj.load(nrows=self._obj._nrows_index)
    search_filter = composer(self._obj, bounds)
    if not composed:
        self._obj.search_type = namer(bounds)
        self._obj.search_filter = search_filter
        self._obj.run(nrows=nrows)
        return self._obj
    else:
        self._obj.search_type.update(namer(bounds))
        return search_filter


def n_levels(
    self,
    ge: Optional[int] = None,
    le: Optional[int] = None,
    nrows=None,
    composed=False,
):
    """Search index profiles using the maximum number of pressure levels contained in a profile

    Rows are selected when the ``n_levels`` index column falls within the inclusive
    interval ``[ge, le]``.

    Parameters
    ----------
    ge: int, optional, default=None
        Inclusive minimum number of levels, strictly positive. ``None`` for no minimum.
    le: int, optional, default=None
        Inclusive maximum number of levels, strictly positive. ``None`` for no maximum.
    nrows: optional
        Forwarded to ``run`` when ``composed`` is False.
    composed: bool, default=False
        If False, register the search on the index object, run it and return the index
        object. If True, only update ``search_type`` and return the search filter.

    Returns
    -------
    The index object (``self._obj``) if ``composed`` is False, the search filter otherwise.

    Raises
    ------
    ValueError
        If a bound is not strictly positive, or if ``ge`` is larger than ``le``.
    InvalidDatasetStructure
        If the index has no ``n_levels`` column.
    """

    def checker(ge: Optional[int], le: Optional[int]) -> list:
        if "n_levels" not in self._obj.convention_columns:
            raise InvalidDatasetStructure(
                "Cannot search for number of levels in this index"
            )
        # Kept as a list: this very object is stored in ``search_type``
        bounds = [ge, le]
        if bounds[0] is not None and bounds[0] <= 0:
            raise ValueError(
                f"The minimum number of levels 'ge' must be positive, {bounds[0]} provided"
            )
        if bounds[1] is not None and bounds[1] <= 0:
            raise ValueError(
                f"The maximum number of levels 'le' must be positive, {bounds[1]} provided"
            )
        if bounds[0] is not None and bounds[1] is not None and bounds[0] > bounds[1]:
            # An inverted interval can never match any row
            raise ValueError(
                f"Upper bound le={bounds[1]} must be greater than or equal to "
                f"the lower bound ge={bounds[0]}"
            )
        return bounds

    def namer(bounds):
        return {"NLEVELS": bounds}

    def composer(obj, bounds):
        filt = []
        if bounds[0] is not None:
            filt.append(pc.greater_equal(obj.index["n_levels"], bounds[0]))
        if bounds[1] is not None:
            filt.append(pc.less_equal(obj.index["n_levels"], bounds[1]))
        return obj._reduce_a_filter_list(filt, op="and")

    # Validate arguments before loading the index, so that bad input fails fast
    bounds = checker(ge, le)
    self._obj.load(nrows=self._obj._nrows_index)
    search_filter = composer(self._obj, bounds)
    if not composed:
        self._obj.search_type = namer(bounds)
        self._obj.search_filter = search_filter
        self._obj.run(nrows=nrows)
        return self._obj
    else:
        self._obj.search_type.update(namer(bounds))
        return search_filter
b/argopy/stores/index/spec.py @@ -440,6 +440,25 @@ def cname(self) -> str: ["%s_%s" % (p, "".join(PROFQC[p])) for p in PROFQC] ) + elif "PSAL_ADJ_MEAN" == key: + log.debug(self.search_type) + ADJ = self.search_type["PSAL_ADJ_MEAN"] + cname = [] + if ADJ[0] is not None: + cname.append(f"MEAN_PSAL_ADJ>={ADJ[0]}") + if ADJ[1] is not None: + cname.append(f"MEAN_PSAL_ADJ<={ADJ[1]}") + cname = "_and_".join(cname) + + elif key == "NLEVELS": + N = self.search_type[key] + if N[0] is not None and N[1] is not None: + cname = f"n={N[0]}/{N[1]}" + elif N[0] is not None and N[1] is None: + cname = f"n>={N[0]}" + elif N[1] is not None and N[0] is None: + cname = f"n<={N[1]}" + C.append(cname) return "_and_".join(C)