@@ -1400,6 +1400,18 @@ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionFor
14001400 return {std::make_shared<ExpressionActions>(std::move (actions)), sign_filter->getColumnName ()};
14011401}
14021402
1403+ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionForIsDeleted (const String & is_deleted_column_name, const Block & header, const ContextPtr & context)
1404+ {
1405+ ASTPtr is_deleted_identifier = std::make_shared<ASTIdentifier>(is_deleted_column_name);
1406+ ASTPtr is_deleted_filter = makeASTFunction (" equals" , is_deleted_identifier, std::make_shared<ASTLiteral>(Field (static_cast <Int8>(0 ))));
1407+
1408+ const auto & is_deleted_column = header.getByName (is_deleted_column_name);
1409+
1410+ auto syntax_result = TreeRewriter (context).analyze (is_deleted_filter, {{is_deleted_column.name , is_deleted_column.type }});
1411+ auto actions = ExpressionAnalyzer (is_deleted_filter, syntax_result, context).getActionsDAG (false );
1412+ return {std::make_shared<ExpressionActions>(std::move (actions)), is_deleted_filter->getColumnName ()};
1413+ }
1414+
14031415bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal () const
14041416{
14051417 const auto & settings = context->getSettingsRef ();
@@ -1493,7 +1505,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
14931505 bool no_merging_final = do_not_merge_across_partitions_select_final &&
14941506 std::distance (parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1 ]) == 1 &&
14951507 parts_to_merge_ranges[range_index]->data_part ->info .level > 0 &&
1496- data. merging_params . is_deleted_column . empty () && !reader_settings.read_in_order ;
1508+ !reader_settings.read_in_order ;
14971509
14981510 if (no_merging_final)
14991511 {
@@ -1532,11 +1544,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
15321544 info.use_uncompressed_cache );
15331545 };
15341546
1535- // / Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
1536- // / so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
1547+ // / Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column.
1548+ // / Non-intersecting ranges will just go through extra filter added by createExpressionForIsDeleted() to filter
1549+ // / deleted rows.
15371550 bool split_parts_ranges_into_intersecting_and_non_intersecting_final
1538- = settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final]
1539- && data. merging_params . is_deleted_column . empty () && !reader_settings.read_in_order ;
1551+ = settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final] &&
1552+ !reader_settings.read_in_order ;
15401553
15411554 SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey (
15421555 storage_snapshot->metadata ->getPrimaryKey (),
@@ -1621,6 +1634,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
16211634 return std::make_shared<FilterTransform>(header, expression, filter_name, true );
16221635 });
16231636 }
1637+ else if (!data.merging_params .is_deleted_column .empty ())
1638+ {
1639+ auto columns_with_is_deleted = origin_column_names;
1640+ if (std::ranges::find (columns_with_is_deleted, data.merging_params .is_deleted_column ) == columns_with_is_deleted.end ())
1641+ columns_with_is_deleted.push_back (data.merging_params .is_deleted_column );
1642+
1643+ pipe = spreadMarkRangesAmongStreams (
1644+ std::move (non_intersecting_parts_by_primary_key), index_build_context, num_streams, columns_with_is_deleted);
1645+ auto [expression, filter_name] = createExpressionForIsDeleted (data.merging_params .is_deleted_column , pipe.getHeader (), context);
1646+
1647+ pipe.addSimpleTransform ([&](const SharedHeader & header)
1648+ {
1649+ return std::make_shared<FilterTransform>(header, expression, filter_name, true );
1650+ });
1651+ }
16241652 else
16251653 {
16261654 pipe = spreadMarkRangesAmongStreams (std::move (non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
0 commit comments