Skip to content

Commit c397f34

Browse files
committed
chore: wip 2
Signed-off-by: Nugraha <[email protected]>
1 parent 6f8ad54 commit c397f34

File tree

10 files changed

+61
-8
lines changed

10 files changed

+61
-8
lines changed

client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
6565
EnableGet: config.EnableGet,
6666
GetURLMaxBytes: config.GetURLMaxBytes,
6767
GetUseFallback: config.GetUseFallback,
68+
69+
Experimental: config.Experimental,
6870
},
6971
)
7072
if protocolErr != nil {
@@ -213,6 +215,8 @@ type clientConfig struct {
213215
GetURLMaxBytes int
214216
GetUseFallback bool
215217
IdempotencyLevel IdempotencyLevel
218+
219+
Experimental ExperimentalFeatures
216220
}
217221

218222
func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) {

connect_ext_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2179,16 +2179,20 @@ func TestBidiOverHTTP1(t *testing.T) {
21792179
client := pingv1connect.NewPingServiceClient(
21802180
&http.Client{Transport: server.TransportHTTP1()},
21812181
server.URL(),
2182+
// connect.WithExperimental(connect.ExperimentalFeatures{
2183+
// AllowBidiStreamOverHTTP11: true,
2184+
// }),
21822185
)
21832186
stream := client.CumSum(context.Background())
21842187
// Stream creates an async request, can error on Send or Receive.
21852188
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
21862189
assert.ErrorIs(t, err, io.EOF)
21872190
}
21882191
_, err := stream.Receive()
2189-
assert.NotNil(t, err)
21902192
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
2193+
assert.NotNil(t, err)
21912194
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
2195+
21922196
assert.Nil(t, stream.CloseRequest())
21932197
assert.Nil(t, stream.CloseResponse())
21942198
}

duplex_http_call.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type duplexHTTPCall struct {
5050
responseReady chan struct{}
5151
response *http.Response
5252
responseErr error
53+
54+
allowBidiStreamOverHTTP11 bool
5355
}
5456

5557
func newDuplexHTTPCall(
@@ -58,6 +60,7 @@ func newDuplexHTTPCall(
5860
url *url.URL,
5961
spec Spec,
6062
header http.Header,
63+
features ExperimentalFeatures,
6164
) *duplexHTTPCall {
6265
// ensure we make a copy of the url before we pass along to the
6366
// Request. This ensures if a transport out of our control wants
@@ -87,6 +90,8 @@ func newDuplexHTTPCall(
8790
streamType: spec.StreamType,
8891
request: request,
8992
responseReady: make(chan struct{}),
93+
94+
allowBidiStreamOverHTTP11: features.AllowBidiStreamOverHTTP11,
9095
}
9196
}
9297

@@ -326,7 +331,7 @@ func (d *duplexHTTPCall) makeRequest() {
326331
_ = d.CloseWrite()
327332
return
328333
}
329-
if DisallowBidiStreamingHttp11 {
334+
if !d.allowBidiStreamOverHTTP11 {
330335
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
331336
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
332337
// rather than returning a more cryptic error later on.

duplex_http_call_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func TestHTTPCallGetBody(t *testing.T) {
5454
serverURL,
5555
Spec{StreamType: StreamTypeUnary},
5656
http.Header{},
57+
ExperimentalFeatures{},
5758
)
5859
getBodyCalled := false
5960
call.onRequestSend = func(*http.Request) {

features.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package connect
22

3-
var (
4-
DisallowBidiStreamingHttp11 = true
5-
)
3+
type ExperimentalFeatures struct {
4+
AllowBidiStreamOverHTTP11 bool
5+
}

handler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Handler struct {
3131
protocolHandlers map[string][]protocolHandler // Method to protocol handlers
3232
allowMethod string // Allow header
3333
acceptPost string // Accept-Post header
34+
experimental ExperimentalFeatures
3435
}
3536

3637
// NewUnaryHandler constructs a [Handler] for a request-response procedure.
@@ -82,6 +83,8 @@ func NewUnaryHandler[Req, Res any](
8283
protocolHandlers: mappedMethodHandlers(protocolHandlers),
8384
allowMethod: sortedAllowMethodValue(protocolHandlers),
8485
acceptPost: sortedAcceptPostValue(protocolHandlers),
86+
87+
experimental: config.Experimental,
8588
}
8689
}
8790

@@ -165,7 +168,7 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
165168
if isBidi && request.ProtoMajor < 2 {
166169
// Check if we allow bidi stream over HTTP/1.1, enable full-duplex support
167170
// and if fail, we fallback to the default behaviour.
168-
if !DisallowBidiStreamingHttp11 {
171+
if h.experimental.AllowBidiStreamOverHTTP11 {
169172
responseController := http.NewResponseController(responseWriter)
170173
if err := responseController.EnableFullDuplex(); err == nil {
171174
goto Pass
@@ -262,6 +265,8 @@ type handlerConfig struct {
262265
ReadMaxBytes int
263266
SendMaxBytes int
264267
StreamType StreamType
268+
269+
Experimental ExperimentalFeatures
265270
}
266271

267272
func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig {
@@ -314,6 +319,7 @@ func (c *handlerConfig) newProtocolHandlers() []protocolHandler {
314319
SendMaxBytes: c.SendMaxBytes,
315320
RequireConnectProtocolHeader: c.RequireConnectProtocolHeader,
316321
IdempotencyLevel: c.IdempotencyLevel,
322+
Experimental: c.Experimental,
317323
}))
318324
}
319325
return handlers
@@ -333,5 +339,7 @@ func newStreamHandler(
333339
protocolHandlers: mappedMethodHandlers(protocolHandlers),
334340
allowMethod: sortedAllowMethodValue(protocolHandlers),
335341
acceptPost: sortedAcceptPostValue(protocolHandlers),
342+
343+
experimental: config.Experimental,
336344
}
337345
}

option.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ func WithCodec(codec Codec) Option {
230230
return &codecOption{Codec: codec}
231231
}
232232

233+
func WithExperimental(features ExperimentalFeatures) Option {
234+
return &experimentalOption{features}
235+
}
236+
233237
// WithCompressMinBytes sets a minimum size threshold for compression:
234238
// regardless of compressor configuration, messages smaller than the configured
235239
// minimum are sent uncompressed.
@@ -401,6 +405,18 @@ func (o *clientOptionsOption) applyToClient(config *clientConfig) {
401405
}
402406
}
403407

408+
type experimentalOption struct {
409+
features ExperimentalFeatures
410+
}
411+
412+
func (o *experimentalOption) applyToClient(config *clientConfig) {
413+
config.Experimental = o.features
414+
}
415+
416+
func (o *experimentalOption) applyToHandler(config *handlerConfig) {
417+
config.Experimental = o.features
418+
}
419+
404420
type codecOption struct {
405421
Codec Codec
406422
}

protocol.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type protocolHandlerParams struct {
8383
SendMaxBytes int
8484
RequireConnectProtocolHeader bool
8585
IdempotencyLevel IdempotencyLevel
86+
87+
Experimental ExperimentalFeatures
8688
}
8789

8890
// Handler is the server side of a protocol. HTTP handlers typically support
@@ -132,6 +134,8 @@ type protocolClientParams struct {
132134
// The gRPC family of protocols always needs access to a Protobuf codec to
133135
// marshal and unmarshal errors.
134136
Protobuf Codec
137+
138+
Experimental ExperimentalFeatures
135139
}
136140

137141
// Client is the client side of a protocol. HTTP clients typically use a single

protocol_connect.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,14 @@ func (c *connectClient) NewConn(
360360
} // else effectively unbounded
361361
}
362362
}
363-
duplexCall := newDuplexHTTPCall(ctx, c.HTTPClient, c.URL, spec, header)
363+
duplexCall := newDuplexHTTPCall(
364+
ctx,
365+
c.HTTPClient,
366+
c.URL,
367+
spec,
368+
header,
369+
c.Experimental,
370+
)
364371
var conn streamingClientConn
365372
if spec.StreamType == StreamTypeUnary {
366373
unaryConn := &connectUnaryClientConn{

protocol_grpc.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ func (g *grpcHandler) NewConn(
214214
},
215215
web: g.web,
216216
},
217+
experimental: g.Experimental,
217218
})
218219
if failed != nil {
219220
// Negotiation failed, so we can't establish a stream.
@@ -280,6 +281,7 @@ func (g *grpcClient) NewConn(
280281
g.URL,
281282
spec,
282283
header,
284+
g.Experimental,
283285
)
284286
conn := &grpcClientConn{
285287
spec: spec,
@@ -466,6 +468,8 @@ type grpcHandlerConn struct {
466468
wroteToBody bool
467469
request *http.Request
468470
unmarshaler grpcUnmarshaler
471+
472+
experimental ExperimentalFeatures
469473
}
470474

471475
func (hc *grpcHandlerConn) Spec() Spec {
@@ -515,7 +519,7 @@ func (hc *grpcHandlerConn) Close(err error) (retErr error) {
515519
// a well-intentioned client may just not expect the server to be returning
516520
// an error for a streaming RPC. Better to accept that we can't always reuse
517521
// TCP connections.
518-
if DisallowBidiStreamingHttp11 {
522+
if !hc.experimental.AllowBidiStreamOverHTTP11 {
519523
closeErr := hc.request.Body.Close()
520524
if retErr == nil {
521525
retErr = closeErr

0 commit comments

Comments
 (0)