
Conversation


@kostasrim kostasrim commented Nov 4, 2025

Part 1 of #5952

Implements basic disk backpressure for connections that dispatch asynchronously. When Connection::dispatch_q_ grows past a threshold (defined by a flag), the connection starts offloading its pipeline messages to disk instead of blocking. The connection fiber acts as the producer: it writes data to disk when it can't push to the dispatch queue. The async fiber is the consumer: it drains backpressure from the queue and the rest (if any) from disk. (See the sketch after the list below.)

  • add basic disk backed backpressure for connections
  • add simple test
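
A minimal sketch of the producer-side decision described above; the helper name OffloadToDisk and the member names are illustrative, not the actual implementation:

  // Sketch of the producer (connection fiber) path. OffloadToDisk() is a
  // hypothetical helper; the real logic lives in Connection's dispatch code.
  void Connection::DispatchSingle(PipelineMessagePtr msg) {
    if (dispatch_q_.size() > disk_watermark_) {
      // Queue is too deep: spill to the backing file instead of blocking.
      OffloadToDisk(std::move(msg));
    } else {
      dispatch_q_.push_back(std::move(msg));  // Async fiber consumes from here.
    }
  }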

TODO; will follow up in a separate PR:

  • Truncate the backing file so that its total size is capped (requires small helio changes, but it should be quick)
  • GC files when the connection is closed, aborted, etc.
  • Add tests for error paths and for the connection properly restarting reads from the queue once backpressure is drained.

@kostasrim kostasrim self-assigned this Nov 4, 2025
Comment on lines 16 to 18
#include "absl/flags/internal/flag.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/str_split.h"
kostasrim (Contributor Author):

remove

@kostasrim kostasrim changed the title [WIP - do not review] chore: disk based backpressure for connections [WIP - do not review] chore: disk backed backpressure for connections Nov 5, 2025
@kostasrim kostasrim force-pushed the kpr12 branch 3 times, most recently from aa9ce8e to bf6b47d on November 5, 2025 14:27
return {};
}

std::string backing_name = absl::StrCat("/tmp/backing_", id_);
kostasrim (Contributor Author):

Shall we introduce a flag for these backing files, or is /tmp/ enough?

Collaborator:

we should provide a configurable prefix, /tmp/ is not enough.
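
Something along these lines, perhaps; the flag name and default below are purely illustrative, not part of this PR:

  // Hypothetical flag sketch for a configurable backing-file prefix.
  ABSL_FLAG(std::string, connection_disk_backpressure_prefix, "/tmp/",
            "Directory prefix for connection backpressure backing files");
  // Usage sketch:
  // absl::StrCat(absl::GetFlag(FLAGS_connection_disk_backpressure_prefix),
  //              "backing_", id_);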

@kostasrim kostasrim changed the title [WIP - do not review] chore: disk backed backpressure for connections chore: disk backed backpressure for connections Nov 6, 2025
@kostasrim kostasrim requested review from dranikpg and romange November 6, 2025 08:15
@kostasrim kostasrim marked this pull request as ready for review November 6, 2025 08:15
"Maximum size of the backing file. When the watermark is reached, connection will "
"stop offloading backpressure to disk");

ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30,
kostasrim (Contributor Author):

30 is low IMO; any preference for the default value?


bool request_shutdown_ = false;

class DiskBackedBackpressureQueue {
romange (Collaborator) commented Nov 6, 2025:

The PR is large; let's do the usual and split it into self-contained chunks. Specifically, this should reside in dedicated files.

"If non-zero, waits for this time for more I/O "
" events to come for the connection in case there is only one command in the pipeline. ");

ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0,
Collaborator:

what is the unit here? what crosses the watermark?

kostasrim (Contributor Author):

dispatch_q_ > watermark

Will edit the comment/description accordingly

// has pending items.
if (backing_queue_ &&
((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) {
auto ec = backing_queue_->Init();
Collaborator:

I have not looked at Init, but I prefer explicit initialization of resources at the beginning of the flow (where you create backing_queue_) rather than doing it lazily.
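
For example, roughly like this at the creation site; a sketch only, assuming backing_queue_ is created in the connection setup path:

  // Sketch: initialize eagerly where backing_queue_ is created, so the hot
  // path never needs to call Init(). Error handling here is illustrative.
  backing_queue_ = std::make_unique<DiskBackedBackpressureQueue>();
  if (auto ec = backing_queue_->Init(); ec) {
    backing_queue_.reset();  // Fall back to purely in-memory backpressure.
  }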

// Offload only when dispatch_q_ crosses watermark or when backing queue already
// has pending items.
if (backing_queue_ &&
((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) {
Collaborator:

I prefer that DiskOffloadThreshold() reside in QueueBackpressure rather than in backing_queue_; this way you do not need to touch the backing_queue_ object during the happy path.


memcpy(ptr->storage.begin(), bytes.begin(), bytes.size());
std::string_view read{reinterpret_cast<char*>(ptr->storage.data()), bytes.size()};
ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty());
Collaborator:

What does this mean? Do we not support binary strings?

kostasrim (Contributor Author) commented Nov 6, 2025:

It's serialized and deserialized as is: when we parse the command from the socket we create a PipelineMsg. Within that, there is:

  1. a flat storage which is the whole command + args separated by \0
  2. a view (span) that can iterate over each argument (cmd + args)

(see FromArgs{} function)

When we offload this, we offload the flat blob storage as-is to disk. Loading is as easy as reading the bytes directly back into a PipelinePtr. The StrSplit is a fancy way to rebuild the (2) view over the flat storage.
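
Roughly, the layout being described; a simplified sketch, not the actual PipelineMsg code:

  #include <string>
  #include <vector>
  #include "absl/strings/str_split.h"

  // Flat storage: "set\0foo\0bar\0" -- one contiguous blob, NUL-separated.
  // The view is a set of string_views pointing into that blob.
  std::string storage("set\0foo\0bar\0", 12);
  std::vector<std::string_view> args =
      absl::StrSplit(storage, '\0', absl::SkipEmpty());
  // args == {"set", "foo", "bar"}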

kostasrim (Contributor Author):

This is 1-1 with FromArgs, and since the object layout was already flat, it was easy to write it to disk as-is.

Collaborator:

Yeah, writing it to disk is easy, but I think your conclusion is wrong: you cannot load binary strings that contain \0 inside using split(). You will need to serialize argument sizes to make it correct.

In fact, I would like to challenge your design choices:

  1. the pipeline is offloaded after the parsing.
  2. this means you need to serialize structured data; we already saw you keep offsets to track the blob sizes, for example inside your disk queue.
  3. but you also need to parse these argument sets back, and it requires more complicated serialization to do it properly.

Instead you could offload raw socket data before parsing:

  1. you save CPU on parsing data that you later offload
  2. you do not need to store blob sizes, so your on-disk queue is simpler
  3. there is no on-disk format at all - you do not need to worry about serializing command arguments properly.

So your file becomes a full substitute for socket until it depletes.

kostasrim (Contributor Author) commented Nov 7, 2025:

yeah, writing it to disk is easy but I think your conclusion is wrong

I am not sure I understand why \0 is a problem here. For example, the command set foo bar is parsed and stored as set\0foo\0bar\0 both in memory and on disk. Using split() after loading this to build a view of the command arguments (e.g. {set, foo, bar}) is perfectly valid in that context. Am I missing something?

Instead you could offload raw socket data before parsing:

My concern with this is that parsing would now also be part of AsyncFb. Furthermore, parsing a raw socket blob that was written to disk needs to handle cases like INPUT_PENDING, which introduces additional preemption points in the async fiber beyond a) blocking on queue backpressure and b) blocking on loading the next batch to execute from disk. For example:

  1. async_fb blocks on send because the client is not reading; the dispatch queue grows
  2. conn_fb reads from the socket and starts offloading to the disk queue
  3. client stops sending commands
  4. conn_fb blocks on Connection::HandleRecvSocket
  5. async_fb drains the dispatch_queue

We need to load the blob from disk, parse it, and place it in the dispatch_q, but conn_fiber is blocked on socket I/O. There are two options:

  1. coordinate and somehow unblock HandleRecvSocket such that ConnFiber can do a disk read + parsing, construct the msg and send it to the dispatch_q.
  2. read from disk and do the parsing inside AsyncFiber.

If we choose (2), then since ConnFb offloads unparsed data, AsyncFb can now encounter partially parsed entries (because of INPUT_PENDING). So if the last entry written to disk is a partial one (because the rest will arrive soon on the socket; in other words, we just drained the disk backpressure):

  1. AsyncFb needs to read more to finish parsing -> preempts
  2. ConnFb reads from socket -> writes it to backing.

So now ConnFb must notify the AsyncFb to continue parsing.

This can be handled, but it nevertheless means that the parsing responsibilities of ConnFb leak into AsyncFb. What is more, the complicated parsing stages (like INPUT_PENDING) must now also be handled by AsyncFb.

you save CPU on parsing data that you later offload

I am slightly confused by this as well. Didn't you suggest not using the uring API at all, which implies a full proactor block on that thread on I/O? Also, assuming the client will eventually read everything, won't we parse the data anyway, exactly once? What we offload is what we will eventually load, and it is already parsed (apart from the split, which just builds a view). So we don't actually save anything here; we just defer the parsing. What I mean here:

parse 1 -> write to disk size(data 1) + parsed 1 -> parse 2 -> write to disk size(data 2) + parsed 2
(2 parses, 2 writes)

vs

write to disk 1 -> write to disk 2 -> parse 1 -> parse 2
(2 parses, 2 writes)

All in all, I am not against your solution; I just want to understand it a little more, because the tradeoff here seems to be:

prefix the parsed command with its size (and now we don't need extra memory) before writing to disk -> a self-contained change within DiskBackpressure

vs

write raw blobs -> requires parsing within AsyncFb
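
For reference, the size-prefixed option might look roughly like this; a sketch only (host-endian, the actual framing in the PR may differ):

  // Sketch of size-prefixed framing for the on-disk queue: each record is
  // <uint32 length><length bytes>. Embedded \0 bytes are then unambiguous.
  std::string EncodeRecord(std::string_view blob) {
    uint32_t len = blob.size();
    std::string rec(reinterpret_cast<const char*>(&len), sizeof(len));
    rec.append(blob.data(), blob.size());
    return rec;  // Reader consumes 4 bytes, then exactly len bytes.
  }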

Collaborator:

@kostasrim with set foo bar there is no problem. The problem is with binary strings that you send with the RESP protocol, like *3$3\r\nSET\r\n$3\r\nf\0\0\r\n$3\r\nb\0\0\r\n, or in other words where we send set f\0\0 b\0\0.
And it's trivial to test and reproduce the problem.
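
To make the failure concrete, a sketch; SkipEmpty silently drops the empty pieces produced by the embedded NULs (needs absl/strings/str_split.h):

  // Blob for: set f\0\0 b\0\0 -- arguments NUL-separated, as in the PR.
  std::string storage("set\0f\0\0\0b\0\0\0", 12);
  std::vector<std::string_view> args =
      absl::StrSplit(storage, '\0', absl::SkipEmpty());
  // Expected {"set", "f\0\0", "b\0\0"}; actual result: {"set", "f", "b"}.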

Regarding your second point: no, AsyncFb should not handle parsing; ConnFb should do it. And I agree that HandleRecvSocket is problematic, but let's first understand what SHOULD be done and then discuss how we do it.

kostasrim (Contributor Author):

Oh interesting, so set f\0\0 is a valid command. I now understand what you were talking about.

Ok let's sync internally.

: max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)),
max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)),
watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) {
id_ = ++unique_id;
Collaborator:

this is buggy as DiskBackedBackpressureQueue is initialized in multiple threads.

kostasrim (Contributor Author) commented Nov 6, 2025:

Yeah, it's a data race indeed. I wanted a unique identifier; will fix.
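
For what it's worth, the usual fix is an atomic counter; a sketch under the assumption that unique_id is file-scoped (needs <atomic>):

  // Race-free unique id: fetch_add is atomic across all threads.
  static std::atomic<uint64_t> unique_id{0};
  // In the constructor:
  id_ = unique_id.fetch_add(1, std::memory_order_relaxed) + 1;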

VLOG(3) << "Command offloaded: " << msg->FullCommand();
}

template <typename F> void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) {
Collaborator:

The template is not justified here; please use std::function instead.

kostasrim (Contributor Author):

I am happy to use a function here.

template is not justified here.

For completeness, this is not how I think about it: std::function can allocate dynamically, while a lambda argument deduced from a function template parameter does not. Using the most generic form (std::function) in a non-generic context (a plain function) is overkill here.
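
For reference, absl::FunctionRef (its header is already included above) is a non-allocating middle ground that avoids both the template and std::function; a sketch of the signature only, with the parameter type being a guess:

  // Sketch: absl::FunctionRef is non-owning and never allocates, which suits
  // a synchronously invoked callback like this one.
  // Inside DiskBackedBackpressureQueue:
  void LoadFromDiskToQueue(absl::FunctionRef<void(PipelineMessagePtr)> f);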
