Skip to content

Commit 4c00edb

Browse files
committed
feat: support for multi gRPC query clients serve with old binary
1 parent 0d6228e commit 4c00edb

File tree

6 files changed

+176
-7
lines changed

6 files changed

+176
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ This patch update also includes minor dependency bumps.
4444

4545
* (abci_utils) [#25008](https://github.com/cosmos/cosmos-sdk/pull/24861) add the ability to assign a custom signer extraction adapter in `DefaultProposalHandler`.
4646
* (x/tx) [#25539](https://github.com/cosmos/cosmos-sdk/pull/25539) Expose `NullSliceAsEmptyEncoder` as a public function and add `null_slice_as_empty` encoding option for protobuf annotations.
47+
* (gRPC) [#25565](https://github.com/cosmos/cosmos-sdk/pull/25565) Support for multi gRPC query clients serve with old binary.
4748

4849
## [v0.53.3](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.53.3) - 2025-07-08
4950

client/context.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Context struct {
3030
FromAddress sdk.AccAddress
3131
Client CometRPC
3232
GRPCClient *grpc.ClientConn
33+
GRPCConnProvider *GRPCConnProvider
3334
ChainID string
3435
Codec codec.Codec
3536
InterfaceRegistry codectypes.InterfaceRegistry
@@ -155,6 +156,22 @@ func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context {
155156
return ctx
156157
}
157158

159+
// WithGRPCConnProvider returns a copy of the context with an updated GRPCConnProvider.
160+
func (ctx Context) WithGRPCConnProvider(provider *GRPCConnProvider) Context {
161+
ctx.GRPCConnProvider = provider
162+
return ctx
163+
}
164+
165+
// GetGRPCConn returns the appropriate gRPC connection for the given height.
166+
// If GRPCConnProvider is set, it uses it to determine the connection.
167+
// Otherwise, it falls back to the default GRPCClient.
168+
func (ctx Context) GetGRPCConn(height int64) *grpc.ClientConn {
169+
if ctx.GRPCConnProvider != nil {
170+
return ctx.GRPCConnProvider.GetGRPCConn(height)
171+
}
172+
return ctx.GRPCClient
173+
}
174+
158175
// WithUseLedger returns a copy of the context with an updated UseLedger flag.
159176
func (ctx Context) WithUseLedger(useLedger bool) Context {
160177
ctx.UseLedger = useLedger

client/grpc_query.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,56 @@ import (
1616

1717
"github.com/cosmos/cosmos-sdk/codec"
1818
"github.com/cosmos/cosmos-sdk/codec/types"
19+
"github.com/cosmos/cosmos-sdk/server/config"
1920
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
2021
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
2122
"github.com/cosmos/cosmos-sdk/types/tx"
2223
)
2324

25+
// GRPCConn provides a method to get the appropriate gRPC connection based on block height.
26+
type GRPCConn interface {
27+
GetGRPCConn(height int64) *grpc.ClientConn
28+
}
29+
30+
// GRPCConnProvider manages gRPC connections with optional backup connections for historical queries.
31+
type GRPCConnProvider struct {
32+
// DefaultConn is the primary gRPC connection
33+
DefaultConn *grpc.ClientConn
34+
// BackupConns maps block ranges to backup gRPC connections for routing historical queries
35+
BackupConns config.BackupGRPCConnections
36+
}
37+
38+
// NewGRPCConnProvider creates a new GRPCConnProvider with the given connections.
39+
func NewGRPCConnProvider(defaultConn *grpc.ClientConn, backupConns config.BackupGRPCConnections) *GRPCConnProvider {
40+
if backupConns == nil {
41+
backupConns = make(config.BackupGRPCConnections)
42+
}
43+
return &GRPCConnProvider{
44+
DefaultConn: defaultConn,
45+
BackupConns: backupConns,
46+
}
47+
}
48+
49+
// GetGRPCConn returns the appropriate gRPC connection based on the block height.
50+
// For height <= 0 (latest block), it returns the default connection.
51+
// For positive heights, it checks if a backup connection exists for that height range.
52+
func (g *GRPCConnProvider) GetGRPCConn(height int64) *grpc.ClientConn {
53+
// height = 0 means latest block, use the default connection
54+
if height <= 0 {
55+
return g.DefaultConn
56+
}
57+
58+
// Check if there's a backup connection for this height
59+
for blockRange, conn := range g.BackupConns {
60+
if int64(blockRange[0]) <= height && int64(blockRange[1]) >= height {
61+
return conn
62+
}
63+
}
64+
65+
// Default to the primary connection if no backup matches
66+
return g.DefaultConn
67+
}
68+
2469
var _ gogogrpc.ClientConn = Context{}
2570

2671
// fallBackCodec is used by Context in case Codec is not set.
@@ -58,7 +103,11 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a
58103

59104
if ctx.GRPCClient != nil {
60105
// Case 2-1. Invoke grpc.
61-
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
106+
grpcConn := ctx.GRPCClient
107+
if ctx.GRPCConnProvider != nil {
108+
grpcConn = ctx.GRPCConnProvider.GetGRPCConn(ctx.Height)
109+
}
110+
return grpcConn.Invoke(grpcCtx, method, req, reply, opts...)
62111
}
63112

64113
// Case 2-2. Querying state via abci query.

server/config/config.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package config
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"math"
67

78
"github.com/spf13/viper"
9+
"google.golang.org/grpc"
810

911
pruningtypes "cosmossdk.io/store/pruning/types"
1012

@@ -124,6 +126,12 @@ type APIConfig struct {
124126
// Ref: https://github.com/cosmos/cosmos-sdk/issues/6420
125127
}
126128

129+
type BlockRange [2]int
130+
131+
// BackupGRPCConnections is a map of block ranges to gRPC client connections
132+
// used for routing requests to different backend nodes based on block height.
133+
type BackupGRPCConnections map[BlockRange]*grpc.ClientConn
134+
127135
// GRPCConfig defines configuration for the gRPC server.
128136
type GRPCConfig struct {
129137
// Enable defines if the gRPC server should be enabled.
@@ -142,6 +150,9 @@ type GRPCConfig struct {
142150

143151
// SkipCheckHeader defines if the gRPC server should bypass header checking.
144152
SkipCheckHeader bool `mapstructure:"skip-check-header"`
153+
154+
// BackupGRPCBlockAddressBlockRange maps block ranges to gRPC addresses for routing historical queries.
155+
BackupGRPCBlockAddressBlockRange map[BlockRange]string `mapstructure:"-"`
145156
}
146157

147158
// GRPCWebConfig defines configuration for the gRPC-web server.
@@ -278,6 +289,26 @@ func GetConfig(v *viper.Viper) (Config, error) {
278289
if err := v.Unmarshal(conf); err != nil {
279290
return Config{}, fmt.Errorf("error extracting app config: %w", err)
280291
}
292+
raw := v.GetString("grpc.backup-grpc-address-block-range")
293+
if len(raw) > 0 {
294+
data := make(map[string]BlockRange)
295+
if err := json.Unmarshal([]byte(raw), &data); err != nil {
296+
return Config{}, fmt.Errorf("failed to parse backup-grpc-address-block-range as JSON: %w (value: %s)", err, raw)
297+
}
298+
backupGRPCBlockAddressBlockRange := make(map[BlockRange]string, len(data))
299+
for address, blockRange := range data {
300+
if blockRange[0] < 0 || blockRange[1] < 0 {
301+
return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: block numbers cannot be negative",
302+
blockRange[0], blockRange[1], address)
303+
}
304+
if blockRange[0] > blockRange[1] {
305+
return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: start block must be <= end block",
306+
blockRange[0], blockRange[1], address)
307+
}
308+
backupGRPCBlockAddressBlockRange[blockRange] = address
309+
}
310+
conf.GRPC.BackupGRPCBlockAddressBlockRange = backupGRPCBlockAddressBlockRange
311+
}
281312
return *conf, nil
282313
}
283314

server/config/toml.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,13 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}"
182182
# The default value is math.MaxInt32.
183183
max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}"
184184
185+
# Backup gRPC addresses with block ranges for historical query routing.
186+
# This should be a JSON string mapping gRPC addresses to block ranges.
187+
# Format: '{"address1": [start_block, end_block], "address2": [start_block, end_block]}'
188+
# Example: '{"0.0.0.0:26113": [0, 1000], "0.0.0.0:26114": [1001, 2000]}'
189+
# Leave empty to disable backup gRPC routing.
190+
backup-grpc-address-block-range = "{{ printf "{" }}{{ range $k, $v := .GRPC.BackupGRPCBlockAddressBlockRange }}\"{{ $v }}\": [{{index $k 0 }}, {{ index $k 1}}]{{ end }}{{ printf "}" }}"
191+
185192
###############################################################################
186193
### gRPC Web Configuration ###
187194
###############################################################################

server/start.go

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
"google.golang.org/grpc"
3636
"google.golang.org/grpc/credentials/insecure"
3737

38+
errorsmod "cosmossdk.io/errors"
39+
"cosmossdk.io/log"
3840
pruningtypes "cosmossdk.io/store/pruning/types"
3941

4042
"github.com/cosmos/cosmos-sdk/client"
@@ -96,11 +98,12 @@ const (
9698

9799
// gRPC-related flags
98100

99-
flagGRPCOnly = "grpc-only"
100-
flagGRPCEnable = "grpc.enable"
101-
flagGRPCAddress = "grpc.address"
102-
flagGRPCWebEnable = "grpc-web.enable"
103-
flagGRPCSkipCheckHeader = "grpc.skip-check-header"
101+
flagGRPCOnly = "grpc-only"
102+
flagGRPCEnable = "grpc.enable"
103+
flagGRPCAddress = "grpc.address"
104+
flagGRPCWebEnable = "grpc-web.enable"
105+
flagGRPCSkipCheckHeader = "grpc.skip-check-header"
106+
flagBackupGRPCBlockAddressBlockRange = "grpc.backup-grpc-address-block-range"
104107

105108
// mempool flags
106109

@@ -451,6 +454,53 @@ func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func
451454
return traceWriter, cleanup, nil
452455
}
453456

457+
func parseGrpcAddress(address string) (string, error) {
458+
host, port, err := net.SplitHostPort(address)
459+
if err != nil {
460+
return "", errorsmod.Wrapf(err, "invalid grpc address %s", address)
461+
}
462+
return fmt.Sprintf("%s:%s", host, port), nil
463+
}
464+
465+
// SetupBackupGRPCConnections creates backup gRPC connections based on the configuration
466+
// and returns a client context with the GRPCConnProvider configured.
467+
func SetupBackupGRPCConnections(
468+
clientCtx client.Context,
469+
grpcClient *grpc.ClientConn,
470+
backupAddresses map[serverconfig.BlockRange]string,
471+
maxRecvMsgSize, maxSendMsgSize int,
472+
logger log.Logger,
473+
) (client.Context, error) {
474+
if len(backupAddresses) == 0 {
475+
return clientCtx, nil
476+
}
477+
478+
backupConns := make(serverconfig.BackupGRPCConnections)
479+
for blockRange, address := range backupAddresses {
480+
grpcAddr, err := parseGrpcAddress(address)
481+
if err != nil {
482+
return clientCtx, err
483+
}
484+
conn, err := grpc.NewClient(
485+
grpcAddr,
486+
grpc.WithTransportCredentials(insecure.NewCredentials()),
487+
grpc.WithDefaultCallOptions(
488+
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
489+
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
490+
grpc.MaxCallSendMsgSize(maxSendMsgSize),
491+
),
492+
)
493+
if err != nil {
494+
return clientCtx, err
495+
}
496+
backupConns[blockRange] = conn
497+
}
498+
499+
clientCtx = clientCtx.WithGRPCConnProvider(client.NewGRPCConnProvider(grpcClient, backupConns))
500+
logger.Info("backup gRPC connections configured", "count", len(backupConns))
501+
return clientCtx, nil
502+
}
503+
454504
func startGrpcServer(
455505
ctx context.Context,
456506
g *errgroup.Group,
@@ -479,7 +529,7 @@ func startGrpcServer(
479529
}
480530

481531
// if gRPC is enabled, configure gRPC client for gRPC gateway
482-
grpcClient, err := grpc.Dial( //nolint: staticcheck // ignore this line for this linter
532+
grpcClient, err := grpc.NewClient(
483533
config.Address,
484534
grpc.WithTransportCredentials(insecure.NewCredentials()),
485535
grpc.WithDefaultCallOptions(
@@ -495,6 +545,19 @@ func startGrpcServer(
495545
clientCtx = clientCtx.WithGRPCClient(grpcClient)
496546
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", config.Address)
497547

548+
// Setup backup gRPC connections if configured
549+
clientCtx, err = SetupBackupGRPCConnections(
550+
clientCtx,
551+
grpcClient,
552+
config.BackupGRPCBlockAddressBlockRange,
553+
maxRecvMsgSize,
554+
maxSendMsgSize,
555+
svrCtx.Logger,
556+
)
557+
if err != nil {
558+
return nil, clientCtx, err
559+
}
560+
498561
grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config)
499562
if err != nil {
500563
return nil, clientCtx, err
@@ -994,6 +1057,7 @@ func addStartNodeFlags(cmd *cobra.Command, opts StartCmdOptions) {
9941057
cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled")
9951058
cmd.Flags().String(flagGRPCAddress, serverconfig.DefaultGRPCAddress, "the gRPC server address to listen on")
9961059
cmd.Flags().Bool(flagGRPCWebEnable, true, "Define if the gRPC-Web server should be enabled. (Note: gRPC must also be enabled)")
1060+
cmd.Flags().String(flagBackupGRPCBlockAddressBlockRange, "", "Define if backup grpc and block range is available")
9971061
cmd.Flags().Uint64(FlagStateSyncSnapshotInterval, 0, "State sync snapshot interval")
9981062
cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep")
9991063
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")

0 commit comments

Comments
 (0)