Skip to content

Commit 673354c

Browse files
astronautas and Lukas Valatka
authored and committed
feat: Add possibility to materialize only latest values, to increase performance (#5713)
* add pull_all_from_table_or_query for clickhouse, to align with new materialization logic (calling it)

  Signed-off-by: lukas.valatka <[email protected]>

* add option to select to materialize only latest values, for performance

  Signed-off-by: lukas.valatka <[email protected]>

* enforce non optional params

  Signed-off-by: lukas.valatka <[email protected]>

---------

Signed-off-by: lukas.valatka <[email protected]>
Co-authored-by: Lukas Valatka <[email protected]>
1 parent edad903 commit 673354c

File tree

4 files changed

+56
-14
lines changed

4 files changed

+56
-14
lines changed

sdk/python/feast/infra/compute_engines/utils.py

Lines changed: 32 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -21,20 +21,40 @@ def create_offline_store_retrieval_job(
2121
context:
2222
start_time:
2323
end_time:
24-
2524
Returns:
2625
2726
"""
2827
offline_store = context.offline_store
29-
# 📥 Reuse Feast's robust query resolver
30-
retrieval_job = offline_store.pull_all_from_table_or_query(
31-
config=context.repo_config,
32-
data_source=data_source,
33-
join_key_columns=column_info.join_keys,
34-
feature_name_columns=column_info.feature_cols,
35-
timestamp_field=column_info.ts_col,
36-
created_timestamp_column=column_info.created_ts_col,
37-
start_date=start_time,
38-
end_date=end_time,
39-
)
28+
29+
pull_latest = context.repo_config.materialization_config.pull_latest_features
30+
31+
if pull_latest:
32+
if not start_time or not end_time:
33+
raise ValueError(
34+
"start_time and end_time must be provided when pull_latest_features is True"
35+
)
36+
37+
retrieval_job = offline_store.pull_latest_from_table_or_query(
38+
config=context.repo_config,
39+
data_source=data_source,
40+
join_key_columns=column_info.join_keys,
41+
feature_name_columns=column_info.feature_cols,
42+
timestamp_field=column_info.ts_col,
43+
created_timestamp_column=column_info.created_ts_col,
44+
start_date=start_time,
45+
end_date=end_time,
46+
)
47+
else:
48+
# 📥 Reuse Feast's robust query resolver
49+
retrieval_job = offline_store.pull_all_from_table_or_query(
50+
config=context.repo_config,
51+
data_source=data_source,
52+
join_key_columns=column_info.join_keys,
53+
feature_name_columns=column_info.feature_cols,
54+
timestamp_field=column_info.ts_col,
55+
created_timestamp_column=column_info.created_ts_col,
56+
start_date=start_time,
57+
end_date=end_time,
58+
)
59+
4060
return retrieval_job

sdk/python/feast/repo_config.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,14 @@ def validate_path(cls, path: str, values: ValidationInfo) -> str:
182182
return path
183183

184184

185+
class MaterializationConfig(BaseModel):
186+
"""Configuration options for feature materialization behavior."""
187+
188+
pull_latest_features: StrictBool = False
189+
""" bool: If true, feature retrieval jobs will only pull the latest feature values for each entity.
190+
If false, feature retrieval jobs will pull all feature values within the specified time range. """
191+
192+
185193
class RepoConfig(FeastBaseModel):
186194
"""Repo config. Typically loaded from `feature_store.yaml`"""
187195

@@ -239,6 +247,11 @@ class RepoConfig(FeastBaseModel):
239247
coerce_tz_aware: Optional[bool] = True
240248
""" If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). """
241249

250+
materialization_config: MaterializationConfig = Field(
251+
MaterializationConfig(), alias="materialization"
252+
)
253+
""" MaterializationConfig: Configuration options for feature materialization behavior. """
254+
242255
def __init__(self, **data: Any):
243256
super().__init__(**data)
244257

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@
3434
from feast.permissions.auth_model import OidcClientAuthConfig
3535
from feast.permissions.permission import Permission
3636
from feast.permissions.policy import RoleBasedPolicy
37-
from feast.repo_config import RegistryConfig, RepoConfig
37+
from feast.repo_config import MaterializationConfig, RegistryConfig, RepoConfig
3838
from feast.utils import _utc_now
3939
from tests.integration.feature_repos.integration_test_repo_config import (
4040
IntegrationTestRepoConfig,
@@ -423,6 +423,9 @@ class Environment:
423423
entity_key_serialization_version: int
424424
repo_dir_name: str
425425
fixture_request: Optional[pytest.FixtureRequest] = None
426+
materialization: MaterializationConfig = dataclasses.field(
427+
default_factory=lambda: MaterializationConfig()
428+
)
426429

427430
def __post_init__(self):
428431
self.end_date = _utc_now().replace(microsecond=0, second=0, minute=0)
@@ -443,6 +446,7 @@ def setup(self):
443446
repo_path=self.repo_dir_name,
444447
feature_server=self.feature_server,
445448
entity_key_serialization_version=self.entity_key_serialization_version,
449+
materialization_config=self.materialization,
446450
)
447451

448452
self.feature_store = FeatureStore(config=self.config)

sdk/python/tests/integration/materialization/test_universal_materialization.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -219,7 +219,12 @@ def odfv_multi(df: pd.DataFrame) -> pd.DataFrame:
219219

220220
@pytest.mark.integration
221221
@pytest.mark.universal_offline_stores
222-
def test_universal_materialization_consistency(environment):
222+
@pytest.mark.parametrize("materialization_pull_latest", [True, False])
223+
def test_universal_materialization_consistency(
224+
environment, materialization_pull_latest
225+
):
226+
environment.materialization.pull_latest_features = materialization_pull_latest
227+
223228
fs = environment.feature_store
224229
df = create_basic_driver_dataset()
225230
ds = environment.data_source_creator.create_data_source(

0 commit comments

Comments
 (0)