Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ see the [quick start](#quick-start).
- [Unit Tests](#unit-tests)
- [Compliance Tests](#compliance-tests)
- [Fuzzing Tests](#fuzzing-tests)
- [Generating Mock Files Using Mockery](./mocks/README.md)
- [Connection Methods](#connection-methods)
- [Plain TCP Connection](#plain-tcp-connection)
- [Secured TCP Connection (TLS)](#secured-tcp-connection-tls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ func NewBigtableClient(clients *types.BigtableClientManager, logger *zap.Logger,
func (btc *BigtableAdapter) Execute(ctx context.Context, query types.IExecutableQuery) (message.Message, error) {
switch q := query.(type) {
case *types.BoundDeleteQuery:
return btc.DeleteRow(ctx, q)
return btc.deleteRow(ctx, q)
case *types.BigtableWriteMutation:
return btc.mutateRow(ctx, q)
case *types.ExecutableSelectQuery:
return btc.ExecutePreparedStatement(ctx, q)
return btc.executePreparedStatement(ctx, q)
case *types.CreateTableStatementMap:
return btc.schemaManager.CreateTable(ctx, q)
case *types.AlterTableStatementMap:
return btc.schemaManager.AlterTable(ctx, q)
case *types.TruncateTableStatementMap:
err := btc.DropAllRows(ctx, q)
err := btc.dropAllRows(ctx, q)
return emptyRowsResult(), err
case *types.DropTableQuery:
return btc.schemaManager.DropTable(ctx, q)
Expand All @@ -91,8 +91,6 @@ func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.Bigtable
otelgo.AddAnnotation(ctx, applyingBigtableMutation)
mut := bigtable.NewMutation()

btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(input.RowKey()))))

client, err := btc.clients.GetClient(input.Keyspace())
if err != nil {
return nil, err
Expand Down Expand Up @@ -185,13 +183,13 @@ func (btc *BigtableAdapter) buildMutation(ctx context.Context, table *bigtable.T
return nil
}

func (btc *BigtableAdapter) DropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
func (btc *BigtableAdapter) dropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
_, err := btc.schemaManager.Schemas().GetTableSchema(data.Keyspace(), data.Table())
if err != nil {
return err
}

// performance optimization because DropAllRows can be slow
// performance optimization because dropAllRows can be slow
hasRows, err := btc.hasAnyRows(ctx, data.Keyspace(), data.Table())
if err != nil {
return err
Expand Down Expand Up @@ -263,19 +261,19 @@ func (btc *BigtableAdapter) InsertRow(ctx context.Context, input *types.Bigtable
return btc.mutateRow(ctx, input)
}

// UpdateRow - Updates a row in the specified bigtable table.
// updateRow - Updates a row in the specified bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - data: PreparedUpdateQuery object containing the table, row key, columns, values, and DeleteColumnFamilies.
//
// Returns:
// - error: Error if the update fails.
func (btc *BigtableAdapter) UpdateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
func (btc *BigtableAdapter) updateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
return btc.mutateRow(ctx, input)
}

func (btc *BigtableAdapter) DeleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
func (btc *BigtableAdapter) deleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
otelgo.AddAnnotation(ctx, applyingDeleteMutation)
client, err := btc.clients.GetClient(deleteQueryData.Keyspace())
if err != nil {
Expand Down Expand Up @@ -385,9 +383,10 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
}, err
}
default:
err := fmt.Errorf("unhandled bulk mutation type %T", md)
return BulkOperationResponse{
FailedRows: fmt.Sprintf("All Rows are failed because: unsupported bulk operation: %T", v),
}, fmt.Errorf("unhandled bulk mutation type %T", md)
}, err
}
}
// create mutations from mutation data
Expand Down Expand Up @@ -510,8 +509,8 @@ func (btc *BigtableAdapter) PrepareStatement(ctx context.Context, query types.IP
return nil, nil
}

selectQuery, ok := query.(*types.PreparedSelectQuery)
if !ok {
selectQuery, isType := query.(*types.PreparedSelectQuery)
if !isType {
// only select queries can be prepared in Bigtable at this time
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/constants"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/datastax/proxycore"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
Expand All @@ -32,7 +33,7 @@ import (
"time"
)

// ExecutePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// executePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - query: rh.QueryMetadata containing the query and parameters.
Expand All @@ -42,7 +43,7 @@ import (
// - *message.RowsResult: The result of the select statement.
// - time.Duration: The total elapsed time for the operation.
// - error: Error if the statement preparation or execution fails.
func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
func (btc *BigtableAdapter) executePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
if query.CachedBTPrepare == nil {
return nil, fmt.Errorf("cannot execute select query because prepared bigtable query is nil")
}
Expand All @@ -59,14 +60,19 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return nil, fmt.Errorf("failed to bind parameters: %w", err)
}

table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

var processingErr error
var rows []types.GoRow
executeErr := boundStmt.Execute(ctx, func(resultRow bigtable.ResultRow) bool {
r, convertErr := btc.convertResultRow(resultRow, query) // Call the implemented helper
r, convertErr := btc.convertResultRow(resultRow, query, table)
if convertErr != nil {
btc.Logger.Error("Failed to convert result row", zap.Error(convertErr), zap.String("btql", query.TranslatedQuery))
processingErr = convertErr // Capture the error
return false // Stop execution
processingErr = convertErr
return false // Stop execution
}
rows = append(rows, r)
return true // Continue processing
Expand All @@ -82,12 +88,7 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return responsehandler.BuildRowsResultResponse(query, rows, query.ProtocolVersion)
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery) (types.GoRow, error) {
table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery, table *metadata.TableSchema) (types.GoRow, error) {
result := make(types.GoRow)
for i, colMeta := range resultRow.Metadata.Columns {
var val any
Expand Down Expand Up @@ -140,10 +141,6 @@ func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query
return nil, fmt.Errorf("result already set for column `%s`", key)
}

if key == "list_text" {
btc.Logger.Log(zap.InfoLevel, "list_text", zap.Any("value", val))
}

goValue, err := rowValueToGoValue(val, expectedType)
if err != nil {
return nil, fmt.Errorf("failed to convert result for '%s': %w", key, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/bigtable"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestMain(m *testing.M) {
fmt.Printf("failed to initialize md store: %v", err)
os.Exit(1)
}
btc = NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, mdStore)
btc = NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, mdStore, &otelgo.OpenTelemetry{Config: &otelgo.OTelConfig{OTELEnabled: false}})

_, err = mdStore.CreateTable(ctx, types.NewCreateTableStatementMap(keyspace, tableName, "ignored", false, []types.CreateColumn{
{
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestInsertRow(t *testing.T) {
mdStore := schemaMapping.NewMetadataStore(zap.NewNop(), bts.Clients(), &bigtableConfig)
mdStore.Schemas().AddTables(mockdata.GetSchemaMappingConfig().Tables())

localBtc := NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, mdStore)
localBtc := NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, mdStore, &otelgo.OpenTelemetry{Config: &otelgo.OTelConfig{OTELEnabled: false}})

tests := []struct {
name string
Expand Down Expand Up @@ -166,7 +167,7 @@ func TestDeleteRow(t *testing.T) {
require.NoError(t, err)

deleteQuery := types.NewBoundDeleteQuery(keyspace, tableName, "", rowKey, false, nil)
_, err = btc.DeleteRow(t.Context(), deleteQuery)
_, err = btc.deleteRow(t.Context(), deleteQuery)
require.NoError(t, err)

// Verify deletion
Expand Down Expand Up @@ -236,7 +237,7 @@ func TestMutateRowDeleteColumnFamily(t *testing.T) {
// Delete cf2
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteCellsOp("tags"))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand All @@ -259,7 +260,7 @@ func TestMutateRowDeleteQualifiers(t *testing.T) {
// Delete col1
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteColumnOp(types.BigtableColumn{Family: "cf1", Column: "col1"}))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand Down Expand Up @@ -289,7 +290,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Update the row when it exists
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key1)
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err := btc.UpdateRow(t.Context(), updateData)
res, err := btc.updateRow(t.Context(), updateData)
require.NoError(t, err)
assert.True(t, wasApplied(res))

Expand All @@ -305,7 +306,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Attempt to update a non-existent row
updateDataNonExistent := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key2)
updateDataNonExistent.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err = btc.UpdateRow(t.Context(), updateDataNonExistent)
res, err = btc.updateRow(t.Context(), updateDataNonExistent)
require.NoError(t, err)
assert.False(t, wasApplied(res))

Expand Down Expand Up @@ -346,11 +347,11 @@ func TestMutateRowIfNotExists(t *testing.T) {

func TestMutateRowInvalidKeyspace(t *testing.T) {
localMdStore := schemaMapping.NewMetadataStore(zap.NewNop(), bts.Clients(), &bigtableConfig)
localBtc := NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, localMdStore)
localBtc := NewBigtableClient(bts.Clients(), zap.NewNop(), &bigtableConfig, localMdStore, &otelgo.OpenTelemetry{Config: &otelgo.OTelConfig{OTELEnabled: false}})

updateData := types.NewBigtableWriteMutation("invalid-keyspace", "any-table", "", types.IfSpec{}, types.QueryTypeUpdate, "row1")
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("value")))
_, err := localBtc.UpdateRow(t.Context(), updateData)
_, err := localBtc.updateRow(t.Context(), updateData)
require.Error(t, err)
assert.Contains(t, err.Error(), "bigtable client not found for keyspace 'invalid-keyspace'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/mem_table"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
"github.com/datastax/go-cassandra-native-protocol/message"
"go.uber.org/zap"
"strings"
Expand All @@ -20,9 +21,10 @@ type IQueryExecutor interface {
type QueryExecutorManager struct {
logger *zap.Logger
executors []IQueryExecutor
otelInst *otelgo.OpenTelemetry
}

func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata, bt *bigtableModule.BigtableAdapter, systemTables *mem_table.InMemEngine) *QueryExecutorManager {
func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata, bt *bigtableModule.BigtableAdapter, systemTables *mem_table.InMemEngine, otelInst *otelgo.OpenTelemetry) *QueryExecutorManager {
return &QueryExecutorManager{
logger: logger,
executors: []IQueryExecutor{
Expand All @@ -31,15 +33,28 @@ func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata
newSelectSystemTableExecutor(s, systemTables),
newBigtableExecutor(bt),
},
otelInst: otelInst,
}
}

func (m *QueryExecutorManager) Execute(ctx context.Context, client types.ICassandraClient, q types.IExecutableQuery) (message.Message, error) {
func (m *QueryExecutorManager) getExecutor(q types.IExecutableQuery) (IQueryExecutor, error) {
for _, e := range m.executors {
if e.CanRun(q) {
m.logger.Debug("executing query", zap.String("cql", q.CqlQuery()), zap.String("btql", q.BigtableQuery()))
return e.Execute(ctx, client, q)
return e, nil
}
}
return nil, fmt.Errorf("no executor found for query %s on keyspace %s", strings.ToUpper(q.QueryType().String()), q.Keyspace())
}

func (m *QueryExecutorManager) Execute(ctx context.Context, client types.ICassandraClient, q types.IExecutableQuery) (message.Message, error) {
executor, err := m.getExecutor(q)
if err != nil {
return nil, err
}

otelCtx, childSpan := m.otelInst.StartSpan(ctx, "execute", nil)
defer childSpan.End()
msg, err := executor.Execute(otelCtx, client, q)
childSpan.RecordError(err)
return msg, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type OtelConfig struct {
Endpoint string
}
Traces struct {
ProjectId string
Endpoint string
SamplingRatio float64
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (q QueryType) IsDDLType() bool {

type IExecutableQuery interface {
Keyspace() Keyspace
Table() TableName // empty string if no table involved e.g. "USE keyspace;"
QueryType() QueryType
AsBulkMutation() (IBigtableMutation, bool)
CqlQuery() string
Expand Down Expand Up @@ -229,8 +230,9 @@ type RawQuery struct {
cql string
qt QueryType
sessionKeyspace Keyspace
parser *parser.ProxyCqlParser
startTime time.Time
// warning: parsers are pooled for performance reasons. this will be released back into the pool and set to nil after translation
parser *parser.ProxyCqlParser
startTime time.Time
}

func NewRawQuery(header *frame.Header, sessionKeyspace Keyspace, cql string, parser *parser.ProxyCqlParser, qt QueryType) *RawQuery {
Expand All @@ -248,6 +250,11 @@ func NewRawQueryWithTime(header *frame.Header, sessionKeyspace Keyspace, cql str
}
}

func (r *RawQuery) Release() {
r.parser.Release()
r.parser = nil
}

func (r *RawQuery) QueryType() QueryType {
return r.qt
}
Expand Down
Loading