
Implement MSG_ZEROCOPY for blight messages #398

@Eeems

Description

@Eeems Here's a prototype zero-copy variant of `send_blocking`, called `send_blocking_zerocopy`, that uses `MSG_ZEROCOPY`:
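#include <cerrno>
#include <cstring>
#include <mutex>
#include <unordered_map>

#include <linux/errqueue.h>  // struct sock_extended_err, SO_EE_ORIGIN_ZEROCOPY
#include <netinet/in.h>      // SOL_IP, IP_RECVERR, SOL_IPV6, IPV6_RECVERR
#include <sys/socket.h>      // send, recvmsg, MSG_ZEROCOPY, CMSG_* macros

// Helper struct to track zero-copy buffers (blight_data_t comes from the libblight headers)
struct zerocopy_buffer {
    blight_data_t data;
    size_t size;
    // One reference for the caller, plus one per successful MSG_ZEROCOPY send()
    // whose completion notification has not been processed yet
    int refcount;
    // Could add a callback for when the buffer is fully released
};

// Global buffer tracking (in a real implementation, this would be more sophisticated)
static std::unordered_map<blight_data_t, zerocopy_buffer*> zerocopy_buffers;
static std::mutex zerocopy_mutex;

// Defined below; declared here because send_blocking_zerocopy uses it
void release_zerocopy_buffer(zerocopy_buffer* buffer);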

// Register a buffer for zero-copy transmission
zerocopy_buffer* register_zerocopy_buffer(blight_data_t data, size_t size) {
    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    auto buffer = new zerocopy_buffer{data, size, 1};
    zerocopy_buffers[data] = buffer;
    return buffer;
}

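// Function signature changed to return the zerocopy_buffer for the caller to track.
// Preconditions: SO_ZEROCOPY has been enabled on fd with setsockopt() (otherwise the
// kernel silently ignores MSG_ZEROCOPY and copies as usual), and ownership of data
// passes to the tracker, which frees it once every reference has been released.
zerocopy_buffer* send_blocking_zerocopy(int fd, const blight_data_t data, ssize_t size) {
    // Register the buffer; refcount starts at 1, which is the caller's reference
    zerocopy_buffer* buffer = register_zerocopy_buffer(data, size);

    ssize_t sent = 0;
    ssize_t res = 0;

    while (sent < size) {
        if (!wait_for_send(fd)) {
            if (errno == EAGAIN || errno == EINTR) {
                short_pause();
                continue;
            }
            // Drop the caller's reference; pages still pinned by earlier sends
            // keep the buffer alive until their completions are processed
            release_zerocopy_buffer(buffer);
            return nullptr;
        }

        // Use MSG_ZEROCOPY flag for zero-copy transmission
        res = ::send(fd, &data[sent], size - sent, MSG_NOSIGNAL | MSG_ZEROCOPY);

        if (res > 0) {
            sent += res;
            // Each successful call generates its own completion notification,
            // so the kernel effectively holds one reference per call
            std::lock_guard<std::mutex> lock(zerocopy_mutex);
            ++buffer->refcount;
            continue;
        }

        if (res == 0) {
            // Nothing was written; treat it as a broken connection
            errno = ECONNRESET;
            release_zerocopy_buffer(buffer);
            return nullptr;
        }

        // ENOBUFS means the socket's optmem limit or the locked-page limit was hit;
        // a caller could fall back to the regular send_blocking() in that case
        if (errno != EAGAIN && errno != EINTR) {
            // Unexpected error
            release_zerocopy_buffer(buffer);
            return nullptr;
        }

        // Temporary error, retry
        short_pause();
    }

    // The caller still holds one reference and should release it when done; the data
    // is only freed after the completions for every send() above have been processed
    return buffer;
}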

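// Process one zero-copy completion notification from the socket's error queue.
// Reading with MSG_ERRQUEUE never blocks: it fails with EAGAIN when the queue is empty.
bool process_zerocopy_completions(int fd) {
    struct sock_extended_err err;
    struct msghdr msg = {};

    // Completion notifications carry no payload, so no iovec is needed. Leave room
    // for the offender address the kernel appends after sock_extended_err.
    char control[128];
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);

    ssize_t res = recvmsg(fd, &msg, MSG_ERRQUEUE);
    if (res == -1)
        return false;

    struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
    if (!cmsg)
        return false;
    // IPv4 and IPv6 sockets report errors under different levels/types
    bool is_recverr = (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR) ||
                      (cmsg->cmsg_level == SOL_IPV6 && cmsg->cmsg_type == IPV6_RECVERR);
    if (!is_recverr)
        return false;

    memcpy(&err, CMSG_DATA(cmsg), sizeof(err));
    if (err.ee_errno != 0 || err.ee_origin != SO_EE_ORIGIN_ZEROCOPY)
        return false;

    // Process completion notification:
    // [err.ee_info, err.ee_data] is the inclusive range of MSG_ZEROCOPY send() calls
    // (numbered per socket by a 32-bit counter) whose pages the kernel has released.
    // (err.ee_code & SO_EE_CODE_ZEROCOPY_COPIED) means the kernel fell back to copying.
    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    // Update reference counts for completed buffers
    // In a real implementation, we'd map each id in that range to its buffer.
    // Note: release_zerocopy_buffer() takes zerocopy_mutex itself, so call it
    // only after this lock has been dropped.

    return true;
}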

// Release one reference (the caller's, or one send()'s once its completion has been
// processed) and free the buffer when none remain
void release_zerocopy_buffer(zerocopy_buffer* buffer) {
    if (!buffer)
        return;

    std::lock_guard<std::mutex> lock(zerocopy_mutex);
    if (--buffer->refcount <= 0) {
        zerocopy_buffers.erase(buffer->data);
        // The tracker owns the data at this point; assumes it was allocated with new[]
        delete[] buffer->data;
        delete buffer;
    }
}

This implementation:

  1. Creates a tracking system for zero-copy buffers
  2. Passes the MSG_ZEROCOPY flag to send()
  3. Reference-counts each buffer so it is only freed once the kernel no longer needs it
  4. Includes a function that reads completion notifications from the socket's error queue (mapping them back to buffers is still a TODO)
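The missing piece in `process_zerocopy_completions` is mapping a notification's `[ee_info, ee_data]` range back to buffers. A minimal sketch, assuming (per the kernel's msg_zerocopy documentation) that only successful `MSG_ZEROCOPY` `send()` calls advance a per-socket 32-bit counter starting at 0. The `zerocopy_tracker` class is hypothetical, not part of libblight, and is single-threaded; in the prototype it would sit behind `zerocopy_mutex`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical helper: maps the kernel's per-socket MSG_ZEROCOPY call ids
// to the buffers those calls referenced.
template <typename Buffer>
class zerocopy_tracker {
public:
    // Call after every successful MSG_ZEROCOPY send() on the socket. Returns
    // the id the kernel is assumed to assign to that call (0, 1, 2, ...).
    uint32_t record_send(Buffer* buf) {
        uint32_t id = next_id_++;  // unsigned arithmetic wraps like the kernel's 32-bit counter
        pending_[id] = buf;
        ++refs_[buf];
        return id;
    }

    // Handle one notification covering the inclusive id range [lo, hi].
    // Returns the buffers whose last outstanding send has now completed.
    std::vector<Buffer*> complete(uint32_t lo, uint32_t hi) {
        std::vector<Buffer*> released;
        for (uint32_t id = lo;; ++id) {
            auto it = pending_.find(id);
            if (it != pending_.end()) {
                Buffer* buf = it->second;
                pending_.erase(it);
                auto rc = refs_.find(buf);
                assert(rc != refs_.end() && rc->second > 0);
                if (--rc->second == 0) {
                    refs_.erase(rc);
                    released.push_back(buf);
                }
            }
            // Checked after processing so the range stays inclusive, even across wraparound
            if (id == hi)
                break;
        }
        return released;
    }

    std::size_t pending() const { return pending_.size(); }

private:
    uint32_t next_id_ = 0;
    std::map<uint32_t, Buffer*> pending_;  // call id -> buffer
    std::map<Buffer*, int> refs_;          // buffer -> outstanding calls
};
```

In `send_blocking_zerocopy`, `record_send(buffer)` would take the place of the per-send `refcount` increment, and `process_zerocopy_completions` would call `complete(err.ee_info, err.ee_data)` and, after dropping its lock, release whatever it returns.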

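Note that in a real implementation, you'd want to integrate this with your event loop: completions arrive on the socket's error queue (which `poll()` reports as `POLLERR` on TCP and UDP sockets), and reading them with `recvmsg(..., MSG_ERRQUEUE)` is what signals that a buffer can be released. Two more caveats apply. The kernel ignores `MSG_ZEROCOPY` unless the socket has first opted in with `setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, ...)`, and that option is only accepted for certain socket types (TCP and UDP among them), so it's worth checking whether blight's socket qualifies. The kernel documentation also notes that copy avoidance generally only pays off for writes larger than about 10 KB, so small messages are better sent with the regular `send_blocking`.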
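A minimal sketch of that event-loop integration, assuming a TCP or UDP socket on Linux. The names `enable_zerocopy`, `drain_zerocopy_notifications` and `wait_and_drain` are hypothetical, not existing libblight functions: the socket is opted in once, and whenever `poll()` reports `POLLERR` the error queue is drained without blocking and each completed id range is handed to a callback.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <functional>

#include <linux/errqueue.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>

// Fallbacks for older headers (asm-generic / linux/errqueue.h values)
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef SO_EE_ORIGIN_ZEROCOPY
#define SO_EE_ORIGIN_ZEROCOPY 5
#endif
#ifndef SO_EE_CODE_ZEROCOPY_COPIED
#define SO_EE_CODE_ZEROCOPY_COPIED 1
#endif

// Opt fd in to MSG_ZEROCOPY. Returns false (errno set) if the socket type or
// kernel doesn't support it, in which case plain send() should be used instead.
bool enable_zerocopy(int fd) {
    int one = 1;
    return setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) == 0;
}

// Receives the inclusive range [lo, hi] of completed send-call ids and whether
// the kernel fell back to copying for them.
using zerocopy_callback = std::function<void(uint32_t lo, uint32_t hi, bool copied)>;

// Drain every queued notification without blocking. Returns the number of
// zero-copy notifications handled, or -1 on an unexpected error.
int drain_zerocopy_notifications(int fd, const zerocopy_callback& on_complete) {
    int handled = 0;
    for (;;) {
        char control[128];
        msghdr msg = {};
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);
        if (recvmsg(fd, &msg, MSG_ERRQUEUE) == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return handled;  // queue is empty
            if (errno == EINTR)
                continue;
            return -1;
        }
        for (cmsghdr* cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
            bool recverr = (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
                           (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR);
            if (!recverr)
                continue;
            sock_extended_err err;
            std::memcpy(&err, CMSG_DATA(cm), sizeof(err));
            if (err.ee_origin != SO_EE_ORIGIN_ZEROCOPY || err.ee_errno != 0)
                continue;
            on_complete(err.ee_info, err.ee_data, (err.ee_code & SO_EE_CODE_ZEROCOPY_COPIED) != 0);
            ++handled;
        }
    }
}

// Event-loop hook: POLLERR is reported even with events == 0, so no extra
// subscription is needed to learn that notifications are queued.
int wait_and_drain(int fd, int timeout_ms, const zerocopy_callback& on_complete) {
    pollfd pfd = {fd, 0, 0};
    int ready = poll(&pfd, 1, timeout_ms);
    if (ready < 0)
        return -1;
    if (ready == 0 || !(pfd.revents & POLLERR))
        return 0;
    return drain_zerocopy_notifications(fd, on_complete);
}
```

In blight's event loop, the callback would feed each id range into whatever maps send calls to `zerocopy_buffer`s and then call `release_zerocopy_buffer` for every buffer whose sends have all completed. If `enable_zerocopy` fails, the simplest design is to keep using `send_blocking` for that socket.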


🧠 Learnings used
Learnt from: Eeems
PR: Eeems-Org/oxide#397
File: shared/libblight_protocol/socket.cpp:74-99
Timestamp: 2025-03-14T05:57:20.573Z
Learning: In shared/libblight_protocol/socket.cpp, the `recv_blocking` method is intentionally designed to block indefinitely, as it's only meant to be used in situations where data is guaranteed to be available due to a previous header being received. The non-blocking alternative with timeouts and retry limits is implemented in the `recv` method in the same file.

Originally posted by @coderabbitai[bot] in #397 (comment)
