Introduce query history write buffer for DB resiliency #794
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Peiying Ye.
0703b78 to 0512583
0512583 to 8c52dd0
trigger build
85776ef to caab5db
    {
        dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class);
        this.isOracleBackend = isOracleBackend;
        if (writeBufferConfig != null && writeBufferConfig.isEnabled()) {
Suggested change:
-        if (writeBufferConfig != null && writeBufferConfig.isEnabled()) {
+        if (writeBufferConfig.isEnabled()) {
writeBufferConfig should never be null. If it's null for whatever reason, we should fail fast.
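One way to fail fast here is to reject a null configuration in the constructor, the same way the snippet above already handles `jdbi`. The sketch below is only an illustration: `BufferConfigSketch` and `HistoryManagerSketch` are hypothetical stand-ins, not the PR's classes, and only `isEnabled()` is modeled.

```java
import static java.util.Objects.requireNonNull;

// Hypothetical stand-in for the PR's WriteBufferConfiguration; only isEnabled() is modeled.
final class BufferConfigSketch
{
    private final boolean enabled;

    BufferConfigSketch(boolean enabled)
    {
        this.enabled = enabled;
    }

    boolean isEnabled()
    {
        return enabled;
    }
}

// Hypothetical manager that rejects a missing configuration at construction time.
final class HistoryManagerSketch
{
    private final boolean bufferingEnabled;

    HistoryManagerSketch(BufferConfigSketch writeBufferConfig)
    {
        // Throws NullPointerException with this message instead of silently disabling buffering.
        requireNonNull(writeBufferConfig, "writeBufferConfig is null");
        this.bufferingEnabled = writeBufferConfig.isEnabled();
    }

    boolean isBufferingEnabled()
    {
        return bufferingEnabled;
    }
}
```

With this shape, a wiring mistake surfaces at startup rather than as a gateway that quietly runs without buffering.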
    public void buffer(T item)
    {
        if (!deque.offerLast(item)) {
            deque.pollFirst();
            deque.offerLast(item);
        }
    }
This needs to be synchronized to be thread safe.
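A minimal sketch of what a synchronized variant could look like. It assumes a drop-oldest policy like the snippet above, but uses a plain `ArrayDeque` with an explicit capacity check instead of whatever deque the PR uses. `BoundedBufferSketch` and its accessors are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded buffer that evicts the oldest entry when full.
final class BoundedBufferSketch<T>
{
    private final Deque<T> deque = new ArrayDeque<>();
    private final int maxCapacity;

    BoundedBufferSketch(int maxCapacity)
    {
        if (maxCapacity <= 0) {
            throw new IllegalArgumentException("maxCapacity must be positive");
        }
        this.maxCapacity = maxCapacity;
    }

    // synchronized makes "evict the oldest, then append" a single atomic step,
    // so two producers cannot interleave between the eviction and the append.
    synchronized void buffer(T item)
    {
        if (deque.size() >= maxCapacity) {
            deque.pollFirst();
        }
        deque.offerLast(item);
    }

    synchronized int size()
    {
        return deque.size();
    }

    synchronized T peekOldest()
    {
        return deque.peekFirst();
    }
}
```

Without the lock, two threads that both find the deque full could each evict an entry before either appends, dropping more records than necessary.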
        int flushed = 0;
        for (T next; (next = deque.pollFirst()) != null; ) {
            try {
                flusher.accept(next);
                flushed++;
            }
            catch (RuntimeException e) {
                deque.offerFirst(next);
                break; // stop after first failure
            }
        }
Re-inserting the item may fail when the queue is full.
Maybe peek() -> accept() -> remove()?
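The peek() -> accept() -> remove() idea could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `PeekFlushSketch` is a hypothetical class, it uses an unbounded `ArrayDeque`, and it holds the lock for the whole flush.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical buffer whose flush only removes an item after it was written successfully.
final class PeekFlushSketch<T>
{
    private final Deque<T> deque = new ArrayDeque<>();

    synchronized void buffer(T item)
    {
        deque.offerLast(item);
    }

    synchronized int flushAll(Consumer<? super T> flusher)
    {
        int flushed = 0;
        for (T next; (next = deque.peekFirst()) != null; ) {
            try {
                flusher.accept(next);
            }
            catch (RuntimeException e) {
                // The item never left the deque, so there is nothing to re-insert.
                break;
            }
            deque.pollFirst();
            flushed++;
        }
        return flushed;
    }

    synchronized int size()
    {
        return deque.size();
    }
}
```

Holding the lock while `flusher.accept` runs keeps the sketch simple but blocks producers during a slow write; a real implementation may prefer a narrower critical section.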
public class WriteBufferConfiguration
{
    private boolean enabled;
    private int maxCapacity = 10000;
What is the reason for this number 10,000?
This is just a default value, same as for flushInterval. You can always override them in the config file. I'll also update the default value settings in the doc. Would you recommend different default values for them?
{
    private boolean enabled;
    private int maxCapacity = 10000;
    private Duration flushInterval = new Duration(2, TimeUnit.SECONDS);
Same for 2 seconds. What is the reason for this number?
See #794 (comment)
    public void setMaxCapacity(int maxCapacity)
    {
        this.maxCapacity = maxCapacity;
Maybe check for a negative or zero value?
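Such a check could look roughly like the sketch below. `CapacityConfigSketch` is a hypothetical stand-in for the configuration class, and the exception type and message are assumptions; the project may prefer a different validation style.

```java
// Hypothetical configuration holder that rejects non-positive capacities.
final class CapacityConfigSketch
{
    private int maxCapacity = 10000;

    int getMaxCapacity()
    {
        return maxCapacity;
    }

    void setMaxCapacity(int maxCapacity)
    {
        // A zero or negative capacity would make the buffer unusable, so reject it up front.
        if (maxCapacity <= 0) {
            throw new IllegalArgumentException("maxCapacity must be positive, got " + maxCapacity);
        }
        this.maxCapacity = maxCapacity;
    }
}
```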
    public WriteBufferConfiguration() {}

    public WriteBufferConfiguration(boolean enabled, int maxCapacity, Duration flushInterval)
    {
        this.enabled = enabled;
        this.maxCapacity = maxCapacity;
        this.flushInterval = flushInterval;
    }
Why would you need two constructors?
        }
        catch (RuntimeException e) {
            if (isConnectionIssue(e) && writeBuffer != null) {
                writeBuffer.buffer(queryDetail);
What happens if the write buffer is full?
Then we won't buffer this record and the query submission will fail, the same behavior as the current code without this change.
                    writeBufferConfig.getFlushInterval().toMillis(),
                    writeBufferConfig.getFlushInterval().toMillis(),
                    TimeUnit.MILLISECONDS);
        }
Couldn't there be a race condition here?
Could you please elaborate on the race condition here? The flusher's scheduled executor is single-threaded, and thread safety for buffering is handled in the WriteBuffer class.
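For reference, a single-threaded periodic flusher can be sketched as below. This is an assumption about the general shape, not the PR's code: `FlushSchedulerSketch` is a hypothetical class, and the choice of `scheduleWithFixedDelay` is a guess since the scheduling call itself is not visible in the snippet above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper that runs a flush task periodically on one dedicated thread.
final class FlushSchedulerSketch
        implements AutoCloseable
{
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    FlushSchedulerSketch(Runnable flushTask, long flushIntervalMillis)
    {
        Runnable guarded = () -> {
            try {
                flushTask.run();
            }
            catch (RuntimeException e) {
                // Swallowed on purpose: an exception escaping a scheduled task
                // suppresses all of its later executions.
            }
        };
        // Executions of one scheduled task never overlap, so flushes do not race each other.
        executor.scheduleWithFixedDelay(guarded, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close()
    {
        executor.shutdownNow();
    }
}
```

The flusher thread can still run concurrently with request threads calling `buffer()`, which is why the buffer itself has to be thread safe.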
Description
Following the work in #783 to introduce caching in HaGatewayManager, this PR adds a write buffer mechanism to QueryHistoryManager. With this change, if the database becomes unavailable, Trino-Gateway can continue to route queries using the cached Trino cluster data while temporarily storing query history records in the write buffer. When the database is available again, the buffered history entries will be flushed. This allows Trino-Gateway to avoid using the database as a single point of failure and improves overall resiliency.

This approach has already been implemented and validated in production at LinkedIn.
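The overall flow described above can be sketched as follows. Everything here is illustrative: `ResilientWriterSketch` is a hypothetical class, and its `isConnectionIssue` is an assumed implementation that only recognizes the two JDBC connection exception types in the cause chain, which may differ from the PR's check.

```java
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTransientConnectionException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical writer that buffers records while the database is unreachable.
final class ResilientWriterSketch<T>
{
    private final Consumer<T> databaseWriter;
    private final Deque<T> pending = new ArrayDeque<>();

    ResilientWriterSketch(Consumer<T> databaseWriter)
    {
        this.databaseWriter = databaseWriter;
    }

    synchronized void submit(T record)
    {
        try {
            databaseWriter.accept(record);
        }
        catch (RuntimeException e) {
            if (!isConnectionIssue(e)) {
                throw e; // unrelated failures still surface to the caller
            }
            pending.offerLast(record); // keep the record until the database returns
        }
    }

    // Called periodically; stops at the first failure and retries on the next call.
    synchronized int flush()
    {
        int flushed = 0;
        for (T next; (next = pending.peekFirst()) != null; flushed++) {
            try {
                databaseWriter.accept(next);
            }
            catch (RuntimeException e) {
                break;
            }
            pending.pollFirst();
        }
        return flushed;
    }

    synchronized int pendingCount()
    {
        return pending.size();
    }

    // Assumption: a connection issue is any of these JDBC exceptions in the cause chain.
    static boolean isConnectionIssue(Throwable e)
    {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SQLTransientConnectionException || t instanceof SQLNonTransientConnectionException) {
                return true;
            }
        }
        return false;
    }
}
```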
Testing
mvn clean install