Skip to content

Commit 95e2dcd

Browse files
authored
session: automatically wait for schema agreement (#460)
Now, the session automatically waits for schema agreement after executing a statement which modifies the schema. The rationale for this change is that, previously, it was the user's responsibility to wait for schema agreement and it was easy to forget about this check. The automatic wait for schema agreement is never a bug, although it can slow down some programs which do a lot of schema changes in a short succession. Those applications can opt-out of this feature by disabling the feature in the configuration and can still wait for schema agreement manually. The session will wait for schema agreement for a configured amount of time, and then - if the agreement is not reached - it will return a TimeoutError. Fixes: #459
1 parent 27861bf commit 95e2dcd

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

scylla/src/transport/connection.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,13 @@ impl QueryResponse {
158158
}
159159
}
160160

161+
pub fn as_schema_change(&self) -> Option<&result::SchemaChange> {
162+
match &self.response {
163+
Response::Result(result::Result::SchemaChange(sc)) => Some(sc),
164+
_ => None,
165+
}
166+
}
167+
161168
pub fn into_query_result(self) -> Result<QueryResult, QueryError> {
162169
let (rows, paging_state, col_specs) = match self.response {
163170
Response::Error(err) => return Err(err.into()),

scylla/src/transport/session.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
use std::time::Duration;
1212
use tokio::net::lookup_host;
1313
use tokio::time::timeout;
14-
use tracing::{debug, trace, trace_span, Instrument};
14+
use tracing::{debug, error, trace, trace_span, Instrument};
1515
use uuid::Uuid;
1616

1717
use super::connection::QueryResponse;
@@ -62,6 +62,7 @@ pub struct Session {
6262
speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
6363
metrics: Arc<Metrics>,
6464
default_consistency: Consistency,
65+
auto_await_schema_agreement_timeout: Option<Duration>,
6566
}
6667

6768
/// Configuration options for [`Session`].
@@ -113,6 +114,10 @@ pub struct SessionConfig {
113114

114115
/// Interval of sending keepalive requests
115116
pub keepalive_interval: Option<Duration>,
117+
118+
/// Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement.
119+
/// If `None`, the automatic schema agreement is disabled.
120+
pub auto_await_schema_agreement_timeout: Option<Duration>,
116121
}
117122

118123
/// Describes database server known on Session startup.
@@ -154,6 +159,7 @@ impl SessionConfig {
154159
default_consistency: Consistency::LocalQuorum,
155160
fetch_schema_metadata: true,
156161
keepalive_interval: None,
162+
auto_await_schema_agreement_timeout: Some(std::time::Duration::from_secs(60)),
157163
}
158164
}
159165

@@ -344,6 +350,7 @@ impl Session {
344350
speculative_execution_policy: config.speculative_execution_policy,
345351
metrics: Arc::new(Metrics::new()),
346352
default_consistency: config.default_consistency,
353+
auto_await_schema_agreement_timeout: config.auto_await_schema_agreement_timeout,
347354
};
348355

349356
if let Some(keyspace_name) = config.used_keyspace {
@@ -446,6 +453,8 @@ impl Session {
446453
.instrument(span)
447454
.await?;
448455
self.handle_set_keyspace_response(&response).await?;
456+
self.handle_auto_await_schema_agreement(&query.contents, &response)
457+
.await?;
449458

450459
response.into_query_result()
451460
}
@@ -466,6 +475,28 @@ impl Session {
466475
Ok(())
467476
}
468477

478+
async fn handle_auto_await_schema_agreement(
479+
&self,
480+
contents: &str,
481+
response: &QueryResponse,
482+
) -> Result<(), QueryError> {
483+
if let Some(timeout) = self.auto_await_schema_agreement_timeout {
484+
if response.as_schema_change().is_some()
485+
&& !self.await_timed_schema_agreement(timeout).await?
486+
{
487+
// TODO: The TimeoutError should allow to provide more context.
488+
// For now, print an error to the logs
489+
error!(
490+
"Failed to reach schema agreement after a schema-altering statement: {}",
491+
contents,
492+
);
493+
return Err(QueryError::TimeoutError);
494+
}
495+
}
496+
497+
Ok(())
498+
}
499+
469500
/// Run a simple query with paging\
470501
/// This method will query all pages of the result\
471502
///
@@ -713,6 +744,8 @@ impl Session {
713744
.instrument(span)
714745
.await?;
715746
self.handle_set_keyspace_response(&response).await?;
747+
self.handle_auto_await_schema_agreement(prepared.get_statement(), &response)
748+
.await?;
716749

717750
response.into_query_result()
718751
}

scylla/src/transport/session_builder.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,46 @@ impl SessionBuilder {
500500
self.config.keepalive_interval = Some(interval);
501501
self
502502
}
503+
504+
/// Enables automatic wait for schema agreement and sets the timeout for it.
505+
/// By default, it is enabled and the timeout is 60 seconds.
506+
///
507+
/// # Example
508+
/// ```
509+
/// # use scylla::{Session, SessionBuilder};
510+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
511+
/// let session: Session = SessionBuilder::new()
512+
/// .known_node("127.0.0.1:9042")
513+
/// .auto_schema_agreement_timeout(std::time::Duration::from_secs(120))
514+
/// .build()
515+
/// .await?;
516+
/// # Ok(())
517+
/// # }
518+
/// ```
519+
pub fn auto_schema_agreement_timeout(mut self, timeout: Duration) -> Self {
520+
self.config.auto_await_schema_agreement_timeout = Some(timeout);
521+
self
522+
}
523+
524+
/// Disables automatic wait for schema agreement.
525+
/// By default, it is enabled and the timeout is 60 seconds.
526+
///
527+
/// # Example
528+
/// ```
529+
/// # use scylla::{Session, SessionBuilder};
530+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
531+
/// let session: Session = SessionBuilder::new()
532+
/// .known_node("127.0.0.1:9042")
533+
/// .no_auto_schema_agreement()
534+
/// .build()
535+
/// .await?;
536+
/// # Ok(())
537+
/// # }
538+
/// ```
539+
pub fn no_auto_schema_agreement(mut self) -> Self {
540+
self.config.auto_await_schema_agreement_timeout = None;
541+
self
542+
}
503543
}
504544

505545
/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]

0 commit comments

Comments
 (0)