diff --git a/src/hubmap_translator.py b/src/hubmap_translator.py
index d4727daf..2710393d 100644
--- a/src/hubmap_translator.py
+++ b/src/hubmap_translator.py
@@ -632,7 +632,7 @@ def _exec_reindex_entity_to_index_group_by_id(self, entity_id: str(32), index_gr
         logger.info(f"Finished executing _exec_reindex_entity_to_index_group_by_id()")
 
-    def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:str):
+    def _transform_and_write_entity_to_index_group(self, entity: dict, index_group: str, reindex_info: dict = None):
         logger.info(f"Start executing direct '{index_group}' updates for"
                     f" entity['uuid']={entity['uuid']},"
                     f" entity['entity_type']={entity['entity_type']}")
@@ -642,30 +642,26 @@ def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:st
             # _generate_public_doc() needs _generate_doc() to make changes first. So make
             # a copy of the entity for these methods, leaving the original argument intact
             doc_entity=copy.deepcopy(entity)
-            private_doc = self._generate_doc( entity=doc_entity
-                                              , return_type='json'
-                                              , index_group=index_group)
+            private_doc = self._generate_doc(entity=doc_entity
+                                             , return_type='json'
+                                             , index_group=index_group
+                                             , reindex_info=reindex_info)
             if self.is_public(entity):
-                public_doc = self._generate_public_doc( entity=doc_entity
-                                                        , index_group=index_group)
+                public_doc = self._generate_public_doc(entity=doc_entity
+                                                       , index_group=index_group)
             del doc_entity
         except Exception as e:
             msg = f"Exception document generation" \
-                  f" for uuid: {entity['uuid']}, entity_type: {entity['entity_type']}" \
-                  f" for '{index_group}' reindex caused \'{str(e)}\'"
-            # Log the full stack trace, prepend a line with our message. But continue on
-            # rather than raise the Exception.
+                f" for uuid: {entity['uuid']}, entity_type: {entity['entity_type']}" \
+                f" for '{index_group}' reindex caused \'{str(e)}\'"
             logger.exception(msg)
-
-        if 'private_doc' not in locals() or private_doc is None:
-            logger.error( f"For {entity['entity_type']} {entity['uuid']},"
-                          f" failed to generate document for consortium indices.")
-
+        if 'private_doc' not in locals() or private_doc is None:
+            logger.error(f"For {entity['entity_type']} {entity['uuid']},"
+                         f" failed to generate document for consortium indices.")
         docs_to_write_dict = {
             self.index_group_es_indices[index_group]['private']: None,
             self.index_group_es_indices[index_group]['public']: None }
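+        # Maps each ES index name for this index group to the JSON document to be
+        # written there, e.g. {'<private index>': None, '<public index>': None};
+        # entries still None at write time are skipped below.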
-        # Check to see if the index_group has a transformer, default to None if not found
         transformer = self.TRANSFORMERS.get(index_group, None)
         if transformer is None:
             logger.info(f"Unable to find '{index_group}' transformer, indexing documents untransformed.")
@@ -678,24 +674,23 @@ def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:st
                 docs_to_write_dict[self.index_group_es_indices[index_group]['private']] = json.dumps(private_transformed)
             if 'public_doc' in locals() and public_doc is not None:
                 public_transformed = transformer.transform(json.loads(public_doc),
-                                                            self.transformation_resources)
+                                                           self.transformation_resources)
                 docs_to_write_dict[self.index_group_es_indices[index_group]['public']] = json.dumps(public_transformed)
-
         for index_name in docs_to_write_dict.keys():
             if docs_to_write_dict[index_name] is None:
                 continue
             self.indexer.index(entity_id=entity['uuid']
-                               , document=docs_to_write_dict[index_name]
-                               , index_name=index_name
-                               , reindex=True)
-            logger.info(f"Finished executing indexer.index() during direct '{index_group}' reindexing with" \
-                        f" entity['uuid']={entity['uuid']}," \
-                        f" entity['entity_type']={entity['entity_type']}," \
+                              , document=docs_to_write_dict[index_name]
+                              , index_name=index_name
+                              , reindex=True)
+            logger.info(f"Finished executing indexer.index() during direct '{index_group}' reindexing with"
+                        f" entity['uuid']={entity['uuid']},"
+                        f" entity['entity_type']={entity['entity_type']},"
                         f" index_name={index_name}.")
-
         logger.info(f"Finished direct '{index_group}' updates for"
                     f" entity['uuid']={entity['uuid']},"
                     f" entity['entity_type']={entity['entity_type']}")
 
+
     def enqueue_reindex(self, entity_id, reindex_queue, priority):
         try:
@@ -729,7 +724,7 @@ def enqueue_reindex(self, entity_id, reindex_queue, priority):
                         collection_associations.append(dataset_id)
                     if 'associated_publication' in collection and collection['associated_publication']:
                         logger.info(f"Enqueueing associated_publication for {entity['entity_type']} {entity_id}")
-                        collection_associations.append(collection['associated_publication'])
+                        collection_associations.append(collection['associated_publication'].get('uuid'))
 
             elif entity['entity_type'] == 'Upload':
                 if 'datasets' in entity:
@@ -840,10 +835,10 @@ def reindex_entity_queued(self, entity_id):
         logger.info(f"Reindexing {entity['entity_type']} of uuid: {entity_id}")
 
         if entity['entity_type'] in ['Collection', 'Epicollection']:
-            self.translate_collection(entity_id, reindex=True)
+            self.translate_collection(entity, reindex=True)
 
         elif entity['entity_type'] == 'Upload':
-            self.translate_upload(entity_id, reindex=True)
+            self.translate_upload(entity, reindex=True)
 
         else:
             self._call_indexer(entity=entity, delete_existing_doc_first=True)
@@ -1238,14 +1233,14 @@ def delete(self, entity_id):
 
     # When indexing, Upload WILL NEVER BE PUBLIC
-    def translate_upload(self, entity_id, reindex=False):
+    def translate_upload(self, entity, reindex=False):
         try:
-            logger.info(f"Start executing translate_upload() for {entity_id}")
+            logger.info(f"Start executing translate_upload() for {entity.get('uuid')}")
 
             default_private_index = self.INDICES['indices'][self.DEFAULT_INDEX_WITHOUT_PREFIX]['private']
 
             # Retrieve the upload entity details
-            upload = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
+            upload = entity
             self._add_datasets_to_entity( entity=upload
                                           , index_group=self.DEFAULT_INDEX_WITHOUT_PREFIX)
@@ -1263,8 +1258,8 @@ def translate_upload(self, entity, reindex=False):
         except Exception as e:
             logger.error(e)
 
-    def translate_collection(self, entity_id, reindex=False):
-        logger.info(f"Start executing translate_collection() for {entity_id}")
+    def translate_collection(self, entity, reindex=False):
+        logger.info(f"Start executing translate_collection() for {entity.get('uuid')}")
 
         # The entity-api returns public collection with a list of connected public/published datasets, for either
         # - a valid token but not in HuBMAP-Read group or
@@ -1272,8 +1267,7 @@ def translate_collection(self, entity, reindex=False):
         # Here we do NOT send over the token
         try:
             for index_group in self.indices.keys():
-                collection = self.get_collection_doc(entity_id=entity_id)
-
+                collection = entity
                 self._add_datasets_to_entity( entity=collection
                                               , index_group=index_group)
                 self._entity_keys_rename(collection)
@@ -1523,20 +1517,41 @@ def _index_doc_directly_to_es_index(self, entity:dict, document:json, es_index:s
     # ingest_metadata.metadata sub fields with empty string values from previous call
     def _call_indexer(self, entity, delete_existing_doc_first=False):
         logger.info(f"Start executing _call_indexer() on uuid: {entity['uuid']}, entity_type: {entity['entity_type']}")
-
         try:
-            # Generate and write a document for the entity to each index group loaded from the configuration file.
+            reindex_info = None
+            if entity['entity_type'] not in ['Collection', 'Epicollection', 'Upload']:
+                reindex_info = self._fetch_reindex_info(entity['uuid'])
+
             for index_group in self.indices.keys():
                 self._transform_and_write_entity_to_index_group(entity=entity
-                                                                , index_group=index_group)
+                                                                , index_group=index_group
+                                                                , reindex_info=reindex_info)
            logger.info(f"Finished executing _call_indexer() on uuid: {entity['uuid']}, entity_type: {entity['entity_type']}")
         except Exception as e:
             msg = f"Encountered exception e={str(e)}" \
-                  f" executing _call_indexer() with" \
-                  f" uuid: {entity['uuid']}, entity_type: {entity['entity_type']}"
-            # Log the full stack trace, prepend a line with our message
+                f" executing _call_indexer() with" \
+                f" uuid: {entity['uuid']}, entity_type: {entity['entity_type']}"
             logger.exception(msg)
+
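+    # Fetch the consolidated provenance payload for an entity in a single call.
+    # An illustrative (not authoritative) sketch of the response shape consumed
+    # by _generate_doc():
+    #   {
+    #       "ancestors": [...], "descendants": [...],
+    #       "immediate_ancestors": [...], "immediate_descendants": [...],
+    #       "donor": {...} or null,
+    #       "origin_samples": [...], "source_samples": [...]
+    #   }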
+    def _fetch_reindex_info(self, entity_uuid):
+        url = f"{self.entity_api_url}/entities/{entity_uuid}/reindex-info"
+        response = requests.get(url, headers=self.request_headers, verify=False)
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 303:
+            s3_url = response.text
+            logger.info(f"reindex-info for {entity_uuid} redirected to S3: {s3_url}")
+            s3_response = requests.get(s3_url, verify=False)
+            if s3_response.status_code == 200:
+                return s3_response.json()
+            else:
+                logger.error(f"Failed to fetch reindex-info from S3 for {entity_uuid}: {s3_response.status_code}")
+                return None
+        else:
+            logger.warning(f"Failed to fetch reindex-info for {entity_uuid}, falling back to legacy path")
+            return None
+
     # The added fields specified in `entity_properties_list` should not be added
     # to themselves as sub fields
     # The `except_properties_list` is a subset of entity_properties_list
@@ -1570,53 +1585,56 @@ def exclude_added_calculated_fields(self, entity_data, except_properties_list = 
         logger.info("Finished executing exclude_added_calculated_fields()")
 
     # Used for Upload and Collection index
-    def _add_datasets_to_entity(self, entity:dict, index_group:str):
+    def _add_datasets_to_entity(self, entity: dict, index_group: str):
         logger.info("Start executing _add_datasets_to_entity()")
-
         datasets = []
         if 'datasets' in entity:
-            for dataset in entity['datasets']:
-                # Retrieve the entity details
+            batch_docs = None
+            try:
+                url = f"{self.entity_api_url}/entities/{entity['uuid']}/dataset-documents"
+                response = requests.get(url, headers=self.request_headers, verify=False)
+                if response.status_code == 200:
+                    batch_docs = response.json()
+                elif response.status_code == 303:
+                    s3_url = response.text
+                    logger.info(f"dataset-documents for {entity['uuid']} redirected to S3: {s3_url}")
+                    s3_response = requests.get(s3_url, verify=False)
+                    if s3_response.status_code == 200:
+                        batch_docs = s3_response.json()
+                    else:
+                        logger.error(f"Failed to fetch dataset-documents from S3: {s3_response.status_code}")
+                else:
+                    logger.error(f"Failed to fetch dataset-documents for {entity['uuid']}: {response.status_code}, falling back to individual calls")
+            except Exception as e:
+                logger.error(f"Exception fetching dataset-documents for {entity['uuid']}: {e}, falling back to individual calls")
+
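+            # batch_docs, when available, is assumed to map each dataset uuid to
+            # its full document, e.g. {'<dataset uuid>': {...}, ...}, letting the
+            # loop below skip the per-dataset entity-api call.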
+            for dataset_stub in entity['datasets']:
                 try:
-                    dataset = self.call_entity_api(dataset['uuid'], 'documents')
-                    # Remove large fields that cause poor performance and are not used, both for the current
-                    # implementation and until ingest_metadata reorganization is coordinated for a Production release,
-                    # which will also remove 'files' here, and delete the call to exclude_added_top_level_properties() below.
+                    if batch_docs and dataset_stub['uuid'] in batch_docs:
+                        dataset = batch_docs[dataset_stub['uuid']]
+                    else:
+                        dataset = self.call_entity_api(dataset_stub['uuid'], 'documents')
                     for large_field_name in NESTED_EXCLUDED_ES_FIELDS_FOR_COLLECTIONS_AND_UPLOADS:
                         if large_field_name in dataset:
                             dataset.pop(large_field_name)
                 except Exception as e:
                     logger.exception(e)
-                    logger.error( f"Failed to retrieve dataset {dataset['uuid']}"
-                                  f" via entity-api while executing"
-                                  f" _add_datasets_to_entity(). Skip and continue to next one")
-
-                    # This can happen when the dataset is in neo4j but the actual uuid is not found in MySQL
-                    # or something else is wrong with entity-api and it can't return the dataset info
-                    # In this case, we'll skip over the current iteration, and continue with the next one
-                    # Otherwise, null will be added to the resulting datasets list and break portal-ui rendering - 5/3/2023 Zhou
+                    logger.error(f"Failed to retrieve dataset {dataset_stub['uuid']}"
+                                 f" via entity-api while executing"
+                                 f" _add_datasets_to_entity(). Skip and continue to next one")
                     continue
+
                 try:
-                    dataset_doc = self._generate_doc( entity=dataset
-                                                      , return_type='dict'
-                                                      , index_group=index_group)
+                    self._entity_keys_rename(dataset)
+                    remove_specific_key_entry(dataset, "other_metadata")
+                    self.add_calculated_fields(dataset)
                 except Exception as e:
                     logger.exception(e)
-                    logger.error( f"Failed to execute _generate_doc() on dataset {dataset['uuid']}"
-                                  f" while executing _add_datasets_to_entity()."
-                                  f" Skip and continue to next one")
-
-                    # This can happen when the dataset itself is good but something failed to generate the doc
-                    # E.g., one of the descendants of this dataset exists in neo4j but no record in uuid MySQL
-                    # In this case, we'll skip over the current iteration, and continue with the next one
-                    # Otherwise, no document is generated, null will be added to the resulting datasets list and break portal-ui rendering - 5/3/2023 Zhou
+                    logger.error(f"Failed to process dataset {dataset_stub['uuid']}"
+                                 f" while executing _add_datasets_to_entity()."
+                                 f" Skip and continue to next one")
                     continue
 
-                self.exclude_added_top_level_properties(dataset_doc)
-                datasets.append(dataset_doc)
-
+                datasets.append(dataset)
             entity['datasets'] = datasets
-
         logger.info("Finished executing _add_datasets_to_entity()")
 
     # Modify any key names specified to change on the entity
@@ -1716,7 +1734,7 @@ def _relatives_for_index_group(self, relative_ids:list, index_group:str):
 
     # Note: this entity dict input (if Dataset) has already handled ingest_metadata.files (with empty string or missing)
     # and ingest_metadata.metadata sub fields with empty string values from previous call
-    def _generate_doc(self, entity, return_type, index_group:str):
+    def _generate_doc(self, entity, return_type, index_group: str, reindex_info: dict = None):
         try:
             logger.info(f"Start executing _generate_doc() for {entity['entity_type']}"
                         f" of uuid: {entity['uuid']}"
@@ -1724,192 +1742,156 @@ def _generate_doc(self, entity, return_type, index_group:str):
 
             entity_id = entity['uuid']
 
-            # Full ancestors may not be needed to populate a field in an ES index of
-            # an index group, but fill for now to calculate other fields e.g. origin_samples
-            ancestors = []
-            # The ES document top-level "donor" field will be the first ancestor of
-            # this entity with an entity_type of 'Donor'.
-            donor = None
-            if entity['entity_type'] != 'Upload':
-                # Do not call /ancestors/ directly to avoid performance/timeout issue
-                # Get back a list of ancestor uuids first
-                ancestor_ids = self.call_entity_api(entity_id=entity_id
-                                                    , endpoint_base='ancestors'
-                                                    , endpoint_suffix=None
-                                                    , url_property='uuid')
-                # Fill ancestors with "full" entities, both for use in calculating 'origin_samples' below and
-                # to determine which ancestor populates 'donor'. But after all calculations, cut 'ancestors'
-                # back to the specific needs for documents in the index group
-                for ancestor_uuid in ancestor_ids:
-                    ancestor_dict = self.call_entity_api(entity_id=ancestor_uuid
-                                                         , endpoint_base='documents')
-                    ancestors.append(ancestor_dict)
-
-                    # If the current ancestor is the first Donor encountered, save it to
-                    # populate the ES document "donor" field for this entity.
-                    if ancestor_dict['entity_type'] == 'Donor' and not donor:
-                        donor = copy.deepcopy(ancestor_dict)
-
-                descendant_ids = self.call_entity_api(entity_id=entity_id
-                                                      , endpoint_base='descendants'
-                                                      , endpoint_suffix=None
-                                                      , url_property='uuid')
-                descendants = self._relatives_for_index_group( relative_ids=descendant_ids
-                                                               , index_group=index_group)
-                immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id
-                                                              , endpoint_base='parents'
-                                                              , endpoint_suffix=None
-                                                              , url_property='uuid')
-
-                immediate_descendant_ids = self.call_entity_api(entity_id=entity_id
-                                                                , endpoint_base='children'
-                                                                , endpoint_suffix=None
-                                                                , url_property='uuid')
+            if entity['entity_type'] != 'Upload':
+                ig_doc_fields = INDEX_GROUP_PORTAL_DOC_FIELDS if index_group == 'portal' else INDEX_GROUP_ENTITIES_DOC_FIELDS
+
+                def filter_fields(entity_list):
+                    result = []
+                    for e in entity_list:
+                        filtered = {k: e[k] for k in ig_doc_fields.keys() if k in e}
+                        if filtered:
+                            result.append(filtered)
+                    return result
+
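+                # Illustrative example (hypothetical values): if ig_doc_fields
+                # has only 'uuid' and 'entity_type' as keys, then
+                #   filter_fields([{'uuid': 'a1b2', 'entity_type': 'Sample', 'files': [...]}])
+                # returns [{'uuid': 'a1b2', 'entity_type': 'Sample'}].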
+                if reindex_info is not None:
+                    ancestors = reindex_info.get('ancestors', [])
+                    descendants_full = reindex_info.get('descendants', [])
+                    immediate_ancestors_full = reindex_info.get('immediate_ancestors', [])
+                    immediate_descendants_full = reindex_info.get('immediate_descendants', [])
+                    ancestor_ids = [e['uuid'] for e in ancestors]
+                    descendant_ids = [e['uuid'] for e in descendants_full]
+                    immediate_ancestor_ids = [e['uuid'] for e in immediate_ancestors_full]
+                    immediate_descendant_ids = [e['uuid'] for e in immediate_descendants_full]
+                    donor = copy.deepcopy(reindex_info['donor']) if reindex_info.get('donor') else None
+                    origin_samples = reindex_info.get('origin_samples', [])
+                    source_samples = reindex_info.get('source_samples', [])
+                else:
+                    # Legacy path - individual entity-api calls, preserved for backwards compatibility
+                    ancestor_ids = self.call_entity_api(entity_id=entity_id
+                                                        , endpoint_base='ancestors'
+                                                        , endpoint_suffix=None
+                                                        , url_property='uuid')
+                    ancestors = []
+                    donor = None
+                    for ancestor_uuid in ancestor_ids:
+                        ancestor_dict = self.call_entity_api(entity_id=ancestor_uuid
+                                                             , endpoint_base='documents')
+                        ancestors.append(ancestor_dict)
+                        if ancestor_dict['entity_type'] == 'Donor' and not donor:
+                            donor = copy.deepcopy(ancestor_dict)
+                    descendant_ids = self.call_entity_api(entity_id=entity_id
+                                                          , endpoint_base='descendants'
+                                                          , endpoint_suffix=None
+                                                          , url_property='uuid')
+                    descendants_full = self._relatives_for_index_group(relative_ids=descendant_ids
+                                                                       , index_group=index_group)
+                    immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id
+                                                                  , endpoint_base='parents'
+                                                                  , endpoint_suffix=None
+                                                                  , url_property='uuid')
+                    immediate_descendant_ids = self.call_entity_api(entity_id=entity_id
+                                                                    , endpoint_base='children'
+                                                                    , endpoint_suffix=None
+                                                                    , url_property='uuid')
+                    immediate_ancestors_full = self._relatives_for_index_group(relative_ids=immediate_ancestor_ids
+                                                                               , index_group=index_group)
+                    immediate_descendants_full = self._relatives_for_index_group(relative_ids=immediate_descendant_ids
+                                                                                 , index_group=index_group)
+                    origin_samples = []
+                    if (('sample_category' in entity) and
+                            (entity['sample_category'].lower() == 'organ') and
+                            ('organ' in entity) and (entity['organ'].strip() != '')):
+                        origin_samples = [copy.deepcopy(entity)]
+                    else:
+                        for ancestor in ancestors:
+                            if (('sample_category' in ancestor) and
+                                    (ancestor['sample_category'].lower() == 'organ') and
+                                    ('organ' in ancestor) and (ancestor['organ'].strip() != '')):
+                                origin_samples.append(ancestor)
+                    source_samples = []
+                    if entity['entity_type'] in ['Dataset', 'Publication']:
+                        source_samples = None
+                        e = entity
+                        while source_samples is None:
+                            parent_uuids = self.call_entity_api(entity_id=e['uuid']
+                                                                , endpoint_base='parents'
+                                                                , endpoint_suffix=None
+                                                                , url_property='uuid')
+                            parents = []
+                            for parent_uuid in parent_uuids:
+                                parent_entity_doc = self.call_entity_api(entity_id=parent_uuid
+                                                                         , endpoint_base='documents')
+                                parents.append(parent_entity_doc)
+                            try:
+                                if parents[0]['entity_type'] == 'Sample':
+                                    source_samples = parents
+                                e = parents[0]
+                            except IndexError:
+                                source_samples = []
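+                # Both branches converge with the same local state: the relative
+                # id lists, the full relative dicts, donor, origin_samples and
+                # source_samples, whether they came from the single reindex-info
+                # payload or from the individual legacy calls above.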
 
-                # Add new properties to entity for documents in all indices
-                entity['descendants'] = descendants
                 entity['ancestor_ids'] = ancestor_ids
                 entity['descendant_ids'] = descendant_ids
                 entity['immediate_ancestor_ids'] = immediate_ancestor_ids
                 entity['immediate_descendant_ids'] = immediate_descendant_ids
+                entity['descendants'] = filter_fields(descendants_full)
 
-            # Add new properties to entity only needed for documents in the 'portal' index group
             if index_group == 'portal':
-                immediate_ancestors = []
-                immediate_descendants = []
-
-                for immediate_ancestor_uuid in immediate_ancestor_ids:
-                    immediate_ancestor_dict = self.call_entity_api(immediate_ancestor_uuid, 'documents')
-                    immediate_ancestors.append(immediate_ancestor_dict)
-                index_group_immediate_ancestors = self._relatives_for_index_group( relative_ids=immediate_ancestor_ids
-                                                                                   , index_group=index_group)
-                entity['immediate_ancestors'] = index_group_immediate_ancestors
-
-                for immediate_descendant_uuid in immediate_descendant_ids:
-                    immediate_descendant_dict = self.call_entity_api(immediate_descendant_uuid, 'documents')
-                    immediate_descendants.append(immediate_descendant_dict)
-                index_group_immediate_descendants = self._relatives_for_index_group(relative_ids=immediate_descendant_ids
-                                                                                    , index_group=index_group)
-                entity['immediate_descendants'] = index_group_immediate_descendants
-
-                # The `sample_category` is "organ" and the `organ` code is set at the same time
+                entity['immediate_ancestors'] = filter_fields(immediate_ancestors_full)
+                entity['immediate_descendants'] = filter_fields(immediate_descendants_full)
+
             if entity['entity_type'] in ['Sample', 'Dataset', 'Publication']:
-                # Add new properties
                 if donor:
                     entity['donor'] = donor
-
-                # entity['origin_samples'] is a list
-                entity['origin_samples'] = []
-                if ('sample_category' in entity) and (entity['sample_category'].lower() == 'organ') and ('organ' in entity) and (entity['organ'].strip() != ''):
-                    entity['origin_samples'].append(copy.deepcopy(entity))
-                else:
-                    for ancestor in ancestors:
-                        if ('sample_category' in ancestor) and (ancestor['sample_category'].lower() == 'organ') and ('organ' in ancestor) and (ancestor['organ'].strip() != ''):
-                            entity['origin_samples'].append(ancestor)
-
-                # Remove those added fields specified in `entity_properties_list` from origin_samples
+                entity['origin_samples'] = origin_samples
                 self.exclude_added_top_level_properties(entity['origin_samples'])
-                # Remove calculated fields added to a Sample from 'origin_samples'
                 for origin_sample in entity['origin_samples']:
                     self.exclude_added_calculated_fields(origin_sample)
-
-                # `source_samples` field is only available to Dataset
                 if entity['entity_type'] in ['Dataset', 'Publication']:
-                    entity['source_samples'] = None
-                    e = entity
+                    entity['source_samples'] = source_samples
 
-                    while entity['source_samples'] is None:
-                        parent_uuids = self.call_entity_api(entity_id=e['uuid']
-                                                            , endpoint_base='parents'
-                                                            , endpoint_suffix=None
-                                                            , url_property='uuid')
-                        parents = []
-                        for parent_uuid in parent_uuids:
-                            parent_entity_doc = self.call_entity_api(entity_id=parent_uuid
-                                                                     , endpoint_base='documents')
-                            parents.append(parent_entity_doc)
-
-                        try:
-                            if parents[0]['entity_type'] == 'Sample':
-                                # If one parent entity of this Dataset is a Sample, then all parent entities
-                                # of this Dataset must be Samples.
-                                entity['source_samples'] = parents
-                            e = parents[0]
-                        except IndexError:
-                            entity['source_samples'] = []
-
-            # Now that calculations use 'ancestors' with fully populated entities are
-            # complete, set entity['ancestors'] instead to a value appropriate for
-            # the index group, prior to any renaming or removal operations.
-            entity['ancestors'] = self._relatives_for_index_group( relative_ids=ancestor_ids
-                                                                   , index_group=index_group)
+            entity['ancestors'] = filter_fields(ancestors) if reindex_info is not None else \
+                self._relatives_for_index_group(relative_ids=ancestor_ids, index_group=index_group)
 
             self._entity_keys_rename(entity)
-
-            # Rename for properties that are objects
-            if entity.get('donor', None):
-                self._entity_keys_rename(entity['donor'])
-
-            if entity.get('origin_samples', None):
-                for o in entity.get('origin_samples', None):
-                    self._entity_keys_rename(o)
-            if entity.get('source_samples', None):
-                for s in entity.get('source_samples', None):
-                    self._entity_keys_rename(s)
-            if entity.get('ancestors', None):
-                for a in entity.get('ancestors', None):
-                    self._entity_keys_rename(a)
-            if entity.get('descendants', None):
-                for d in entity.get('descendants', None):
-                    self._entity_keys_rename(d)
-            if entity.get('immediate_descendants', None):
-                for parent in entity.get('immediate_descendants', None):
-                    self._entity_keys_rename(parent)
-            if entity.get('immediate_ancestors', None):
-                for child in entity.get('immediate_ancestors', None):
-                    self._entity_keys_rename(child)
+            for field in ['donor', 'origin_samples', 'source_samples', 'ancestors',
+                          'descendants', 'immediate_descendants', 'immediate_ancestors']:
+                items = entity.get(field)
+                if items is None:
+                    continue
+                if isinstance(items, list):
+                    for item in items:
+                        self._entity_keys_rename(item)
+                else:
+                    self._entity_keys_rename(items)
 
             remove_specific_key_entry(entity, "other_metadata")
-
-            # Add additional calculated fields
             self.add_calculated_fields(entity)
 
-            # Establish the list of fields which should be removed from top-level fields prior to
-            # writing the entity as an ElasticSearch document.
-            ig_doc_fields=INDEX_GROUP_PORTAL_DOC_FIELDS if index_group=='portal' else INDEX_GROUP_ENTITIES_DOC_FIELDS
+            ig_doc_fields = INDEX_GROUP_PORTAL_DOC_FIELDS if index_group == 'portal' else INDEX_GROUP_ENTITIES_DOC_FIELDS
             unretained_key_list = [k for k, v in ig_doc_fields.items() if v != PropertyRetentionEnum.ES_DOC]
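+            # Any field in ig_doc_fields not marked PropertyRetentionEnum.ES_DOC is
+            # a working field: it stays on `entity` for _generate_public_doc(), but
+            # is stripped from the deepcopy written to ElasticSearch below.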
-            # We need to leave fields in unretained_key_list on the instance of
-            # entity we are modifying for use by _generate_public_doc(), but we also
-            # do not want these fields in the ElasticSearch document. So make a
-            # deepcopy of entity, remove fields from it, and use it to return a value
             doc_entity = copy.deepcopy(entity)
             for top_level_field in {'ancestors', 'immediate_ancestors', 'descendants', 'immediate_descendants'}:
                 if top_level_field in doc_entity:
                     for field_of_top_level_field in unretained_key_list:
-                        remove_specific_key_entry( obj=doc_entity[top_level_field]
-                                                   , key_to_remove=field_of_top_level_field)
-                    # After removing unneeded entries within members of the top_level_field, remove
-                    # the member itself if it is empty. However, keep the top_level_field in
-                    # the entity even if it is empty.
-                    for index, value in enumerate(doc_entity[top_level_field]):
-                        if not value:
-                            doc_entity[top_level_field].pop(index)
+                        remove_specific_key_entry(obj=doc_entity[top_level_field]
+                                                  , key_to_remove=field_of_top_level_field)
+                    # Drop members emptied by the removals above, keeping the
+                    # top_level_field itself even when empty. Rebuild the list
+                    # rather than popping during enumeration, which skips entries.
+                    doc_entity[top_level_field] = [member for member in doc_entity[top_level_field] if member]
 
             logger.info(f"Finished executing _generate_doc() for {doc_entity['entity_type']}"
                         f" of uuid: {doc_entity['uuid']}"
                         f" for the {index_group} index group.")
 
             return json.dumps(doc_entity) if return_type == 'json' else doc_entity
+
         except Exception as e:
             msg = "Exceptions during executing hubmap_translator._generate_doc()"
-            # Log the full stack trace, prepend a line with our message
             logger.exception(msg)
-
-            # Raise the exception so the caller can handle it properly
             raise Exception(e)
 
+
     def _generate_public_doc(self, entity, index_group:str):
         # N.B. This method assumes the state of the 'entity' argument has been processed by the
         # _generate_doc() function, so this method should always be called after that one.
@@ -1921,61 +1903,42 @@ def _generate_public_doc(self, entity, index_group:str):
         property_key = 'next_revision_uuid'
         if (entity['entity_type'] in ['Dataset', 'Publication']) and (property_key in entity):
             next_revision_uuid = entity[property_key]
-
-            # Can't reuse call_entity_api() here due to the response data type
-            # Making a call against entity-api/entities/<uuid>?property=status
+
             url = self.entity_api_url + "/entities/" + next_revision_uuid + "?property=status"
             response = requests.get(url, headers=self.request_headers, verify=False)
-
             if response.status_code != 200:
-                logger.error(f"_generate_public_doc() failed to get Dataset/Publication status of next_revision_uuid via entity-api for uuid: {next_revision_uuid}")
-
-                # Bubble up the error message from entity-api instead of sys.exit(msg)
-                # The caller will need to handle this exception
+                logger.error(f"_generate_public_doc() failed to get Dataset/Publication status"
+                             f" of next_revision_uuid via entity-api for uuid: {next_revision_uuid}")
                 response.raise_for_status()
                 raise requests.exceptions.RequestException(response.text)
-
-            # The call to entity-api returns string directly
-            dataset_status = (response.text).lower()
-
-            # Check the `next_revision_uuid` and if the dataset is not published,
-            # pop the `next_revision_uuid` from this entity
+            dataset_status = response.text.lower()
             if dataset_status != self.DATASET_STATUS_PUBLISHED:
                 logger.debug(f"Remove the {property_key} property from {entity['uuid']}")
                 entity.pop(property_key)
 
-        descendants = self._relatives_for_index_group( relative_ids=[e['uuid'] for e in entity['descendants']]
-                                                       , index_group=index_group)
-        entity['descendants'] = list(filter(self.is_public, descendants))
+        entity['descendants'] = list(filter(self.is_public, entity['descendants']))
 
-        # Add new properties to entity only needed for documents in the 'portal' index group
         if index_group == 'portal':
             entity['immediate_descendants'] = list(filter(self.is_public, entity['immediate_descendants']))
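+        # Note: is_public() now runs against the field-filtered relative stubs
+        # produced by _generate_doc() rather than re-fetched full entities, so
+        # any field it inspects must be among those retained for the index group.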
 
-        # Establish the list of fields which should be removed from top-level fields prior to
-        # writing the entity as an ElasticSearch document.
         ig_doc_fields = INDEX_GROUP_PORTAL_DOC_FIELDS if index_group == 'portal' else INDEX_GROUP_ENTITIES_DOC_FIELDS
         unretained_key_list = [k for k, v in ig_doc_fields.items() if v != PropertyRetentionEnum.ES_DOC]
 
-        # Remove fields explicitly marked for excluded_properties_from_public_response per entity type in
-        # the provenance_schema.yaml of the entity-api.
         if entity['entity_type'] in self.public_doc_exclusion_dict:
-            self._remove_field_from_dict( a_dict=entity
-                                          , obj_to_remove=self.public_doc_exclusion_dict[entity['entity_type']])
+            self._remove_field_from_dict(a_dict=entity
+                                         , obj_to_remove=self.public_doc_exclusion_dict[entity['entity_type']])
 
-        # Because _generate_doc() left some fields on the entity which should not be a part of the
-        # ElasticSearch document, but which were needed for calculations prior to now, remove them before
-        # returning the public document contents.
         for top_level_field in {'ancestors', 'immediate_ancestors', 'descendants', 'immediate_descendants'}:
             if top_level_field in entity:
                 for field_of_top_level_field in unretained_key_list:
                     remove_specific_key_entry(obj=entity[top_level_field]
-                                               , key_to_remove=field_of_top_level_field)
-
-        logger.info(f"Finished executing _generate_public_doc() for {entity['entity_type']} of uuid: {entity['uuid']}")
+                                              , key_to_remove=field_of_top_level_field)
+        logger.info(f"Finished executing _generate_public_doc() for {entity['entity_type']}"
+                    f" of uuid: {entity['uuid']}"
+                    f" for the {index_group} index group.")
         return json.dumps(entity)
-
+
     """
     Retrieves fields designated in the provenance schema yaml under excluded_properties_from_public_response
     and returns the fields in a list