Concurrent runs overwrite intermediate asset data #31870
Unanswered
maurakeith
asked this question in
Q&A
Replies: 1 comment
You could have a custom IO manager that just prefixes its output with the run ID taken from the OutputContext.
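A minimal sketch of that idea, under stated assumptions. To stay runnable without Dagster installed, it does not subclass Dagster's IOManager; instead it mirrors the IOManager method shape (handle_output(context, obj) and load_input(context)) on a plain class. FakeContext, RunScopedPickleStore, and the asset_path field are hypothetical names for illustration. In a real IO manager the run ID would come from the OutputContext as suggested above, and how to resolve the matching run ID and asset key on the load side from the InputContext is not verified here.

```python
import pickle
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Sequence


@dataclass
class FakeContext:
    """Hypothetical stand-in for Dagster's OutputContext / InputContext."""

    run_id: str
    asset_path: Sequence[str]  # e.g. ["asset_a_test"]


class RunScopedPickleStore:
    """Mirrors the IOManager shape, but stores every output under its run ID."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = Path(base_dir)

    def _path(self, run_id: str, asset_path: Sequence[str]) -> Path:
        # The run ID is the first path component, so two concurrent runs
        # of the same job never write to the same file.
        return self.base_dir.joinpath(run_id, *asset_path)

    def handle_output(self, context: FakeContext, obj: Any) -> None:
        path = self._path(context.run_id, context.asset_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context: FakeContext) -> Any:
        # Assumption: the downstream step runs in the same run as the
        # upstream step, so the same run ID locates the upstream output.
        path = self._path(context.run_id, context.asset_path)
        with path.open("rb") as f:
            return pickle.load(f)
```

One trade-off of this layout: because every file lives under a run ID, a later run that wants to load an asset written by an earlier run will not find it under its own run ID, so this fits intermediate data better than long-lived materializations.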
Hi there!
Summary
I am having a problem with concurrent runs of the same job overwriting each other's intermediate asset data. When my sensor triggers multiple runs simultaneously, a downstream asset in one run incorrectly loads the output from an asset in another concurrent run, which causes the pipelines to fail. I simplified the issue in a test pipeline, shown below, that reproduces the error. The logs from the test pipeline confirm this: the second run's asset_b_test logs the value from the first run's asset_a_test instead of its own.
Example Code
Here is a simplified version of my code that reproduces the issue:
Sensor:
test_sensor.py
This sensor, whenever it runs, will trigger 2 runs of my test_job. The run_key is a UUID, and run_config is another UUID, so they are guaranteed to be unique.
Assets:
test_assets.py
I have 2 assets: asset a simply takes in the run config with id_val and returns it. Asset b receives the output from asset a and returns f"Asset B received: {id_val}".
Job:
job.py
Definition:
definitions.py
I see that the asset materializations from my runs are stored in a common location. That is, there isn't a run-specific location that asset materializations are stored in. asset_a_test and asset_b_test are simply overwritten with every run.
Troubleshooting Steps Taken
- FilesystemIOManager, to attempt to save asset materializations to a unique location
- Output class with metadata defined, to differentiate asset materializations
It appears I need a way to make Dagster's IO manager store intermediate outputs in a run-specific path to avoid these conflicts. Any guidance on how to properly implement a custom IO manager or another solution would be greatly appreciated.
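To make the collision concrete, here is a small simulation outside Dagster. It is only a sketch: it assumes storage is keyed by asset name alone, and that both runs materialize asset_a_test before either run's asset_b_test loads it. The function name run_interleaved and the values id-1 and id-2 are hypothetical.

```python
from typing import Dict, Tuple


def run_interleaved(run_scoped: bool) -> Dict[str, str]:
    """Simulate two runs whose steps interleave on one shared store."""
    store: Dict[Tuple[str, ...], str] = {}

    def key(run_id: str, asset: str) -> Tuple[str, ...]:
        # Shared layout: keyed by asset name only.
        # Run-scoped layout: keyed by (run ID, asset name).
        return (run_id, asset) if run_scoped else (asset,)

    # Both runs write asset_a_test before any downstream step starts.
    store[key("run-1", "asset_a_test")] = "id-1"
    store[key("run-2", "asset_a_test")] = "id-2"

    # Each run's asset_b_test then loads what it believes is its own input.
    return {
        run_id: f"Asset B received: {store[key(run_id, 'asset_a_test')]}"
        for run_id in ("run-1", "run-2")
    }
```

With run_scoped=False the second write replaces the first, so run-1 reads the value written by run-2. With run_scoped=True each run reads back its own value.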