Issue 5755 change advisory lock to try advisory lock#5809
Conversation
… concurrency issues when multiple nodes compete for the same advisory lock
|
|
Summary by CodeRabbit
WalkthroughReplaced the PostgreSQL blocking advisory lock mechanism with a non-blocking approach using ChangesAdvisory Lock Non-Blocking Refactor
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@schema-engine/connectors/sql-schema-connector/src/flavour/postgres.rs`:
- Around line 383-386: The current parsing of pg_try_advisory_lock result (the
let acquired = ... expression) uses unwrap_or(false), which treats parsing
failures as "lock not acquired"; change this to explicitly distinguish parsing
errors from a contended lock by: attempt to parse the boolean from
result.first().and_then(|row| row.at(0).and_then(|v| v.as_bool())) and if None
or a parsing error occurs, return/propagate a distinct error (e.g., a
parse/driver error) instead of treating it as false; only treat a successfully
parsed false as "lock contended". Ensure the code paths and error messages that
use the acquired variable (the parsing expression and the subsequent contention
check around pg_try_advisory_lock) are updated to handle and surface the parse
error separately.
- Around line 378-395: Extract the hardcoded advisory lock id (72707369) into a
named const (e.g., ADVISORY_LOCK_ID) and use that const when constructing the
SQL passed to connection.query_raw and when composing the
ConnectorError::from_msg string (and update the nearby comment to reference the
const as well); ensure the SQL string is formatted to include the const value
and the error message is built with the same const so the id is not duplicated
in literals anywhere in this block (look for connection.query_raw(...) and
ConnectorError::from_msg(...) to apply the change).
In `@schema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs`:
- Around line 100-138: The test postgres_advisory_lock_contention_fails_fast
currently may leak the advisory lock if unwrap_err() or the assertion panics;
wrap the dedicated connection/lock_holder in a small RAII guard (e.g.,
LockHolderGuard) that holds the Quaint connection and issues "SELECT
pg_advisory_unlock(72707369)" in its Drop implementation so the unlock always
runs, replace the explicit tok(lock_holder.raw_cmd(...)) call with constructing
the guard, and relax the assertion by checking for the stable parts of the error
(e.g., "Another instance" plus the production constant if available such as an
ADVISORY_LOCK message constant in postgres.rs, otherwise keep a minimal
substring) instead of coupling to the literal "pg_try_advisory_lock".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ce5a656e-4499-4de0-bab8-f45c81e38fe3
📒 Files selected for processing (3)
schema-engine/connectors/sql-schema-connector/src/flavour/postgres.rsschema-engine/connectors/sql-schema-connector/src/flavour/postgres/connector/wasm/mod.rsschema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs
| let result = connection | ||
| .query_raw("SELECT pg_try_advisory_lock(72707369)", &[]) | ||
| .await | ||
| .map_err(imp::quaint_error_mapper(params))?; | ||
|
|
||
| let acquired = result | ||
| .first() | ||
| .and_then(|row| row.at(0).and_then(|v| v.as_bool())) | ||
| .unwrap_or(false); | ||
|
|
||
| if !acquired { | ||
| return Err(ConnectorError::from_msg( | ||
| "Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock(72707369)). \ | ||
| Another instance is likely running migrations against this database. \ | ||
| See https://pris.ly/d/migrate-advisory-locking for details." | ||
| .to_owned(), | ||
| )); | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Hardcoded lock id duplicated between the SQL and the error message.
72707369 is now repeated in three places (SQL literal, error message, and the comment on Line 376). If anyone ever rotates this id, the error string will silently drift. Extract a const and format both strings from it.
♻️ Suggested refactor
+const MIGRATE_ADVISORY_LOCK_ID: i64 = 72707369;
+
...
- let result = connection
- .query_raw("SELECT pg_try_advisory_lock(72707369)", &[])
- .await
- .map_err(imp::quaint_error_mapper(params))?;
+ let result = connection
+ .query_raw(
+ &format!("SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})"),
+ &[],
+ )
+ .await
+ .map_err(imp::quaint_error_mapper(params))?;
...
- if !acquired {
- return Err(ConnectorError::from_msg(
- "Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock(72707369)). \
- Another instance is likely running migrations against this database. \
- See https://pris.ly/d/migrate-advisory-locking for details."
- .to_owned(),
- ));
- }
+ if !acquired {
+ return Err(ConnectorError::from_msg(format!(
+ "Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})). \
+ Another instance is likely running migrations against this database. \
+ See https://pris.ly/d/migrate-advisory-locking for details."
+ )));
+ }Note the test in advisory_locking.rs only asserts on the substrings "pg_try_advisory_lock" and "Another instance", so this refactor stays test-compatible.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let result = connection | |
| .query_raw("SELECT pg_try_advisory_lock(72707369)", &[]) | |
| .await | |
| .map_err(imp::quaint_error_mapper(params))?; | |
| let acquired = result | |
| .first() | |
| .and_then(|row| row.at(0).and_then(|v| v.as_bool())) | |
| .unwrap_or(false); | |
| if !acquired { | |
| return Err(ConnectorError::from_msg( | |
| "Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock(72707369)). \ | |
| Another instance is likely running migrations against this database. \ | |
| See https://pris.ly/d/migrate-advisory-locking for details." | |
| .to_owned(), | |
| )); | |
| } | |
| const MIGRATE_ADVISORY_LOCK_ID: i64 = 72707369; | |
| let result = connection | |
| .query_raw( | |
| &format!("SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})"), | |
| &[], | |
| ) | |
| .await | |
| .map_err(imp::quaint_error_mapper(params))?; | |
| let acquired = result | |
| .first() | |
| .and_then(|row| row.at(0).and_then(|v| v.as_bool())) | |
| .unwrap_or(false); | |
| if !acquired { | |
| return Err(ConnectorError::from_msg(format!( | |
| "Could not acquire the postgres advisory lock (SELECT pg_try_advisory_lock({MIGRATE_ADVISORY_LOCK_ID})). \ | |
| Another instance is likely running migrations against this database. \ | |
| See https://pris.ly/d/migrate-advisory-locking for details." | |
| ))); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@schema-engine/connectors/sql-schema-connector/src/flavour/postgres.rs` around
lines 378 - 395, Extract the hardcoded advisory lock id (72707369) into a named
const (e.g., ADVISORY_LOCK_ID) and use that const when constructing the SQL
passed to connection.query_raw and when composing the ConnectorError::from_msg
string (and update the nearby comment to reference the const as well); ensure
the SQL string is formatted to include the const value and the error message is
built with the same const so the id is not duplicated in literals anywhere in
this block (look for connection.query_raw(...) and ConnectorError::from_msg(...)
to apply the change).
| let acquired = result | ||
| .first() | ||
| .and_then(|row| row.at(0).and_then(|v| v.as_bool())) | ||
| .unwrap_or(false); |
There was a problem hiding this comment.
unwrap_or(false) will misreport unparseable results as “another instance is running”.
If pg_try_advisory_lock ever returns no row or a non-boolean (e.g. due to an unexpected driver shape), the code silently treats that as “lock not acquired” and surfaces the contention error to the user — which is misleading and undebuggable. Distinguish "couldn’t parse the result" from "lock was contended":
🛡️ Proposed fix
- let acquired = result
- .first()
- .and_then(|row| row.at(0).and_then(|v| v.as_bool()))
- .unwrap_or(false);
+ let acquired = result
+ .first()
+ .and_then(|row| row.at(0).and_then(|v| v.as_bool()))
+ .ok_or_else(|| {
+ ConnectorError::from_msg(
+ "Unexpected result from pg_try_advisory_lock(72707369): \
+ expected a single boolean column."
+ .to_owned(),
+ )
+ })?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@schema-engine/connectors/sql-schema-connector/src/flavour/postgres.rs` around
lines 383 - 386, The current parsing of pg_try_advisory_lock result (the let
acquired = ... expression) uses unwrap_or(false), which treats parsing failures
as "lock not acquired"; change this to explicitly distinguish parsing errors
from a contended lock by: attempt to parse the boolean from
result.first().and_then(|row| row.at(0).and_then(|v| v.as_bool())) and if None
or a parsing error occurs, return/propagate a distinct error (e.g., a
parse/driver error) instead of treating it as false; only treat a successfully
parsed false as "lock contended". Ensure the code paths and error messages that
use the acquired variable (the parsing expression and the subsequent contention
check around pg_try_advisory_lock) are updated to handle and surface the parse
error separately.
| // Regression test for https://github.com/prisma/prisma-engines/issues/5755. | ||
| // We hold a session-level advisory lock on `72707369` from an external | ||
| // connection, then try to apply a migration. The schema engine must fail fast | ||
| // with an advisory-lock error instead of blocking (and potentially deadlocking | ||
| // against concurrent operations such as `CREATE INDEX CONCURRENTLY`). | ||
| #[test_connector(tags(Postgres), exclude(CockroachDb))] | ||
| fn postgres_advisory_lock_contention_fails_fast(mut api: TestApi) { | ||
| let mut me = api.new_engine(); | ||
| let migrations_directory = api.create_migrations_directory(); | ||
|
|
||
| let dm = api.datamodel_with_provider( | ||
| r#" | ||
| model Cat { | ||
| id Int @id | ||
| inBox Boolean | ||
| } | ||
| "#, | ||
| ); | ||
|
|
||
| me.create_migration("01initial", &dm, &migrations_directory) | ||
| .draft(true) | ||
| .send_sync(); | ||
|
|
||
| // Acquire a dedicated connection (separate from the engine's connection) | ||
| // and grab the same advisory lock the schema engine uses. | ||
| let lock_holder = tok(quaint::single::Quaint::new(api.connection_string())).unwrap(); | ||
| tok(lock_holder.raw_cmd("SELECT pg_advisory_lock(72707369)")).unwrap(); | ||
|
|
||
| assert_eq!(empty_results_count, 2); | ||
| let err = tok(async { me.apply_migrations(&migrations_directory).send().await }).unwrap_err(); | ||
|
|
||
| // Release the lock so the test database teardown is not blocked. | ||
| tok(lock_holder.raw_cmd("SELECT pg_advisory_unlock(72707369)")).unwrap(); | ||
|
|
||
| let msg = err.to_string(); | ||
| assert!( | ||
| msg.contains("pg_try_advisory_lock") && msg.contains("Another instance"), | ||
| "unexpected error: {msg}" | ||
| ); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Solid regression test, but consider tightening lock cleanup and message coupling.
Two small things:
- Lines 128/134–137: if
apply_migrationsunexpectedly succeeds,unwrap_err()panics before the explicitpg_advisory_unlockruns; same for any later assertion failure. In practice the session-level lock releases whenlock_holderis dropped (Quaint::singleis a single connection), so this is safe today, but a small RAII-style guard orDrophelper would make it robust against future refactors that share the connection. - Lines 134–137: the assertion couples the test to the literal substring
"pg_try_advisory_lock"from the production error message. If you adopt the suggested constant-based formatting inpostgres.rs, this still passes; just be aware the substring match is the only contract.
Otherwise the test correctly reproduces the #5755 contention scenario by holding the lock from a separate session and asserting fast failure.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@schema-engine/sql-migration-tests/tests/migrations/advisory_locking.rs`
around lines 100 - 138, The test postgres_advisory_lock_contention_fails_fast
currently may leak the advisory lock if unwrap_err() or the assertion panics;
wrap the dedicated connection/lock_holder in a small RAII guard (e.g.,
LockHolderGuard) that holds the Quaint connection and issues "SELECT
pg_advisory_unlock(72707369)" in its Drop implementation so the unlock always
runs, replace the explicit tok(lock_holder.raw_cmd(...)) call with constructing
the guard, and relax the assertion by checking for the stable parts of the error
(e.g., "Another instance" plus the production constant if available such as an
ADVISORY_LOCK message constant in postgres.rs, otherwise keep a minimal
substring) instead of coupling to the literal "pg_try_advisory_lock".
No description provided.