Skip to content

Commit 7668944

Browse files
authored
Merge pull request #5655 from apache/auto-delete-tseq
purge deleted documents that exceed TTL
2 parents 888239a + 12b9487 commit 7668944

File tree

9 files changed

+469
-2
lines changed

9 files changed

+469
-2
lines changed

rel/overlay/etc/default.ini

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,15 @@ url = {{nouveau_url}}
11861186
; Scanner settings to skip dbs and docs would also work:
11871187
;[couch_quickjs_scanner_plugin.skip_{dbs,ddoc,docs}]
11881188

1189+
[couch_auto_purge_plugin]
1190+
; The most id/rev pairs the plugin will attempt to purge in
1191+
; one request.
1192+
;max_batch_size = 500
1193+
; The default time-to-live, measured in seconds, before a
1194+
; deleted document is eligible to be purged by the plugin.
1195+
; Defaults to undefined, which disables auto purging.
1196+
;deleted_document_ttl =
1197+
11891198
[chttpd_auth_lockout]
11901199
; CouchDB can temporarily lock out IP addresses that repeatedly fail authentication
11911200
; mode can be set to one of three recognised values;

src/chttpd/src/chttpd_db.erl

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
handle_view_cleanup_req/2,
3131
update_doc/4,
3232
http_code_from_status/1,
33-
handle_partition_req/2
33+
handle_partition_req/2,
34+
handle_auto_purge_req/2
3435
]).
3536

3637
-import(
@@ -390,6 +391,36 @@ update_partition_stats(PathParts) ->
390391
ok
391392
end.
392393

394+
handle_auto_purge_req(#httpd{method = 'GET'} = Req, Db) ->
395+
case fabric:get_auto_purge_props(Db) of
396+
{ok, AutoPurgeProps} ->
397+
send_json(Req, {AutoPurgeProps});
398+
{error, Reason} ->
399+
chttpd:send_error(Req, Reason)
400+
end;
401+
handle_auto_purge_req(#httpd{method = 'PUT'} = Req, Db) ->
402+
{AutoPurgeProps} = chttpd:json_body_obj(Req),
403+
validate_auto_purge_props(AutoPurgeProps),
404+
case fabric:set_auto_purge_props(Db, AutoPurgeProps) of
405+
ok ->
406+
send_json(Req, 201, {[{ok, true}]});
407+
{error, Reason} ->
408+
chttpd:send_error(Req, Reason)
409+
end;
410+
handle_auto_purge_req(#httpd{} = Req, _Db) ->
411+
send_method_not_allowed(Req, "GET,PUT,HEAD").
412+
413+
validate_auto_purge_props([]) ->
414+
ok;
415+
validate_auto_purge_props([{<<"deleted_document_ttl">>, Value} | Rest]) when is_integer(Value) ->
416+
validate_auto_purge_props(Rest);
417+
validate_auto_purge_props([{<<"deleted_document_ttl">>, _Value} | _Rest]) ->
418+
throw({bad_request, <<"deleted_document_ttl must be an integer">>});
419+
validate_auto_purge_props([{_K, _V} | _Rest]) ->
420+
throw({bad_request, <<"invalid auto purge property">>});
421+
validate_auto_purge_props(_Else) ->
422+
throw({bad_request, <<"malformed auto purge body">>}).
423+
393424
handle_design_req(
394425
#httpd{
395426
path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | _Rest]

src/chttpd/src/chttpd_httpd_handlers.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ db_handler(<<"_design">>) -> fun chttpd_db:handle_design_req/2;
3535
db_handler(<<"_partition">>) -> fun chttpd_db:handle_partition_req/2;
3636
db_handler(<<"_temp_view">>) -> fun chttpd_view:handle_temp_view_req/2;
3737
db_handler(<<"_changes">>) -> fun chttpd_db:handle_changes_req/2;
38+
db_handler(<<"_auto_purge">>) -> fun chttpd_db:handle_auto_purge_req/2;
3839
db_handler(_) -> no_match.
3940

4041
design_handler(<<"_view">>) -> fun chttpd_view:handle_view_req/3;
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(couch_auto_purge_plugin).
14+
-behaviour(couch_scanner_plugin).
15+
16+
-export([
17+
start/2,
18+
resume/2,
19+
complete/1,
20+
checkpoint/1,
21+
db/2,
22+
db_opened/2,
23+
db_closing/2,
24+
doc_fdi/3
25+
]).
26+
27+
-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
28+
-include_lib("stdlib/include/assert.hrl").
29+
30+
start(ScanId, #{}) ->
31+
St = init_config(ScanId),
32+
?INFO("Starting.", [], St),
33+
{ok, St}.
34+
35+
resume(ScanId, #{}) ->
36+
St = init_config(ScanId),
37+
?INFO("Resuming.", [], St),
38+
{ok, St}.
39+
40+
complete(St) ->
41+
?INFO("Completed", [], St),
42+
{ok, #{}}.
43+
44+
checkpoint(_St) ->
45+
{ok, #{}}.
46+
47+
db(St, DbName) ->
48+
case ttl(St, DbName) of
49+
TTL when is_integer(TTL) ->
50+
{ok, St#{ttl => TTL}};
51+
undefined ->
52+
{skip, St}
53+
end.
54+
55+
db_opened(#{} = St, Db) ->
56+
#{ttl := TTL} = St,
57+
EndSeq = couch_time_seq:since(couch_db:get_time_seq(Db), couch_time_seq:timestamp() - TTL),
58+
ChangeOpts =
59+
if
60+
EndSeq == now -> [];
61+
true -> [{end_key, EndSeq}]
62+
end,
63+
?INFO("scanning for deleted documents in ~s up to ~p", [couch_db:name(Db), EndSeq], meta(St)),
64+
{0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
65+
66+
db_closing(#{} = St, Db) ->
67+
#{count := Count} = St,
68+
?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
69+
{ok, St}.
70+
71+
doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
72+
#{end_seq := EndSeq} = St,
73+
?assert(
74+
FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
75+
),
76+
{ok, purge(St, FDI, Db)};
77+
doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
78+
{ok, St}.
79+
80+
purge(#{} = St, #full_doc_info{} = FDI, Db) ->
81+
{Id, Revs} = fdi_to_idrevs(FDI),
82+
MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
83+
purge(St, Id, Revs, MaxBatchSize, Db).
84+
85+
purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
86+
DbName = mem3:dbname(couch_db:name(Db)),
87+
PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
88+
Timeout = fabric_util:request_timeout(),
89+
try fabric_util:isolate(PurgeFun, Timeout) of
90+
{Health, Results} when Health == ok; Health == accepted ->
91+
#{count := Count, limiter := Limiter0} = St,
92+
{Wait, Limiter1} = couch_scanner_rate_limiter:update(
93+
Limiter0, doc_write, length(Results)
94+
),
95+
timer:sleep(Wait),
96+
St#{count => Count + length(Results), limiter => Limiter1};
97+
Else ->
98+
?WARN(
99+
"Failed to purge deleted documents in ~s/~s for reason ~p",
100+
[DbName, Id, Else],
101+
meta(St)
102+
),
103+
St
104+
catch
105+
Class:Reason ->
106+
?WARN(
107+
"Failed to purge deleted documents in ~s/~s for reason ~p:~p",
108+
[DbName, Id, Class, Reason],
109+
meta(St)
110+
),
111+
St
112+
end;
113+
purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
114+
{RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
115+
St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
116+
purge(St1, Id, RevRest, MaxBatchSize, Db).
117+
118+
fdi_to_idrevs(#full_doc_info{} = FDI) ->
119+
Revs = [
120+
couch_doc:rev_to_str({Pos, RevId})
121+
|| {#leaf{}, {Pos, [RevId | _]}} <- couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree)
122+
],
123+
{FDI#full_doc_info.id, Revs}.
124+
125+
init_config(ScanId) ->
126+
#{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
127+
128+
meta(#{sid := ScanId}) ->
129+
#{sid => ScanId}.
130+
131+
ttl(St, DbName) ->
132+
DefaultTTL = config:get(atom_to_list(?MODULE), "deleted_document_ttl"),
133+
DbTTL =
134+
case fabric:get_auto_purge_props(DbName) of
135+
{ok, AutoPurgeProps} ->
136+
case couch_util:get_value(<<"deleted_document_ttl">>, AutoPurgeProps) of
137+
TTL when is_integer(TTL) ->
138+
TTL;
139+
undefined ->
140+
undefined;
141+
Else ->
142+
?WARN(
143+
"TTL in ~s as ttl was '~p', not integer",
144+
[DbName, Else],
145+
meta(St)
146+
),
147+
undefined
148+
end;
149+
{error, Reason} ->
150+
?WARN(
151+
"Failed to fetch ttl in ~s for reason ~p",
152+
[DbName, Reason],
153+
meta(St)
154+
),
155+
undefined
156+
end,
157+
if
158+
DbTTL /= undefined -> DbTTL;
159+
DefaultTTL /= undefined -> list_to_integer(DefaultTTL);
160+
true -> undefined
161+
end.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(couch_auto_purge_plugin_tests).
14+
15+
-include_lib("couch/include/couch_eunit.hrl").
16+
-include_lib("couch/include/couch_db.hrl").
17+
18+
-define(PLUGIN, couch_auto_purge_plugin).
19+
20+
couch_quickjs_scanner_plugin_test_() ->
21+
{
22+
foreach,
23+
fun setup/0,
24+
fun teardown/1,
25+
[
26+
?TDEF_FE(t_no_auto_purge_by_default, 10),
27+
?TDEF_FE(t_auto_purge_after_config_ttl, 10),
28+
?TDEF_FE(t_auto_purge_after_db_ttl, 10)
29+
]
30+
}.
31+
32+
setup() ->
33+
{module, _} = code:ensure_loaded(?PLUGIN),
34+
meck:new(?PLUGIN, [passthrough]),
35+
meck:new(couch_scanner_server, [passthrough]),
36+
meck:new(couch_scanner_util, [passthrough]),
37+
Ctx = test_util:start_couch([fabric, couch_scanner]),
38+
DbName = ?tempdb(),
39+
ok = fabric:create_db(DbName, [{q, "2"}, {n, "1"}]),
40+
config:set(atom_to_list(?PLUGIN), "max_batch_items", "1", false),
41+
reset_stats(),
42+
{Ctx, DbName}.
43+
44+
teardown({Ctx, DbName}) ->
45+
config_delete_section("couch_scanner"),
46+
config_delete_section("couch_scanner_plugins"),
47+
config_delete_section(atom_to_list(?PLUGIN)),
48+
couch_scanner:reset_checkpoints(),
49+
couch_scanner:resume(),
50+
fabric:delete_db(DbName),
51+
test_util:stop_couch(Ctx),
52+
meck:unload().
53+
54+
t_no_auto_purge_by_default({_, DbName}) ->
55+
ok = add_doc(DbName, <<"doc1">>, #{<<"_deleted">> => true}),
56+
?assertEqual(1, doc_del_count(DbName)),
57+
meck:reset(couch_scanner_server),
58+
meck:reset(?PLUGIN),
59+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
60+
wait_exit(10000),
61+
?assertEqual(1, doc_del_count(DbName)),
62+
ok.
63+
64+
t_auto_purge_after_config_ttl({_, DbName}) ->
65+
config:set(atom_to_list(?PLUGIN), "deleted_document_ttl", "-1000000", false),
66+
ok = add_doc(DbName, <<"doc1">>, #{<<"_deleted">> => true}),
67+
?assertEqual(1, doc_del_count(DbName)),
68+
meck:reset(couch_scanner_server),
69+
meck:reset(?PLUGIN),
70+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
71+
wait_exit(10000),
72+
?assertEqual(0, doc_del_count(DbName)),
73+
ok.
74+
75+
t_auto_purge_after_db_ttl({_, DbName}) ->
76+
ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, -1000000}]),
77+
ok = add_doc(DbName, <<"doc1">>, #{<<"_deleted">> => true}),
78+
?assertEqual(1, doc_del_count(DbName)),
79+
meck:reset(couch_scanner_server),
80+
meck:reset(?PLUGIN),
81+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
82+
wait_exit(10000),
83+
?assertEqual(0, doc_del_count(DbName)),
84+
ok.
85+
86+
reset_stats() ->
87+
Counters = [
88+
[couchdb, query_server, process_error_exits],
89+
[couchdb, query_server, process_errors],
90+
[couchdb, query_server, process_exits]
91+
],
92+
[reset_counter(C) || C <- Counters].
93+
94+
reset_counter(Counter) ->
95+
case couch_stats:sample(Counter) of
96+
0 ->
97+
ok;
98+
N when is_integer(N), N > 0 ->
99+
couch_stats:decrement_counter(Counter, N)
100+
end.
101+
102+
config_delete_section(Section) ->
103+
[config:delete(K, V, false) || {K, V} <- config:get(Section)].
104+
105+
add_doc(DbName, DocId, Body) ->
106+
{ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX]),
107+
ok.
108+
109+
mkdoc(Id, #{} = Body) ->
110+
Body1 = Body#{<<"_id">> => Id},
111+
jiffy:decode(jiffy:encode(Body1)).
112+
113+
wait_exit(MSec) ->
114+
meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'], MSec).
115+
116+
doc_del_count(DbName) ->
117+
{ok, DbInfo} = fabric:get_db_info(DbName),
118+
couch_util:get_value(doc_del_count, DbInfo).

0 commit comments

Comments
 (0)