chore: disk backed backpressure for connections #6011
base: main
Conversation
src/facade/dragonfly_connection.cc (outdated)
| #include "absl/flags/internal/flag.h" | ||
| #include "absl/functional/function_ref.h" | ||
| #include "absl/strings/str_split.h" |
remove
Force-pushed from aa9ce8e to bf6b47d.
  return {};
}

std::string backing_name = absl::StrCat("/tmp/backing_", id_);
Shall we introduce a flag for those backing files, or is /tmp/ enough?
We should provide a configurable prefix; /tmp/ is not enough.
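A minimal sketch of such a flag (the flag name, default, and the BackingName helper here are hypothetical):

#include <cstdint>
#include <string>

#include "absl/flags/flag.h"
#include "absl/strings/str_cat.h"

// Hypothetical flag sketching the configurable prefix suggested above.
ABSL_FLAG(std::string, connection_disk_backpressure_file_prefix, "/tmp/backing_",
          "Path prefix for the per-connection backing files used for "
          "disk-backed backpressure.");

// Hypothetical helper: builds the per-connection backing file name.
std::string BackingName(uint64_t id) {
  return absl::StrCat(absl::GetFlag(FLAGS_connection_disk_backpressure_file_prefix), id);
}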
Signed-off-by: Kostas Kyrimis <[email protected]>
| "Maximum size of the backing file. When the watermark is reached, connection will " | ||
| "stop offloading backpressure to disk"); | ||
|
|
||
| ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30, |
30 is low IMO; any preference for a default value?
bool request_shutdown_ = false;

class DiskBackedBackpressureQueue {
The PR is large, let's do the usual: split it into self-contained chunks. Specifically, this should reside in dedicated files.
| "If non-zero, waits for this time for more I/O " | ||
| " events to come for the connection in case there is only one command in the pipeline. "); | ||
|
|
||
| ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0, |
What is the unit here? What crosses the watermark?
dispatch_q_ > watermark
Will edit the comment/description accordingly
// has pending items.
if (backing_queue_ &&
    ((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) {
  auto ec = backing_queue_->Init();
Have not looked at Init, but I prefer explicit initialization of resources at the beginning of the flow (where you create backing_queue_) rather than doing it lazily.
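A sketch of the eager variant (the class stub and helper name are hypothetical; logging header assumed):

#include <memory>
#include <system_error>

#include "base/logging.h"  // assumed header providing LOG(WARNING)

// Stub standing in for the PR's class, for illustration only.
class DiskBackedBackpressureQueue {
 public:
  std::error_code Init() { return {}; }  // opens/creates the backing file
};

class Connection {
 public:
  // Hypothetical helper: initialize eagerly where backing_queue_ is created,
  // so the dispatch path never calls Init() lazily.
  void EnableDiskBackpressure() {
    auto q = std::make_unique<DiskBackedBackpressureQueue>();
    if (std::error_code ec = q->Init(); ec) {
      LOG(WARNING) << "disk backpressure disabled: " << ec.message();
      return;  // backing_queue_ stays null; fall back to blocking
    }
    backing_queue_ = std::move(q);
  }

 private:
  std::unique_ptr<DiskBackedBackpressureQueue> backing_queue_;
};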
// Offload only when dispatch_q_ crosses watermark or when backing queue already
// has pending items.
if (backing_queue_ &&
    ((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) {
I prefer that DiskOffloadThreshold() live in QueueBackpressure rather than in backing_queue_; this way you do not need to touch the backing_queue_ object during the happy path.
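Sketched out (field and accessor names here are assumptions), the happy path would then read a plain member instead of dereferencing backing_queue_:

#include <cstddef>

struct QueueBackpressure {
  // 0 disables disk offloading.
  size_t disk_offload_threshold = 0;

  size_t DiskOffloadThreshold() const { return disk_offload_threshold; }
};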
memcpy(ptr->storage.begin(), bytes.begin(), bytes.size());
std::string_view read{reinterpret_cast<char*>(ptr->storage.data()), bytes.size()};
ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty());
What does it mean? Do we not support binary strings?
It's serialized and deserialized as is: when we parse the command from the socket we create a PipelineMsg. Within that, there is:
1. a flat storage buffer holding the whole command + args, separated by \0
2. a view (span) that can iterate over each argument (cmd + args); see the FromArgs() function
When we offload this, we offload the flat blob storage as is to disk. Loading is as easy as reading the bytes directly back into a PipelinePtr; the StrSplit is a fancy way to rebuild view (2) over the flat storage.
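For illustration, a simplified sketch of that layout (names approximate; the real PipelineMsg differs):

#include <memory>
#include <string>
#include <string_view>
#include <vector>

#include "absl/strings/str_split.h"

// Simplified stand-in for the real PipelineMsg.
struct PipelineMsg {
  std::string storage;                 // flat blob: "set\0foo\0bar\0"
  std::vector<std::string_view> args;  // views into storage
};

using PipelinePtr = std::unique_ptr<PipelineMsg>;

// Mirrors the FromArgs() idea: flatten the arguments into storage, then
// rebuild the argument views by splitting on the '\0' separators.
PipelinePtr FromArgs(const std::vector<std::string_view>& parts) {
  auto msg = std::make_unique<PipelineMsg>();
  for (std::string_view p : parts) {
    msg->storage.append(p);
    msg->storage.push_back('\0');
  }
  // msg is heap-allocated, so storage's address is stable and the views
  // remain valid after returning.
  msg->args = absl::StrSplit(msg->storage, '\0', absl::SkipEmpty());
  return msg;
}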
This is 1:1 with FromArgs, and since the object layout was already flat, it was easy to write it to disk as is.
Yeah, writing it to disk is easy, but I think your conclusion is wrong: you cannot load binary strings that contain \0 inside using split(). You will need to serialize argument sizes to make it correct.
In fact, I would like to challenge your design choices:
- The pipeline is offloaded after the parsing.
- This means you need to serialize structured data; we already saw you keep offsets to track the blob sizes, for example inside your disk queue.
- But you also need to parse back these argument sets. That requires more complicated serialization to do properly.
Instead you could offload raw socket data before parsing:
- You save CPU on parsing data that you later offload.
- You do not need to store blob sizes, so your on-disk queue is simpler.
- There is no on-disk format at all; you do not need to worry about serializing command arguments properly.
So your file becomes a full substitute for the socket until it depletes.
yeah, writing it to disk is easy but I think your conclusion is wrong
I am not sure I understand why \0 is a problem here. For example, a command set foo bar is parsed and stored as set\0foo\0bar\0 both in memory and on disk. Using split() after loading this to build a view of the command arguments (e.g. {set, foo, bar}) is perfectly valid in that context. Am I missing something?
Instead you could offload raw socket data before parsing:
My concern with this is that parsing would now also have to be part of AsyncFb. Furthermore, parsing a raw socket blob that was written to disk needs to handle cases like INPUT_PENDING, which introduces additional preemption points to the async fiber beyond a) blocking on queue backpressure and b) blocking on loading the next batch to execute from disk. For example:
async_fb -> blocks on send because the client is not reading; the dispatch queue grows
conn_fb -> reads from the socket and starts offloading to the disk queue
client -> stops sending commands
conn_fb -> blocks on Connection::HandleRecvSocket
async_fb -> drains the dispatch_queue
Now we need to load the blob from disk, parse it, and place it in the dispatch_q, while the conn fiber is blocked on socket I/O. There are two options:
1. Coordinate and somehow unblock HandleRecvSocket such that ConnFiber can do a disk read + parsing, construct the msg and send it to the dispatch_q.
2. Read from disk and do the parsing inside AsyncFiber.
If we choose (2), then since ConnFb offloads unparsed data, AsyncFb can now parse half entries (because of INPUT_PENDING). So if the last entry written to disk is a half entry (because the rest will arrive soon on the socket; in other words, we just drained the disk backpressure):
- AsyncFb needs to read more to finish parsing -> preempts
- ConnFb reads from the socket -> writes it to the backing file
So now ConnFb must notify AsyncFb to continue parsing.
This can be handled, but nevertheless it seems that the parsing responsibilities of ConnFb now leak into AsyncFb. What is more, the complicated parsing stages (like INPUT_PENDING) must also be handled by AsyncFb.
you save CPU on parsing data that you later offload
I am slightly confused about this as well. Didn't you suggest not using the uring API at all, which implies a full proactor block on that thread for I/O? Also, assuming the client will eventually read everything, won't we parse the data anyway, exactly once? What we offload is what we will eventually load, and that is already parsed (apart from the split, which just builds a view). So we don't actually save anything here; we just delegate the parsing to a later time. What I mean:

parse 1 -> write to disk size(data 1) + parsed 1 -> parse 2 -> write to disk size(data 2) + parsed 2
(parses 2 times, writes 2 times)

vs

write to disk 1 -> write to disk 2 -> parse 1 -> parse 2
(parses 2 times, writes 2 times)
All in all, I am not against your solution; I just want to understand it a little bit more, because the tradeoff here seems to be:

prefix the parsed command with its size before writing to disk (and then we don't need extra memory) -> a self-contained change within DiskBackpressure (see the sketch below)

vs

write raw blobs -> requires parsing within AsyncFb
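For concreteness, a per-argument length-prefixed record for the first option could look like this sketch (format and function names are hypothetical):

#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <vector>

// Each argument is stored as [uint32 len][len bytes], so embedded '\0'
// bytes can never collide with a separator.
std::string EncodeArgs(const std::vector<std::string_view>& args) {
  std::string out;
  for (std::string_view a : args) {
    uint32_t len = static_cast<uint32_t>(a.size());
    out.append(reinterpret_cast<const char*>(&len), sizeof(len));
    out.append(a.data(), a.size());
  }
  return out;
}

// Assumes well-formed input (lengths were written by EncodeArgs).
std::vector<std::string> DecodeArgs(std::string_view in) {
  std::vector<std::string> args;
  while (in.size() >= sizeof(uint32_t)) {
    uint32_t len = 0;
    std::memcpy(&len, in.data(), sizeof(len));
    in.remove_prefix(sizeof(len));
    args.emplace_back(in.substr(0, len));
    in.remove_prefix(len);
  }
  return args;
}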
@kostasrim with set foo bar there is no problem. The problem is with binary strings that you send over the RESP protocol, like *3\r\n$3\r\nSET\r\n$3\r\nf\0\0\r\n$3\r\nb\0\0\r\n, or in other words where we send set f\0\0 b\0\0.
And it's trivial to test and reproduce the problem.
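A minimal repro of the split() issue (sketch, using the same absl::StrSplit call as the diff):

#include <iostream>
#include <string>
#include <string_view>
#include <vector>

#include "absl/strings/str_split.h"

int main() {
  // Flat '\0'-separated storage for `set f\0\0 b\0\0` (12 bytes).
  std::string storage("set\0f\0\0\0b\0\0\0", 12);
  std::vector<std::string_view> args =
      absl::StrSplit(storage, '\0', absl::SkipEmpty());
  // SkipEmpty() swallows the embedded zero bytes: we get {"set", "f", "b"}
  // instead of the original {"set", "f\0\0", "b\0\0"}.
  for (std::string_view a : args) std::cout << '[' << a << "] ";
  std::cout << '\n';  // prints: [set] [f] [b]
}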
Regarding your second point: no, AsyncFb should not handle parsing; ConnFb should do it. And I agree that HandleRecvSocket is problematic, but let's first understand what SHOULD be done and then discuss how we do it.
Oh interesting, so set f\0\0 is a valid command. I now understand what you were talking about.
OK, let's sync internally.
: max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)),
  max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)),
  watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) {
  id_ = ++unique_id;
This is buggy, as DiskBackedBackpressureQueue is initialized in multiple threads.
Yeah, it's a data race indeed. I wanted a unique identifier; will fix.
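One race-free variant (sketch; the other members are elided):

#include <atomic>
#include <cstdint>

class DiskBackedBackpressureQueue {
 public:
  // fetch_add on a std::atomic is safe from any thread, unlike `++` on a
  // plain integer, which races when constructors run concurrently.
  DiskBackedBackpressureQueue()
      : id_(next_id_.fetch_add(1, std::memory_order_relaxed)) {}

 private:
  static inline std::atomic<uint64_t> next_id_{0};
  uint64_t id_;
};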
  VLOG(3) << "Command offloaded: " << msg->FullCommand();
}

template <typename F> void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) {
The template is not justified here. Please use std::function instead.
I am happy to use a function here.
template is not justified here.
For completeness, this is not how I think about it: std::function allocates dynamically, whereas a lambda deduced from a function template parameter does not. Using the most generic form (std::function) in a non-generic context (a plain function) is the overkill here.
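Since the diff already includes absl/functional/function_ref.h, a possible middle ground is absl::FunctionRef, which keeps a non-template signature without std::function's potential allocation (sketch; Msg is a stand-in for the real message type):

#include <iostream>
#include <vector>

#include "absl/functional/function_ref.h"

// Stand-in for the real pipeline message type.
struct Msg {
  int id;
};

// absl::FunctionRef is a non-owning view of a callable: no allocation, and
// the callee keeps a plain (non-template) signature. The callable only has
// to outlive the call, which holds for a lambda passed at the call site.
void LoadFromDiskToQueue(std::vector<Msg>& loaded,
                         absl::FunctionRef<void(Msg&)> push) {
  for (Msg& m : loaded) push(m);
}

int main() {
  std::vector<Msg> batch{{1}, {2}};
  LoadFromDiskToQueue(batch, [](Msg& m) { std::cout << m.id << '\n'; });
}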
Part 1 of #5952
Implements basic disk backpressure for connections that dispatch async. When Connection::dispatch_q_ surpasses an arbitrary (flag-defined) value, the connection will start offloading its pipeline messages to disk instead of blocking. The connection fiber is a producer because it writes data to disk when it can't push to the dispatch queue, and the async fiber is the consumer: it drains backpressure from the queue and the rest (if any) from disk.

TODO and will follow up on a separate PR:
Connection::dispatch_q_surpasses an arbitrary (defined in a flag) value, the connection will start offloading its pipeline messages to disk instead of blocking. The connection fiber is aproducerbecause it writes data to disk when it can't push to the dispatch queue and the async fiber is theconsumer: it drains backpressure from the queue and the rest (if any) from disk.TODO and will follow up on a separate PR: