Skip to content

feat: extends IChannel with AsStream#134

Open
mrdevrobot wants to merge 9 commits into
NethermindEth:mainfrom
mrdevrobot:feature/Read-write-stream-wrapper-for-channel
Open

feat: extends IChannel with AsStream#134
mrdevrobot wants to merge 9 commits into
NethermindEth:mainfrom
mrdevrobot:feature/Read-write-stream-wrapper-for-channel

Conversation

@mrdevrobot
Copy link
Copy Markdown

@mrdevrobot mrdevrobot commented Jun 3, 2025

It implements a ChannelStream that wraps an IChannel.

The implementation does not touch the IChannel interface, even if some Stream abstraction can't be implemented without exposing some more information out of the IChannel interface.

Feature Supported
Write yes
WriteAsync yes
Read yes
ReadAsync yes
Flush no-op; the stream has no internal buffer
Seek no

@mrdevrobot mrdevrobot requested review from flcl42 and rubo as code owners June 3, 2025 07:48
@mrdevrobot
Copy link
Copy Markdown
Author

Refers #127

@flcl42
Copy link
Copy Markdown
Contributor

flcl42 commented Jun 5, 2025

Hello @lucafabbri ! The code looks great, just few notes:

Could you check src\libp2p\Libp2p.Core\Stream.cs? It contains some logic that may be useful for your implementation, it has more detailed handling of CanRead/CanWrite. it also implements such expected feature of streams: if there is noting yet to read it blocks until we have at least something. It returns immediately only if channel is closed for reading.
Could you remove that old implementation in favor of your one?

Copilot AI review requested due to automatic review settings May 20, 2026 12:42
@flcl42 flcl42 force-pushed the feature/Read-write-stream-wrapper-for-channel branch from 1410531 to cb79135 Compare May 20, 2026 12:42
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a Stream adapter for IChannel to improve interoperability with APIs that consume System.IO.Stream, and includes basic test coverage for read/write behavior (including the newer Memory<T> overloads).

Changes:

  • Added IChannel.AsStream() extension returning a ChannelStream.
  • Updated ChannelStream to implement ReadAsync/WriteAsync Memory<T> overloads and to honor offset/count when copying read data.
  • Added NUnit tests validating round-trip reads/writes and zero-length read/write behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
src/libp2p/Libp2p.Core/Stream.cs Improves ChannelStream async overloads and buffer handling for stream-based interop.
src/libp2p/Libp2p.Core/Extensions/IChannelExtensions.cs Introduces AsStream() extension method on IChannel.
src/libp2p/Libp2p.Core.Tests/ChannelTests.cs Adds tests for the new AsStream() adapter and basic read/write semantics.
Comments suppressed due to low confidence (2)

src/libp2p/Libp2p.Core/Stream.cs:96

  • ReadAsync(Memory<byte>, ...) returns early for empty buffers only when _canRead is true. A zero-length read should return 0 immediately regardless of stream state; otherwise this can end up calling _chan.ReadAsync(0, WaitAny, ...) and blocking (per Channel.ReaderWriter.ReadAsync behavior for WaitAny + length 0).
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (buffer.IsEmpty && _canRead) return ValueTask.FromResult(0);

        return ReadAsyncCore(buffer, cancellationToken);
    }

src/libp2p/Libp2p.Core/Stream.cs:105

  • ReadAsyncCore treats any non-Ok result (including IOResult.Cancelled) as end-of-stream by setting _canRead = false and returning 0. If the cancellationToken is canceled, Stream.ReadAsync is expected to throw OperationCanceledException rather than returning 0 (which signals EOF to callers).
        ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny, cancellationToken);
        if (result.Result != IOResult.Ok)
        {
            _canRead = false;
            return 0;
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/libp2p/Libp2p.Core/Stream.cs Outdated
Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core/Extensions/IChannelExtensions.cs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

src/libp2p/Libp2p.Core/Stream.cs:89

  • The overridden ReadAsync overloads bypass Stream's base disposed checks. After Dispose(), ReadAsync should throw ObjectDisposedException, but these methods currently call into _chan even when _disposed is true. Add a _disposed check (and throw ObjectDisposedException) at the beginning of the public ReadAsync overload(s).
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(buffer);

        Memory<byte> target = buffer.AsMemory(offset, count);
        if (target.IsEmpty)
        {
            return cancellationToken.IsCancellationRequested
                ? Task.FromCanceled<int>(cancellationToken)

Comment thread src/libp2p/Libp2p.Core/Stream.cs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (4)

src/libp2p/Libp2p.Core/Stream.cs:28

  • Position getter returns 0 even though CanSeek is false. This is inconsistent with typical Stream semantics and can mislead consumers that query Position; consider throwing NotSupportedException from the getter as well (and ObjectDisposedException if disposed).
    public override long Position
    {
        get => 0;
        set => throw new NotSupportedException();
    }

src/libp2p/Libp2p.Core/Stream.cs:46

  • Synchronous Read blocks on an async operation via GetAwaiter().GetResult() without ConfigureAwait(false). If the underlying IChannel.ReadAsync captures a SynchronizationContext (e.g., UI/ASP.NET legacy), this pattern can deadlock; consider using ConfigureAwait(false) on the ValueTask before blocking (or avoid sync-over-async altogether).
        if (buffer.IsEmpty) return 0;

        ReadResult result = _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny).GetAwaiter().GetResult();
        if (result.Result != IOResult.Ok)

src/libp2p/Libp2p.Core/Stream.cs:66

  • Synchronous Write blocks on IChannel.WriteAsync(...).GetAwaiter().GetResult() without ConfigureAwait(false), which can deadlock if the async implementation captures a SynchronizationContext. Consider applying ConfigureAwait(false) before blocking (or restructuring to avoid sync-over-async).
        ThrowIfDisposed();

        if (_chan.WriteAsync(new ReadOnlySequence<byte>(source)).GetAwaiter().GetResult() != IOResult.Ok)
        {
            _canWrite = false;
        }

src/libp2p/Libp2p.Core/Stream.cs:32

  • Flush() is currently a no-op even when the stream is disposed. Most Stream implementations throw ObjectDisposedException after disposal; consider calling ThrowIfDisposed() inside Flush to keep disposal behavior consistent with Read/Write/CanRead/CanWrite.
    public override void Flush() { }

    public override int Read(byte[] buffer, int offset, int count)

Comment thread src/libp2p/Libp2p.Core/Stream.cs Outdated
Comment thread src/libp2p/Libp2p.Core/Extensions/IChannelExtensions.cs
@flcl42 flcl42 force-pushed the feature/Read-write-stream-wrapper-for-channel branch from dcaf261 to fc5e9c4 Compare May 22, 2026 09:42
@flcl42 flcl42 requested a review from Copilot May 22, 2026 09:48
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core/Stream.cs
Comment thread src/libp2p/Libp2p.Core.Tests/ChannelTests.cs
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

src/libp2p/Libp2p.Core/Stream.cs:217

  • Dispose(bool) triggers _chan.CloseAsync() but discards the returned ValueTask. This makes disposal non-deterministic (callers may assume the channel is closed when Dispose returns) and any exceptions from CloseAsync can go unobserved. Consider overriding DisposeAsync to await CloseAsync, and/or synchronously waiting for CloseAsync in Dispose (with careful deadlock avoidance), so closing is reliably completed as part of disposal.
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _ = _chan.CloseAsync();
            }
            _disposed = true;
        }

Comment thread src/libp2p/Libp2p.Core/Stream.cs
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

src/libp2p/Libp2p.Core/Stream.cs:216

  • ChannelStream closes the underlying IChannel in Dispose(bool) via a fire-and-forget CloseAsync call. Since Stream supports async disposal, consider overriding DisposeAsync() to await _chan.CloseAsync (and set _disposed afterwards) so callers using await using/DisposeAsync can reliably observe the channel being closed before disposal completes.
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _ = _chan.CloseAsync();
            }
            _disposed = true;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants