Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,16 @@ func (s *Coordinator) FlushCollectionCompactionAndAttachedFunction(
return s.catalog.FlushCollectionCompactionAndAttachedFunction(ctx, flushCollectionCompaction, attachedFunctionID, runNonce, completionOffset)
}

// FlushCollectionCompactionAndAttachedFunctionExtended is the multi-collection
// counterpart of FlushCollectionCompactionAndAttachedFunction. It performs no
// work of its own: every argument is handed unchanged to the catalog method of
// the same name, and the catalog's result and error are returned as-is.
func (s *Coordinator) FlushCollectionCompactionAndAttachedFunctionExtended(
	ctx context.Context,
	collectionCompactions []*model.FlushCollectionCompaction,
	attachedFunctionID uuid.UUID,
	runNonce uuid.UUID,
	completionOffset int64,
) (*model.ExtendedFlushCollectionInfo, error) {
	return s.catalog.FlushCollectionCompactionAndAttachedFunctionExtended(
		ctx,
		collectionCompactions,
		attachedFunctionID,
		runNonce,
		completionOffset,
	)
}

// ListCollectionsToGc delegates to the catalog's ListCollectionsToGc, passing
// all filters through untouched and returning the catalog's result and error
// unchanged.
func (s *Coordinator) ListCollectionsToGc(
	ctx context.Context,
	cutoffTimeSecs *uint64,
	limit *uint64,
	tenantID *string,
	minVersionsIfAlive *uint64,
) ([]*model.CollectionToGc, error) {
	return s.catalog.ListCollectionsToGc(
		ctx,
		cutoffTimeSecs,
		limit,
		tenantID,
		minVersionsIfAlive,
	)
}
Expand Down
246 changes: 181 additions & 65 deletions go/pkg/sysdb/coordinator/create_task_test.go

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions go/pkg/sysdb/coordinator/heap_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func (suite *HeapClientIntegrationTestSuite) TestAttachFunctionPushesScheduleToH
})
suite.NoError(err, "Should attached function successfully")
suite.NotNil(response)
suite.NotEmpty(response.Id, "Attached function ID should be returned")
suite.NotNil(response.AttachedFunction)
suite.NotEmpty(response.AttachedFunction.Id, "Attached function ID should be returned")

// Get updated heap summary
updatedSummary, err := suite.heapClient.Summary(ctx, &coordinatorpb.HeapSummaryRequest{})
Expand Down Expand Up @@ -263,7 +264,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskRecovery_HybridAppro
return
}
suite.NotNil(taskResp)
originalTaskID := taskResp.Id
suite.NotNil(taskResp.AttachedFunction)
originalTaskID := taskResp.AttachedFunction.Id
suite.T().Logf("Created fully initialized task: %s", originalTaskID)

// STEP 2: Directly UPDATE database to make task partial (simulate Phase 3 failure)
Expand Down Expand Up @@ -363,7 +365,8 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
return
}
suite.NotNil(taskResp)
suite.T().Logf("Created task: %s", taskResp.Id)
suite.NotNil(taskResp.AttachedFunction)
suite.T().Logf("Created task: %s", taskResp.AttachedFunction.Id)

// STEP 2: Call CleanupExpiredPartialAttachedFunctions (with short timeout to test it doesn't affect complete tasks)
cleanupResp, err := suite.sysdbClient.CleanupExpiredPartialAttachedFunctions(ctx, &coordinatorpb.CleanupExpiredPartialAttachedFunctionsRequest{
Expand All @@ -381,12 +384,12 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
})
suite.NoError(err, "Task should still exist after cleanup")
suite.NotNil(getResp)
suite.Equal(taskResp.Id, getResp.AttachedFunction.Id)
suite.Equal(taskResp.AttachedFunction.Id, getResp.AttachedFunction.Id)
suite.T().Logf("Task still exists after cleanup: %s", getResp.AttachedFunction.Id)

// STEP 4: Delete the task
_, err = suite.sysdbClient.DetachFunction(ctx, &coordinatorpb.DetachFunctionRequest{
AttachedFunctionId: taskResp.Id,
AttachedFunctionId: taskResp.AttachedFunction.Id,
DeleteOutput: true,
})
suite.NoError(err, "Should delete task")
Expand All @@ -403,8 +406,9 @@ func (suite *HeapClientIntegrationTestSuite) TestPartialTaskCleanup_ThenRecreate
})
suite.NoError(err, "Should be able to recreate task after deletion")
suite.NotNil(taskResp2)
suite.NotEqual(taskResp.Id, taskResp2.Id, "New task should have different ID")
suite.T().Logf("Successfully recreated task: %s", taskResp2.Id)
suite.NotNil(taskResp2.AttachedFunction)
suite.NotEqual(taskResp.AttachedFunction.Id, taskResp2.AttachedFunction.Id, "New task should have different ID")
suite.T().Logf("Successfully recreated task: %s", taskResp2.AttachedFunction.Id)
}

func TestHeapClientIntegrationSuite(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ type FlushCollectionInfo struct {
AttachedFunctionCompletionOffset *int64
}

// ExtendedFlushCollectionInfo is the result of a flush that covers several
// collections at once. It is the return type of
// FlushCollectionCompactionAndAttachedFunctionExtended.
type ExtendedFlushCollectionInfo struct {
// Collections holds one FlushCollectionInfo per flushed collection.
// The catalog appends them in the order the compactions were supplied.
Collections []*FlushCollectionInfo
}

func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
if collectionID != types.NilUniqueID() && collectionID != collection.ID {
return false
Expand Down
70 changes: 70 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,76 @@ func (tc *Catalog) FlushCollectionCompactionAndAttachedFunction(
return flushCollectionInfo, nil
}

// FlushCollectionCompactionAndAttachedFunctionExtended atomically updates the
// compaction data of multiple collections and the attached function's
// completion offset in a single transaction.
//
// NOTE: This does NOT advance next_nonce - that is done separately by
// AdvanceAttachedFunction. Only completion_offset is updated here, to record
// how far processing has progressed; runNonce is passed to the update so it
// can be validated against the expected run.
//
// This is only supported for versioned collections (the modern/default path).
//
// Parameters:
//   - collectionCompactions: the per-collection flush requests; must be
//     non-empty and must not contain nil entries.
//   - attachedFunctionID, runNonce: identify the attached function run whose
//     completion offset is updated.
//   - completionOffset: the offset to record.
//
// On success the result holds one FlushCollectionInfo per input compaction, in
// input order, each with AttachedFunctionCompletionOffset set to the supplied
// completionOffset. An error is returned if version files are disabled, if the
// input is empty or contains a nil entry, or if any flush or the offset update
// fails.
func (tc *Catalog) FlushCollectionCompactionAndAttachedFunctionExtended(
	ctx context.Context,
	collectionCompactions []*model.FlushCollectionCompaction,
	attachedFunctionID uuid.UUID,
	runNonce uuid.UUID,
	completionOffset int64,
) (*model.ExtendedFlushCollectionInfo, error) {
	if !tc.versionFileEnabled {
		// Attached-function-based compactions are only supported with versioned collections
		log.Error("FlushCollectionCompactionAndAttachedFunctionExtended is only supported for versioned collections")
		return nil, errors.New("attached-function-based compaction requires versioned collections")
	}

	if len(collectionCompactions) == 0 {
		return nil, errors.New("at least one collection compaction is required")
	}
	// Reject nil entries before opening a transaction: dereferencing one below
	// (collectionCompaction.ID) would otherwise panic mid-transaction.
	for _, collectionCompaction := range collectionCompactions {
		if collectionCompaction == nil {
			return nil, errors.New("collection compaction must not be nil")
		}
	}

	flushInfos := make([]*model.FlushCollectionInfo, 0, len(collectionCompactions))

	err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
		// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
		tx := dbcore.GetDB(txCtx)

		// Handle all collection compactions; the first failure aborts the callback.
		for _, collectionCompaction := range collectionCompactions {
			log.Info("FlushCollectionCompactionAndAttachedFunctionExtended", zap.String("collection_id", collectionCompaction.ID.String()))
			flushInfo, err := tc.FlushCollectionCompactionForVersionedCollection(txCtx, collectionCompaction, tx)
			if err != nil {
				return err
			}
			flushInfos = append(flushInfos, flushInfo)
		}

		// Update ONLY completion_offset - next_nonce was already advanced by AdvanceAttachedFunction.
		// runNonce is still passed so the update targets the correct run.
		if err := tc.metaDomain.AttachedFunctionDb(txCtx).UpdateCompletionOffset(attachedFunctionID, runNonce, completionOffset); err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	// Record the completion offset that was just written on every result.
	// Each entry gets its own copy so the returned infos do not alias a single
	// int64 (mutating one through its pointer must not change the others).
	for _, flushInfo := range flushInfos {
		offset := completionOffset
		flushInfo.AttachedFunctionCompletionOffset = &offset
	}

	// Log with first collection ID (typically the output collection)
	log.Info("FlushCollectionCompactionAndAttachedFunctionExtended",
		zap.String("first_collection_id", collectionCompactions[0].ID.String()),
		zap.Int("collection_count", len(collectionCompactions)),
		zap.String("attached_function_id", attachedFunctionID.String()),
		zap.Int64("completion_offset", completionOffset))

	return &model.ExtendedFlushCollectionInfo{
		Collections: flushInfos,
	}, nil
}

func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))
Expand Down
Loading
Loading