Skip to content

Commit 12b6c54

Browse files
shankar-iyer and zvonand
authored and committed
Merge pull request ClickHouse#88090 from shankar-iyer/fix_is_deleted_with_filter
Optimize ReplacingMergeTree is_deleted FINAL queries by adding a filter expression transform
1 parent 1a76b25 commit 12b6c54

File tree

3 files changed

+112
-5
lines changed

3 files changed

+112
-5
lines changed

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,18 @@ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionFor
14001400
return {std::make_shared<ExpressionActions>(std::move(actions)), sign_filter->getColumnName()};
14011401
}
14021402

1403+
/// Builds the filter expression `equals(<is_deleted column>, 0)`, which keeps only rows that are not marked as deleted.
///
/// is_deleted_column_name - name of the is_deleted column; it is looked up in `header` by this name.
/// header - block that supplies the column's name and type to the analyzer.
/// context - query context passed to TreeRewriter and ExpressionAnalyzer.
///
/// Returns the ExpressionActions built from the analyzed expression, together with the column name of
/// the filter expression (to be used as the filter column name by the caller).
static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionForIsDeleted(const String & is_deleted_column_name, const Block & header, const ContextPtr & context)
1404+
{
1405+
/// AST for `is_deleted_column = 0`; the literal is an Int8 zero.
ASTPtr is_deleted_identifier = std::make_shared<ASTIdentifier>(is_deleted_column_name);
1406+
ASTPtr is_deleted_filter = makeASTFunction("equals", is_deleted_identifier, std::make_shared<ASTLiteral>(Field(static_cast<Int8>(0))));
1407+
1408+
/// NOTE(review): getByName() presumably throws if the column is absent from the header - confirm.
const auto & is_deleted_column = header.getByName(is_deleted_column_name);
1409+
1410+
/// The is_deleted column is the only source column made available to the analyzer.
auto syntax_result = TreeRewriter(context).analyze(is_deleted_filter, {{is_deleted_column.name, is_deleted_column.type}});
1411+
/// NOTE(review): the `false` argument is presumably `add_aliases` - confirm against ExpressionAnalyzer::getActionsDAG().
auto actions = ExpressionAnalyzer(is_deleted_filter, syntax_result, context).getActionsDAG(false);
1412+
return {std::make_shared<ExpressionActions>(std::move(actions)), is_deleted_filter->getColumnName()};
1413+
}
1414+
14031415
bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
14041416
{
14051417
const auto & settings = context->getSettingsRef();
@@ -1493,7 +1505,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
14931505
bool no_merging_final = do_not_merge_across_partitions_select_final &&
14941506
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
14951507
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
1496-
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
1508+
!reader_settings.read_in_order;
14971509

14981510
if (no_merging_final)
14991511
{
@@ -1532,11 +1544,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
15321544
info.use_uncompressed_cache);
15331545
};
15341546

1535-
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
1536-
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
1547+
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's an is_deleted column.
1548+
/// Non-intersecting ranges will just go through an extra filter, built by createExpressionForIsDeleted(), that removes
1549+
/// deleted rows.
15371550
bool split_parts_ranges_into_intersecting_and_non_intersecting_final
1538-
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final]
1539-
&& data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
1551+
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final] &&
1552+
!reader_settings.read_in_order;
15401553

15411554
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
15421555
storage_snapshot->metadata->getPrimaryKey(),
@@ -1621,6 +1634,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
16211634
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
16221635
});
16231636
}
1637+
else if (!data.merging_params.is_deleted_column.empty())
1638+
{
1639+
auto columns_with_is_deleted = origin_column_names;
1640+
if (std::ranges::find(columns_with_is_deleted, data.merging_params.is_deleted_column) == columns_with_is_deleted.end())
1641+
columns_with_is_deleted.push_back(data.merging_params.is_deleted_column);
1642+
1643+
pipe = spreadMarkRangesAmongStreams(
1644+
std::move(non_intersecting_parts_by_primary_key), index_build_context, num_streams, columns_with_is_deleted);
1645+
auto [expression, filter_name] = createExpressionForIsDeleted(data.merging_params.is_deleted_column, pipe.getHeader(), context);
1646+
1647+
pipe.addSimpleTransform([&](const SharedHeader & header)
1648+
{
1649+
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
1650+
});
1651+
}
16241652
else
16251653
{
16261654
pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
10000
2+
9950
3+
0
4+
10000
5+
17700
6+
17700
7+
17700
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Test that FINAL queries on ReplacingMergeTree with is_deleted make use of optimizations.
2+
3+
DROP TABLE IF EXISTS tab;
4+
5+
CREATE TABLE tab (
6+
pkey String,
7+
id Int32,
8+
v Int32,
9+
version UInt64,
10+
is_deleted UInt8
11+
) Engine = ReplacingMergeTree(version,is_deleted)
12+
PARTITION BY pkey ORDER BY id
13+
SETTINGS index_granularity=512;
14+
15+
-- insert 10000 rows into partition 'A', mark half of them as deleted, and merge the 2 parts
16+
INSERT INTO tab SELECT 'A', number, number, 1, 0 FROM numbers(10000);
17+
INSERT INTO tab SELECT 'A', number, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
18+
19+
OPTIMIZE TABLE tab SETTINGS mutations_sync = 2;
20+
21+
SYSTEM STOP MERGES tab;
22+
23+
-- insert 10000 rows into partition 'B' and mark half of them as deleted, but keep the 2 parts unmerged
24+
INSERT INTO tab SELECT 'B', number+1000000, number, 1, 0 FROM numbers(10000);
25+
INSERT INTO tab SELECT 'B', number+1000000, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
26+
27+
SET do_not_merge_across_partitions_select_final=1;
28+
29+
-- verify : 10000 rows expected
30+
SELECT count()
31+
FROM tab FINAL;
32+
33+
-- add a filter : 9950 rows expected
34+
SELECT count()
35+
FROM tab FINAL
36+
WHERE id >= 100;
37+
38+
-- only even ids are left - 0 rows expected
39+
SELECT count()
40+
FROM tab FINAL
41+
WHERE (id % 2) = 1;
42+
43+
-- all remaining ids are even - 10000 rows expected
44+
SELECT count()
45+
FROM tab FINAL
46+
WHERE (id % 2) = 0;
47+
48+
-- create some more partitions
49+
INSERT INTO tab SELECT 'C', number+2000000, number, 1, 0 FROM numbers(100);
50+
51+
-- insert and delete some rows to get intersecting/non-intersecting ranges in the same partition
52+
INSERT INTO tab SELECT 'D', number+3000000, number, 1, 0 FROM numbers(10000);
53+
INSERT INTO tab SELECT 'D', number+3000000, number + 1, 1, IF(number % 2 = 0, 0, 1) FROM numbers(5000);
54+
55+
INSERT INTO tab SELECT 'E', number+4000000, number, 1, 0 FROM numbers(100);
56+
57+
-- Total 10000 (From A & B) + 100 (From C) + 7500 (From D) + 100 (From E) = 17700 rows
58+
SELECT count()
59+
FROM tab FINAL
60+
SETTINGS do_not_merge_across_partitions_select_final=0,split_intersecting_parts_ranges_into_layers_final=0;
61+
62+
SELECT count()
63+
FROM tab FINAL
64+
SETTINGS do_not_merge_across_partitions_select_final=1,split_intersecting_parts_ranges_into_layers_final=1;
65+
66+
SYSTEM START MERGES tab;
67+
OPTIMIZE TABLE tab FINAL SETTINGS mutations_sync = 2;
68+
69+
SELECT count()
70+
FROM tab FINAL;
71+
72+
DROP TABLE IF EXISTS tab;

0 commit comments

Comments
 (0)