feat: extends IChannel with AsStream#134
Conversation
|
Refers #127 |
|
Hello @lucafabbri ! The code looks great, just few notes: Could you check |
1410531 to
cb79135
Compare
There was a problem hiding this comment.
Pull request overview
Adds a Stream adapter for IChannel to improve interoperability with APIs that consume System.IO.Stream, and includes basic test coverage for read/write behavior (including the newer Memory<T> overloads).
Changes:
- Added
IChannel.AsStream()extension returning aChannelStream. - Updated
ChannelStreamto implementReadAsync/WriteAsyncMemory<T>overloads and to honor offset/count when copying read data. - Added NUnit tests validating round-trip reads/writes and zero-length read/write behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/libp2p/Libp2p.Core/Stream.cs |
Improves ChannelStream async overloads and buffer handling for stream-based interop. |
src/libp2p/Libp2p.Core/Extensions/IChannelExtensions.cs |
Introduces AsStream() extension method on IChannel. |
src/libp2p/Libp2p.Core.Tests/ChannelTests.cs |
Adds tests for the new AsStream() adapter and basic read/write semantics. |
Comments suppressed due to low confidence (2)
src/libp2p/Libp2p.Core/Stream.cs:96
ReadAsync(Memory<byte>, ...)returns early for empty buffers only when_canReadis true. A zero-length read should return 0 immediately regardless of stream state; otherwise this can end up calling_chan.ReadAsync(0, WaitAny, ...)and blocking (perChannel.ReaderWriter.ReadAsyncbehavior forWaitAny+ length 0).
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (buffer.IsEmpty && _canRead) return ValueTask.FromResult(0);
return ReadAsyncCore(buffer, cancellationToken);
}
src/libp2p/Libp2p.Core/Stream.cs:105
ReadAsyncCoretreats any non-Okresult (includingIOResult.Cancelled) as end-of-stream by setting_canRead = falseand returning 0. If thecancellationTokenis canceled,Stream.ReadAsyncis expected to throwOperationCanceledExceptionrather than returning 0 (which signals EOF to callers).
ReadResult result = await _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny, cancellationToken);
if (result.Result != IOResult.Ok)
{
_canRead = false;
return 0;
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
src/libp2p/Libp2p.Core/Stream.cs:89
- The overridden ReadAsync overloads bypass Stream's base disposed checks. After Dispose(), ReadAsync should throw ObjectDisposedException, but these methods currently call into _chan even when _disposed is true. Add a _disposed check (and throw ObjectDisposedException) at the beginning of the public ReadAsync overload(s).
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(buffer);
Memory<byte> target = buffer.AsMemory(offset, count);
if (target.IsEmpty)
{
return cancellationToken.IsCancellationRequested
? Task.FromCanceled<int>(cancellationToken)
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (4)
src/libp2p/Libp2p.Core/Stream.cs:28
Positiongetter returns0even thoughCanSeekisfalse. This is inconsistent with typicalStreamsemantics and can mislead consumers that queryPosition; consider throwingNotSupportedExceptionfrom the getter as well (andObjectDisposedExceptionif disposed).
public override long Position
{
get => 0;
set => throw new NotSupportedException();
}
src/libp2p/Libp2p.Core/Stream.cs:46
- Synchronous
Readblocks on an async operation viaGetAwaiter().GetResult()withoutConfigureAwait(false). If the underlyingIChannel.ReadAsynccaptures aSynchronizationContext(e.g., UI/ASP.NET legacy), this pattern can deadlock; consider usingConfigureAwait(false)on theValueTaskbefore blocking (or avoid sync-over-async altogether).
if (buffer.IsEmpty) return 0;
ReadResult result = _chan.ReadAsync(buffer.Length, ReadBlockingMode.WaitAny).GetAwaiter().GetResult();
if (result.Result != IOResult.Ok)
src/libp2p/Libp2p.Core/Stream.cs:66
- Synchronous
Writeblocks onIChannel.WriteAsync(...).GetAwaiter().GetResult()withoutConfigureAwait(false), which can deadlock if the async implementation captures aSynchronizationContext. Consider applyingConfigureAwait(false)before blocking (or restructuring to avoid sync-over-async).
ThrowIfDisposed();
if (_chan.WriteAsync(new ReadOnlySequence<byte>(source)).GetAwaiter().GetResult() != IOResult.Ok)
{
_canWrite = false;
}
src/libp2p/Libp2p.Core/Stream.cs:32
Flush()is currently a no-op even when the stream is disposed. MostStreamimplementations throwObjectDisposedExceptionafter disposal; consider callingThrowIfDisposed()insideFlushto keep disposal behavior consistent withRead/Write/CanRead/CanWrite.
public override void Flush() { }
public override int Read(byte[] buffer, int offset, int count)
dcaf261 to
fc5e9c4
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
src/libp2p/Libp2p.Core/Stream.cs:217
- Dispose(bool) triggers _chan.CloseAsync() but discards the returned ValueTask. This makes disposal non-deterministic (callers may assume the channel is closed when Dispose returns) and any exceptions from CloseAsync can go unobserved. Consider overriding DisposeAsync to await CloseAsync, and/or synchronously waiting for CloseAsync in Dispose (with careful deadlock avoidance), so closing is reliably completed as part of disposal.
protected override void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_ = _chan.CloseAsync();
}
_disposed = true;
}
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
src/libp2p/Libp2p.Core/Stream.cs:216
- ChannelStream closes the underlying IChannel in Dispose(bool) via a fire-and-forget CloseAsync call. Since Stream supports async disposal, consider overriding DisposeAsync() to await _chan.CloseAsync (and set _disposed afterwards) so callers using
await using/DisposeAsync can reliably observe the channel being closed before disposal completes.
protected override void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_ = _chan.CloseAsync();
}
_disposed = true;
It implements a ChannelStream that wraps an IChannel.
The implementation does not touch the IChannel interface, even if some Stream abstraction can't be implemented without exposing some more information out of the IChannel interface.