Description
Bug summary
We deploy flows programmatically in Python like this, and deploy DeploymentEventTrigger automations at the same time. Our server is self-hosted using Docker, with Postgres as the database.
# In deploy_stuff.py
# get some stuff ready to deploy
...
docker_volumes = [.... some stuff .... ]
docker_networks = [.... some stuff .... ]
shared_job_env_variables = {
    PREFECT_CONSTANTS.ENV_VAR_NAMES.DATA_STORE_OVERRIDE: "/data/",
    PREFECT_CONSTANTS.ENV_VAR_NAMES.SQLALCHEMY_URL: os.getenv(
        PREFECT_CONSTANTS.ENV_VAR_NAMES.SQLALCHEMY_URL
    ),
    PREFECT_CONSTANTS.ENV_VAR_NAMES.PREFECT_LOGGING_EXTRA_LOGGERS: PREFECT_CONSTANTS.DEFAULT_LOGGERS,
}
shared_job_variables = dict(
    env=shared_job_env_variables,
    image_pull_policy="IfNotPresent",
    networks=docker_networks,
    volumes=docker_volumes,
)
poll_FOO_deployable = poll_FOO_flow.to_deployment(
    name=PREFECT_CONSTANTS.DEPLOYMENT_NAMES.POLL_FOO,
    cron=get_cron_from_env(PREFECT_CONSTANTS.ENV_VAR_NAMES.POLL_FOO_CRON),
    job_variables=shared_job_variables,
    tags=[PREFECT_CONSTANTS.PREFECT_TAG],
)
check_FOO_deployable = check_FOO_flow.to_deployment(
    name=PREFECT_CONSTANTS.DEPLOYMENT_NAMES.CHECK_FOO,
    job_variables=shared_job_variables,
    tags=[PREFECT_CONSTANTS.PREFECT_TAG],
    triggers=[
        DeploymentEventTrigger(
            name="Trigger FOO validation on FOO update",
            expect={"thing.foo.updated"},
            match_related={
                "prefect.resource.name": PREFECT_CONSTANTS.FLOW_NAMES.POLL_FOO
            },  # type: ignore
            parameters={
                "files": {
                    "__prefect_kind": "json",
                    "value": {
                        "__prefect_kind": "jinja",
                        "template": "{{ event.payload.files | tojson }}",
                    },
                },
            },
        ),
    ],
)
upload_deployable = upload_new_files.to_deployment(
    name=PREFECT_CONSTANTS.DEPLOYMENT_NAMES.UPLOAD,
    cron=get_cron_from_env(
        PREFECT_CONSTANTS.ENV_VAR_NAMES.IMAP_CRON_UPLOAD
    ),
    job_variables=shared_job_variables,
    tags=[PREFECT_CONSTANTS.PREFECT_TAG],
    triggers=[
        DeploymentEventTrigger(
            name="Trigger upload after foo poll",
            expect={PREFECT_CONSTANTS.EVENT.FLOW_RUN_COMPLETED},
            match_related={
                "prefect.resource.name": PREFECT_CONSTANTS.FLOW_NAMES.POLL_FOO
            },
        ),
    ],
)
deployables = (
    poll_FOO_deployable,
    check_FOO_deployable,
    upload_deployable,
)
deploy_ids = deploy(
    *deployables,  # type: ignore
    work_pool_name=PREFECT_CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",  # our custom docker image
)
The flow that can trigger another flow looks like this:
# the flow is defined like this
@flow(
    name=PREFECT_CONSTANTS.FLOW_NAMES.POLL_FOO,
    log_prints=True,
    flow_run_name=generate_flow_run_name,
    retries=1,
)
async def poll_FOO_flow():
    # ... some stuff ...
    downloaded_foo = {... some stuff ...}
    # Trigger event to notify updated I-ALiRT data
    logger.debug(f"Emitting {PREFECT_CONSTANTS.EVENT.FOO_UPDATED} event")
    event: Event | None = emit_event(
        event="thing.foo.updated",
        resource={
            "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
            "prefect.resource.name": flow_run.name,
            "prefect.resource.role": "flow-run",
        },
        payload={"files": list(downloaded_foo.keys())},
    )
    if event is None:
        logger.warning("Failed to emit FOO updated event")
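For reference, here is a standalone sketch (our own illustration, not code from the project) of what the `__prefect_kind: jinja` plus `__prefect_kind: json` parameter wrapper in the trigger above is expected to do when it fires: render the template against the event, then parse the rendered JSON back into a list. The sample event and file names are made up.

```python
# Hypothetical standalone check of the trigger's parameter template:
# render "{{ event.payload.files | tojson }}" against a sample event,
# then parse the result back from JSON (mirroring the "json" wrapper).
import json

from jinja2 import Environment

# Sample event shaped like the one emitted by poll_FOO_flow (file names made up)
event = {"payload": {"files": ["foo_001.pkt", "foo_002.pkt"]}}

rendered = Environment().from_string(
    "{{ event.payload.files | tojson }}"
).render(event=event)

files = json.loads(rendered)
print(files)  # the deployment would receive this list as its `files` parameter
```

When this works, the downstream deployment receives the emitted payload's file list as its `files` parameter; the bug below is that after a redeploy the trigger never fires at all, so this templating step is never reached.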
Both types of automation trigger stop working: we can emit the event, but the trigger never fires, and one flow run completing no longer triggers the next.
After we deploy for the first time, everything works fine. Then at some later time we redeploy (alongside some code changes) and the automations break: check_FOO_flow and the upload_new_files flow never trigger again. After the next server restart, they start working again.
So we think the repro steps are:
- Deploy v1 using python deploy()
- Change the code
- Deploy v2 using python deploy()
- Automations are all broken. Error in logs (see below)
- Restart server
- Automations work again
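As a diagnostic (our own suggestion, not something from the logs), the automation IDs registered on the server could be captured before and after the v2 deploy and diffed: if deploy() deletes and recreates the automations under new IDs, the triggers service may still be evaluating the old IDs until a restart. The comparison itself is trivial:

```python
# Hypothetical diagnostic: diff the automation IDs present on the server
# before and after a redeploy. IDs in "removed" are candidates for the stale
# automation_id seen in the foreign-key violation in the server logs.
def diff_automation_ids(before: set[str], after: set[str]) -> dict[str, set[str]]:
    return {
        "removed": before - after,  # old automations deleted by the redeploy
        "added": after - before,    # replacements created under new IDs
        "kept": before & after,     # automations that survived unchanged
    }

# Made-up IDs: the v2 deploy recreated both automations instead of
# updating them in place.
result = diff_automation_ids({"auto-a", "auto-b"}, {"auto-c", "auto-d"})
print(result["removed"])  # both old automations gone -> stale IDs in memory
```

How the "before" and "after" sets are fetched is left open here; the point is only to check whether a redeploy replaces automation IDs rather than updating them.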
In addition, it looks like we get exceptions in our Prefect server Docker container logs:
13:56:32.170 | ERROR | prefect.server.utilities.messaging.memory - Failed in consume_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.13/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute
self._rows = deque(await prepared_stmt.fetch(*parameters))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/site-packages/asyncpg/prepared_stmt.py", line 267, in __bind_execute
data, status, _ = await self.__do_execute(
^^^^^^^^^^^^^^^^^^^^^^^^
lambda protocol: protocol.bind_execute(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
self._state, args, '', limit, True, timeout))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/site-packages/asyncpg/prepared_stmt.py", line 256, in __do_execute
return await executor(protocol)
^^^^^^^^^^^^^^^^^^^^^^^^
File "asyncpg/protocol/protocol.pyx", line 206, in bind_execute
asyncpg.exceptions.ForeignKeyViolationError: insert or update on table "automation_bucket" violates foreign key constraint "fk_automation_bucket__automation_id__automation"
DETAIL: Key (automation_id)=(5e34ad3c-d021-41da-b6a1-ad7e3ac67447) is not present in table "automation".
...
sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.ForeignKeyViolationError'>: insert or update on table "automation_bucket" violates foreign key constraint "fk_automation_bucket__automation_id__automation"
DETAIL: Key (automation_id)=(5e34ad3c-d021-41da-b6a1-ad7e3ac67447) is not present in table "automation".
[SQL: INSERT INTO automation_bucket (automation_id, trigger_id, bucketing_key, last_event, start, "end", count, last_operation, id, created, updated) VALUES ($1::UUID, $2::UUID, $3::JSONB, $4, $5::TIMESTAMP WITH TIME ZONE, $6::TIMESTAMP WITH TIME ZONE, $7::INTEGER, $8::VARCHAR, $9::UUID, $10::TIMESTAMP WITH TIME ZONE, $11::TIMESTAMP WITH TIME ZONE) ON CONFLICT (automation_id, trigger_id, bucketing_key) DO UPDATE SET last_event = $12, count = automation_bucket.count]
[parameters: ('5e34ad3c-d021-41da-b6a1-ad7e3ac67447', 'af9f49fe-e4b8-43b9-803f-a5555799e6f1', '[]', '{"occurred": "2025-11-03T13:56:32.016956Z", "event": "prefect.flow-run.Completed", "resource": {"prefect.resource.id": "prefect.flow-run.019a4929-b9a ... (1290 characters truncated) ... "type": "COMPLETED", "name": "Completed"}}, "id": "019a4a01-5290-705d-8993-493323156e63", "follows": null, "received": "2025-11-03T13:56:32.061408Z"}', datetime.datetime(2025, 11, 3, 13, 41, 32, 16956, tzinfo=TzInfo(0)), datetime.datetime(2025, 11, 3, 13, 56, 32, 16956, tzinfo=TzInfo(0)), 0, 'ensure_bucket[insert]', '61f3abb7-bb06-4458-933d-ee4a65c0b3f7', datetime.datetime(2025, 11, 3, 13, 56, 32, 167930, tzinfo=zoneinfo.ZoneInfo(key='UTC')), datetime.datetime(2025, 11, 3, 13, 56, 32, 167939, tzinfo=zoneinfo.ZoneInfo(key='UTC')), '{"occurred": "2025-11-03T13:56:32.016956Z", "event": "prefect.flow-run.Completed", "resource": {"prefect.resource.id": "prefect.flow-run.019a4929-b9a ... (1290 characters truncated) ... "type": "COMPLETED", "name": "Completed"}}, "id": "019a4a01-5290-705d-8993-493323156e63", "follows": null, "received": "2025-11-03T13:56:32.061408Z"}')]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
13:56:32.179 | WARNING | prefect.server.utilities.messaging.memory - Message failed after 4 retries and will be moved to the dead letter queue
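The failure class in that traceback can be reproduced in isolation. The sketch below is our own simplification using stdlib sqlite3 rather than Postgres (the real tables have more columns, but a foreign key behaves the same way): delete the automation row, then insert a bucket row that still references it.

```python
# Minimal reproduction of the failure class in the logs: inserting an
# automation_bucket row whose automation_id no longer exists in the
# automation table violates the foreign key. (Simplified schema; the real
# server uses Postgres and asyncpg, but the constraint fails identically.)
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # sqlite enforces FKs only if opted in
conn.execute("CREATE TABLE automation (id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE automation_bucket ("
    " id TEXT PRIMARY KEY,"
    " automation_id TEXT NOT NULL REFERENCES automation(id))"
)

conn.execute("INSERT INTO automation (id) VALUES ('old-automation')")
# A redeploy that recreates automations would delete the old row:
conn.execute("DELETE FROM automation WHERE id = 'old-automation'")

try:
    # A service still holding the old ID in memory tries to bucket an event:
    conn.execute(
        "INSERT INTO automation_bucket (id, automation_id)"
        " VALUES ('b1', 'old-automation')"
    )
    failed = False
except sqlite3.IntegrityError:
    failed = True

print(failed)  # True: same class of error as the ForeignKeyViolationError above
```

This matches our theory that after the v2 deploy the server keeps evaluating an automation ID whose row has been deleted, until a restart reloads the current automations.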
Version info
We are running the Docker image `prefecthq/prefect:3.4.24-python3.13`, with Python 3.13 and Prefect 3.4.24.
Additional context
No response