-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathpgactive--2.1.0.sql
More file actions
2105 lines (1819 loc) · 79.8 KB
/
pgactive--2.1.0.sql
File metadata and controls
2105 lines (1819 loc) · 79.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
-- (\echo and \quit are psql meta-commands: psql prints the hint and stops
-- reading the file here.)
\echo Use "CREATE EXTENSION pgactive" to load this file. \quit
--- We must be able to use exclusion constraints for global sequences among
--- other things.
-- SET pgactive.permit_unsafe_ddl_commands = true; is removed for now
-- We don't want to replicate commands from in here
SET pgactive.skip_ddl_replication = true;
-- All objects created by this script live in the dedicated pgactive schema.
-- PUBLIC gets USAGE on the schema; sensitive objects have their own
-- privileges revoked individually further down.
CREATE SCHEMA pgactive;
GRANT USAGE ON SCHEMA pgactive TO public;
-- Everything should assume the 'pgactive' prefix
-- (SET LOCAL only lasts until the end of the current transaction.)
SET LOCAL search_path = pgactive;
-- Text-returning function implemented in C in the extension's shared library
-- (MODULE_PATHNAME). By its name it reports the pgactive version string.
CREATE FUNCTION pgactive_version()
RETURNS TEXT
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Text-returning C function. NOTE(review): presumably identifies the build
-- variant/flavour of pgactive; confirm against the C implementation.
CREATE FUNCTION pgactive_variant()
RETURNS TEXT
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Set-returning C function exposing replication counters: commits, rollbacks,
-- inserts/updates/deletes with their conflict counts, and disconnects.
-- Each row is identified by rep_node_id / rilocalid / riremoteid; the exact
-- meaning of those identifiers and counters is defined by the C code.
CREATE FUNCTION pgactive_get_stats (
OUT rep_node_id oid,
OUT rilocalid oid,
OUT riremoteid text,
OUT nr_commit int8,
OUT nr_rollback int8,
OUT nr_insert int8,
OUT nr_insert_conflict int8,
OUT nr_update int8,
OUT nr_update_conflict int8,
OUT nr_delete int8,
OUT nr_delete_conflict int8,
OUT nr_disconnect int8
)
RETURNS SETOF record
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Remove the default PUBLIC privileges on the stats function.
REVOKE ALL ON FUNCTION pgactive_get_stats() FROM PUBLIC;
-- Convenience view exposing every column returned by pgactive_get_stats().
CREATE VIEW pgactive_stats AS SELECT * FROM pgactive_get_stats();
-- Kinds of apply conflict. Used by pgactive_conflict_handlers.ch_type and
-- pgactive_conflict_history.conflict_type.
CREATE TYPE pgactive_conflict_type AS ENUM (
'insert_insert',
'insert_update',
'update_update',
'update_delete',
'delete_delete',
'unhandled_tx_abort'
);
COMMENT ON TYPE pgactive_conflict_type IS
'The nature of a pgactive apply conflict - concurrent updates (update_update), conflicting inserts, etc.';
-- NOTE(review): presumably the set of actions a user-defined conflict handler
-- may return; the type is not referenced elsewhere in the visible part of this
-- script, so confirm against the C code.
CREATE TYPE pgactive_conflict_handler_action AS ENUM('IGNORE', 'ROW', 'SKIP');
-- Registry of conflict handlers, one row per (relation oid, handler name).
CREATE TABLE pgactive_conflict_handlers (
-- handler name, unique per relation (see the primary key)
ch_name NAME NOT NULL,
-- conflict type the handler is registered for
ch_type pgactive.pgactive_conflict_type NOT NULL,
-- oid of the relation the handler is attached to
ch_reloid oid NOT NULL,
-- NOTE(review): presumably the handler function's name/signature as text
ch_fun text NOT NULL,
-- optional; semantics are defined by the C code
ch_timeframe INTERVAL,
PRIMARY KEY(ch_reloid, ch_name)
);
REVOKE ALL ON TABLE pgactive_conflict_handlers FROM PUBLIC;
-- Register as an extension configuration table; the empty filter means all
-- rows are included by pg_dump.
SELECT pg_catalog.pg_extension_config_dump('pgactive_conflict_handlers', '');
-- Lookup by relation and conflict type.
-- NOTE(review): the index name says "ch_type_reloid" but the column order is
-- (ch_reloid, ch_type).
CREATE INDEX pgactive_conflict_handlers_ch_type_reloid_idx
ON pgactive_conflict_handlers(ch_reloid, ch_type);
-- Registers a conflict handler (implemented in C).
--   ch_rel        relation the handler applies to
--   ch_name       name of the handler
--   ch_proc       handler procedure
--   ch_type       conflict type the handler is registered for
--   ch_timeframe  optional interval, defaults to NULL
-- Not declared STRICT, so the C code is invoked even when arguments are NULL.
CREATE FUNCTION pgactive_create_conflict_handler (
ch_rel REGCLASS,
ch_name NAME,
ch_proc REGPROCEDURE,
ch_type pgactive.pgactive_conflict_type,
ch_timeframe INTERVAL DEFAULT NULL
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Drops the conflict handler ch_name on relation ch_rel (implemented in C).
-- STRICT: returns NULL without calling the C code if either argument is NULL.
CREATE FUNCTION pgactive_drop_conflict_handler(ch_rel REGCLASS, ch_name NAME)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
-- How an apply conflict was resolved; stored in
-- pgactive_conflict_history.conflict_resolution.
CREATE TYPE pgactive_conflict_resolution AS ENUM (
'conflict_trigger_skip_change',
'conflict_trigger_returned_tuple',
'last_update_wins_keep_local',
'last_update_wins_keep_remote',
'apply_change',
'skip_change',
'unhandled_tx_abort'
);
COMMENT ON TYPE pgactive_conflict_resolution IS
'Resolution of a pgactive conflict - if a conflict was resolved by a conflict trigger, by last-update-wins tests on commit timestamps, etc.';
-- Source of pgactive_conflict_history.conflict_id values; ownership is tied to
-- that column by the ALTER SEQUENCE ... OWNED BY below.
CREATE SEQUENCE pgactive_conflict_history_id_seq;
--
-- pgactive_conflict_history records apply conflicts so they can be queried and
-- analysed by administrators.
--
-- This must remain in sync with struct pgactiveApplyConflict and
-- pgactive_conflict_log_table().
--
CREATE TABLE pgactive_conflict_history (
conflict_id bigint not null default nextval('pgactive_conflict_history_id_seq'),
local_node_sysid text not null, -- really uint64 but we don't have the type for it
PRIMARY KEY (local_node_sysid, conflict_id),
local_conflict_xid xid not null, -- xid of conflicting apply tx
-- lsn of local node at the time the conflict was detected
local_conflict_lsn pg_lsn not null,
local_conflict_time timestamptz not null,
object_schema text,
object_name text,
remote_node_sysid text not null, -- again, really uint64
remote_txid xid not null,
remote_commit_time timestamptz not null,
remote_commit_lsn pg_lsn not null,
conflict_type pgactive_conflict_type not null,
conflict_resolution pgactive_conflict_resolution not null,
local_tuple json,
remote_tuple json,
local_tuple_xmin xid,
local_tuple_origin_sysid text, -- also really uint64
-- The following apply only for unhandled apply errors and correspond to
-- fields in ErrorData in elog.h .
error_message text,
-- SQLSTATE codes are exactly five characters long
error_sqlstate text CHECK (length(error_sqlstate) = 5),
error_querystring text,
error_cursorpos integer,
error_detail text,
error_hint text,
error_context text,
-- schema and table in object_schema, object_name above
error_columnname text,
error_typename text,
error_constraintname text,
error_filename text,
error_lineno integer,
error_funcname text,
remote_node_timeline oid,
remote_node_dboid oid,
local_tuple_origin_timeline oid,
local_tuple_origin_dboid oid,
local_commit_time timestamptz
);
REVOKE ALL ON TABLE pgactive_conflict_history FROM PUBLIC;
-- Registered as an extension configuration table, but the 'WHERE false' filter
-- means pg_dump never includes any of its rows.
SELECT pg_catalog.pg_extension_config_dump('pgactive_conflict_history', 'WHERE false');
-- Tie the sequence to the column so it is dropped together with it.
ALTER SEQUENCE pgactive_conflict_history_id_seq
OWNED BY pgactive_conflict_history.conflict_id;
-- Catalog comments (visible via \d+ in psql) for pgactive_conflict_history and
-- its columns.
COMMENT ON TABLE pgactive_conflict_history IS 'Log of all conflicts in this pgactive group';
COMMENT ON COLUMN pgactive_conflict_history.local_node_sysid IS 'sysid of the local node where the apply conflict occurred';
COMMENT ON COLUMN pgactive_conflict_history.remote_node_sysid IS 'sysid of the remote node the conflicting transaction originated from';
COMMENT ON COLUMN pgactive_conflict_history.object_schema IS 'Schema of the object involved in the conflict';
COMMENT ON COLUMN pgactive_conflict_history.object_name IS 'Name of the object (table, etc.) involved in the conflict';
COMMENT ON COLUMN pgactive_conflict_history.local_conflict_xid IS 'Transaction ID of the apply transaction that encountered the conflict';
COMMENT ON COLUMN pgactive_conflict_history.local_conflict_lsn IS 'xlog position at the time the conflict occured on the applying node';
COMMENT ON COLUMN pgactive_conflict_history.local_conflict_time IS 'The time the conflict was detected on the applying node';
COMMENT ON COLUMN pgactive_conflict_history.remote_txid IS 'xid of the remote transaction involved in the conflict';
COMMENT ON COLUMN pgactive_conflict_history.remote_commit_time IS 'The time the remote transaction involved in this conflict committed';
COMMENT ON COLUMN pgactive_conflict_history.remote_commit_lsn IS 'LSN on remote node at which conflicting transaction committed';
COMMENT ON COLUMN pgactive_conflict_history.conflict_type IS 'Nature of the conflict - insert/insert, update/delete, etc.';
COMMENT ON COLUMN pgactive_conflict_history.local_tuple IS 'For DML conflicts, the conflicting tuple from the local DB (as json), if logged';
COMMENT ON COLUMN pgactive_conflict_history.local_tuple_xmin IS 'If local_tuple is set, the xmin of the conflicting local tuple';
COMMENT ON COLUMN pgactive_conflict_history.local_tuple_origin_sysid IS 'The node id for the true origin of the local tuple. Differs from local_node_sysid if the tuple was originally replicated from another node';
COMMENT ON COLUMN pgactive_conflict_history.remote_tuple IS 'For DML conflicts, the conflicting tuple from the remote DB (as json), if logged';
COMMENT ON COLUMN pgactive_conflict_history.conflict_resolution IS 'How the conflict was resolved/handled; see the enum definition';
COMMENT ON COLUMN pgactive_conflict_history.error_message IS 'On apply error, the error message from ereport/elog. Other error fields match.';
COMMENT ON COLUMN pgactive_conflict_history.local_commit_time IS 'The time the local transaction involved in this conflict committed';
-- The pgactive_nodes table tracks members of a pgactive group; it's only concerned with
-- one pgactive group so it only has to track enough to uniquely identify each
-- member node, which is the (sysid, timeline, dboid) tuple for that node.
--
-- The sysid must be a numeric (or string) because PostgreSQL has no uint64 SQL
-- type.
--
-- We don't exclude pgactive_nodes with pg_extension_config_dump because this is a
-- global table that's sync'd between nodes.
--
CREATE TABLE pgactive_nodes (
node_sysid text not null, -- Really a uint64 but we have no type for that
node_timeline oid not null,
node_dboid oid not null, -- This is an oid local to the node_sysid cluster
node_status "char" not null,
node_name text not null,
node_dsn text,
node_init_from_dsn text,
node_read_only boolean default false,
node_seq_id smallint,
primary key(node_sysid, node_timeline, node_dboid),
-- Allowed status codes; their meanings are spelled out in the COMMENT ON
-- COLUMN pgactive_nodes.node_status statement below.
CHECK (node_status in ('b', 'i', 'c', 'o', 'r', 'k'))
);
REVOKE ALL ON TABLE pgactive_nodes FROM PUBLIC;
-- pgactive.pgactive_nodes gets synced by pgactive_sync_nodes(), it shouldn't be dumped and
-- applied.
SELECT pg_catalog.pg_extension_config_dump('pgactive_nodes', 'WHERE false');
-- Add a constraint ensuring node names are unique (node_name is already
-- declared NOT NULL in the table definition).
CREATE UNIQUE INDEX pgactive_nodes_node_name ON pgactive_nodes(node_name);
COMMENT ON TABLE pgactive_nodes IS 'All known nodes in this pgactive group';
COMMENT ON COLUMN pgactive_nodes.node_sysid IS 'pgactive generated node identifier';
COMMENT ON COLUMN pgactive_nodes.node_timeline IS 'Timeline ID of this node';
COMMENT ON COLUMN pgactive_nodes.node_dboid IS 'Local database oid on the cluster (node_sysid, node_timeline)';
COMMENT ON COLUMN pgactive_nodes.node_status IS 'Readiness of the node: [b]eginning setup, [i]nitializing, [c]atchup, creating [o]utbound slots, [r]eady, [k]illed. Doesn''t indicate connected/disconnected.';
-- State of global locks. Each row records a lock type, the
-- (sysid, timeline, datid) of the owning node, the (sysid, timeline, datid)
-- of an acquiring node, and the lock state.
-- NOTE(review): the exact protocol (valid locktype/state values, what
-- "acquired" means) lives in the C code; confirm there.
CREATE TABLE pgactive_global_locks (
locktype text NOT NULL,
owning_sysid text NOT NULL,
owning_timeline oid NOT NULL,
owning_datid oid NOT NULL,
owner_created_lock_at pg_lsn NOT NULL,
acquired_sysid text NOT NULL,
acquired_timeline oid NOT NULL,
acquired_datid oid NOT NULL,
-- nullable, unlike the other columns
acquired_lock_at pg_lsn,
state text NOT NULL
);
REVOKE ALL ON TABLE pgactive_global_locks FROM PUBLIC;
-- Extension configuration table; the empty filter means all rows are dumped.
SELECT pg_catalog.pg_extension_config_dump('pgactive_global_locks', '');
-- At most one row per lock type and owning node.
CREATE UNIQUE INDEX pgactive_global_locks_byowner
ON pgactive_global_locks(locktype, owning_sysid, owning_timeline, owning_datid);
-- Queue of commands, one row per queued command.
-- NOTE(review): presumably filled by pgactive_replicate_ddl_command() and
-- replayed on peer nodes; confirm against the C code.
CREATE TABLE pgactive_queued_commands (
lsn pg_lsn NOT NULL,
queued_at TIMESTAMP WITH TIME ZONE NOT NULL,
-- NOTE(review): presumably the role that issued the command
perpetrator TEXT NOT NULL,
command_tag TEXT NOT NULL,
command TEXT NOT NULL,
-- defaults to the empty string
search_path TEXT DEFAULT ''
);
REVOKE ALL ON TABLE pgactive_queued_commands FROM PUBLIC;
-- Extension configuration table; the empty filter means all rows are dumped.
SELECT pg_catalog.pg_extension_config_dump('pgactive_queued_commands', '');
-- Takes a DDL command as text (implemented in C). By its name it executes the
-- command and replicates it to the other nodes. It is called by
-- pgactive_set_table_replication_sets() later in this script, where a comment
-- notes it fails when pgactive.skip_ddl_replication is true.
CREATE FUNCTION pgactive_replicate_ddl_command(cmd TEXT)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Event trigger function (implemented in C).
-- NOTE(review): presumably attaches a TRUNCATE trigger to newly created
-- tables; the event trigger itself is not created in the visible part of this
-- script.
CREATE FUNCTION pgactive_truncate_trigger_add()
RETURNS event_trigger
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Internal C helper taking a relation; by its name it creates the TRUNCATE
-- trigger on that relation.
CREATE FUNCTION pgactive_internal_create_truncate_trigger(regclass)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Trigger function bound explicitly to the C symbol pgactive_queue_truncate.
CREATE FUNCTION pgactive_queue_truncate()
RETURNS TRIGGER
AS 'MODULE_PATHNAME', 'pgactive_queue_truncate'
LANGUAGE C;
-- This type is tailored to use as input to get_object_address
CREATE TYPE dropped_object AS (
objtype text,
objnames text[],
objargs text[]
);
-- Queue of object drops: each row carries an array of dropped_object
-- descriptors together with the LSN and time at which it was queued.
CREATE TABLE pgactive_queued_drops (
lsn pg_lsn NOT NULL,
queued_at timestamptz NOT NULL,
dropped_objects pgactive.dropped_object[] NOT NULL
);
REVOKE ALL ON TABLE pgactive_queued_drops FROM PUBLIC;
-- Extension configuration table; the empty filter means all rows are dumped.
SELECT pg_catalog.pg_extension_config_dump('pgactive_queued_drops', '');
-- C function; by its name it pauses the application of replicated changes on
-- this node. NOTE(review): scope and persistence of the pause are defined by
-- the C implementation.
CREATE FUNCTION pgactive_apply_pause()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- C function; by its name the counterpart of pgactive_apply_pause() that
-- resumes applying changes.
CREATE FUNCTION pgactive_apply_resume()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C;
--- Functions for manipulating/displaying replications sets
--
-- Returns the replication sets of a table as a text array.
--
-- The sets are read from the "sets" key of the table's 'pgactive' security
-- label (a JSON document). When the table has no such label, or the label has
-- no "sets" key, the single set "default" is assumed. The set "all" is always
-- appended as the last element.
--
-- STRICT: a NULL relation yields NULL.
CREATE FUNCTION pgactive_get_table_replication_sets(relation regclass, OUT sets text[])
VOLATILE
STRICT
LANGUAGE 'sql'
AS $$
  SELECT array_append(
    ARRAY(
      SELECT s.value
      FROM json_array_elements_text(
        COALESCE(
          (
            -- the table's pgactive label, if any
            SELECT sl.label::json -> 'sets'
            FROM pg_seclabel AS sl
            WHERE sl.objoid = $1::regclass
              AND sl.classoid = 'pg_class'::regclass
              AND sl.provider = 'pgactive'
          ),
          '["default"]'
        )
      ) AS s(value)
    ),
    'all'
  );
$$;
-- Per-replication-set switches controlling which DML kinds are replicated.
-- All three switches default to true.
CREATE TABLE pgactive_replication_set_config (
set_name name PRIMARY KEY,
replicate_inserts bool NOT NULL DEFAULT true,
replicate_updates bool NOT NULL DEFAULT true,
replicate_deletes bool NOT NULL DEFAULT true
);
-- Mark the table as a user catalog table for logical decoding.
-- NOTE(review): presumably so the output plugin can read it while decoding.
ALTER TABLE pgactive_replication_set_config SET (user_catalog_table = true);
REVOKE ALL ON TABLE pgactive_replication_set_config FROM PUBLIC;
-- Fix quoting for format() arguments by directly using regclass with %s
-- instead of %I
--
-- Sets the replication sets of a table by rewriting the "sets" key of the
-- table's 'pgactive' security label (a JSON document).
--
--   p_relation  table to change; NULL is a no-op (emulated STRICT)
--   p_sets      new replication sets; NULL removes the "sets" key, and if no
--               other key remains the label itself becomes NULL (removed)
--
-- All other keys of the existing label are preserved.
--
-- When pgactive.skip_ddl_replication is on, the SECURITY LABEL command is
-- executed locally only; otherwise it goes through
-- pgactive.pgactive_replicate_ddl_command().
CREATE FUNCTION pgactive_set_table_replication_sets(p_relation regclass, p_sets text[])
RETURNS void
VOLATILE
LANGUAGE 'plpgsql'
-- remove pgactive_permit_unsafe_commands and do not replace
-- by pgactive_skip_ddl_replication for now
SET search_path = ''
AS $$
DECLARE
  v_label json;
  setting_value text;
BEGIN
  -- emulate STRICT for p_relation parameter
  IF p_relation IS NULL THEN
    RETURN;
  END IF;

  -- query current label
  SELECT label::json INTO v_label
  FROM pg_catalog.pg_seclabel
  WHERE provider = 'pgactive'
    AND classoid = 'pg_class'::regclass
    AND objoid = p_relation;

  -- replace old 'sets' parameter with new value
  SELECT json_object_agg(key, value) INTO v_label
  FROM (
    SELECT key, value
    FROM json_each(v_label)
    WHERE key <> 'sets'
    UNION ALL
    SELECT
      'sets', to_json(p_sets)
    WHERE p_sets IS NOT NULL
  ) d;

  -- and now set the appropriate label
  -- pgactive_replicate_ddl_command would fail if skip_ddl_replication is true
  SELECT setting INTO setting_value
  FROM pg_catalog.pg_settings
  WHERE name = 'pgactive.skip_ddl_replication';

  IF setting_value = 'on' or setting_value = 'true' THEN
    -- Must be EXECUTE: PERFORM format(...) would only build the command text
    -- and throw it away, leaving the label unchanged.
    EXECUTE format('SECURITY LABEL FOR pgactive ON TABLE %s IS %L', p_relation, v_label);
  ELSE
    PERFORM pgactive.pgactive_replicate_ddl_command(format('SECURITY LABEL FOR pgactive ON TABLE %s IS %L', p_relation, v_label));
  END IF;
END;
$$;
-- Returns the identity of the local node as a single
-- (sysid, timeline, dboid) record (implemented in C).
CREATE FUNCTION pgactive_get_local_nodeid (
sysid OUT text,
timeline OUT oid,
dboid OUT oid)
RETURNS record
AS 'MODULE_PATHNAME'
LANGUAGE C;
-- Integer form of the pgactive version; the encoding is described in the
-- COMMENT below.
CREATE FUNCTION pgactive_version_num()
RETURNS integer
AS 'MODULE_PATHNAME'
LANGUAGE C;
COMMENT ON FUNCTION pgactive_version_num() IS
'This pgactive version represented as (major)*10^4 + (minor)*10^2 + (revision). The subrevision is not included. So 0.8.0.1 is 800';
-- Integer version number of the oldest peer this extension can talk to; see
-- the COMMENT below.
CREATE FUNCTION pgactive_min_remote_version_num()
RETURNS integer
AS 'MODULE_PATHNAME'
LANGUAGE C;
COMMENT ON FUNCTION pgactive_min_remote_version_num() IS
'The oldest pgactive version that this pgactive extension can exchange data with';
-- Internal helper (bound to the C symbol pgactive_get_node_info) that connects
-- to the node at local_dsn and returns one record describing it: identity,
-- version information, privileges, status, sizes and selected settings.
-- remote_dsn is optional (defaults to NULL); see the COMMENT below for its
-- effect.
CREATE FUNCTION _pgactive_get_node_info_private (
local_dsn text,
remote_dsn text DEFAULT NULL,
sysid OUT text,
timeline OUT oid,
dboid OUT oid,
variant OUT text,
version OUT text,
version_num OUT integer,
min_remote_version_num OUT integer,
has_required_privs OUT boolean,
node_status OUT "char",
node_name OUT text,
dbname OUT text,
dbsize OUT int8,
indexessize OUT int8,
max_nodes OUT integer,
skip_ddl_replication OUT boolean,
cur_nodes OUT integer,
datcollate OUT text,
datctype OUT text)
RETURNS record
AS 'MODULE_PATHNAME','pgactive_get_node_info'
LANGUAGE C;
-- Internal use only: remove the default PUBLIC privileges.
REVOKE ALL ON FUNCTION _pgactive_get_node_info_private(text, text) FROM public;
COMMENT ON FUNCTION _pgactive_get_node_info_private(text, text) IS
'Verify both replication and non-replication connections to the given dsn and get node info; when specified remote_dsn ask remote node to connect back to local node';
-- Connection information for the nodes of the group, one row per node,
-- keyed by the node identity (sysid, timeline, dboid).
CREATE TABLE pgactive_connections (
conn_sysid text not null,
conn_timeline oid not null,
conn_dboid oid not null, -- This is an oid local to the node_sysid cluster
-- Wondering why there's no FOREIGN KEY to pgactive.pgactive_nodes?
--
-- pgactive.pgactive_nodes won't be populated when the pgactive.pgactive_connections row gets
-- created on the local node.
PRIMARY KEY(conn_sysid, conn_timeline, conn_dboid),
conn_dsn text not null,
-- NULL means "use the global default" (see the column COMMENT below);
-- negative values are rejected.
conn_apply_delay integer
CHECK (conn_apply_delay >= 0),
conn_replication_sets text[]
);
REVOKE ALL ON TABLE pgactive_connections FROM public;
COMMENT ON TABLE pgactive_connections IS 'Connection information for nodes in the group. Don''t modify this directly, use the provided functions. One entry should exist per node in the group.';
COMMENT ON COLUMN pgactive_connections.conn_sysid IS 'System identifer for the node this entry''s dsn refers to';
COMMENT ON COLUMN pgactive_connections.conn_timeline IS 'System timeline ID for the node this entry''s dsn refers to';
COMMENT ON COLUMN pgactive_connections.conn_dboid IS 'System database OID for the node this entry''s dsn refers to';
COMMENT ON COLUMN pgactive_connections.conn_dsn IS 'A libpq-style connection string specifying how to make a connection to this node from other nodes';
COMMENT ON COLUMN pgactive_connections.conn_apply_delay IS 'If set, milliseconds to wait before applying each transaction from the remote node. Mainly for debugging. If null, the global default applies.';
COMMENT ON COLUMN pgactive_connections.conn_replication_sets IS 'Replication sets this connection should participate in, if non-default';
-- Registered as an extension configuration table, but 'WHERE false' means
-- pg_dump never includes any of its rows.
SELECT pg_catalog.pg_extension_config_dump('pgactive_connections', 'WHERE false');
-- Internal C function. _pgactive_join_node_private() calls it after changing
-- pgactive_connections, to schedule the apply worker launch for commit time.
CREATE FUNCTION pgactive_connections_changed()
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C;
REVOKE ALL ON FUNCTION pgactive_connections_changed() FROM public;
COMMENT ON FUNCTION pgactive_connections_changed() IS
'Internal pgactive function, do not call directly';
--
-- This is a helper for node_join, for internal use only. It's called on the
-- remote end by the init code when joining an existing group, to do the
-- remote-side setup.
--
-- Parameters:
--   sysid, timeline, dboid  identity of the joining node
--   node_dsn                stored as conn_dsn for that node
--   apply_delay             stored as conn_apply_delay; -1 is stored as NULL
--   replication_sets        stored as conn_replication_sets
--
-- Raises object_not_in_prerequisite_state when the joining node has no
-- pgactive.pgactive_nodes row on this node, or when that row's node_status is
-- not 'i'.
--
CREATE FUNCTION _pgactive_join_node_private (
  sysid text, timeline oid, dboid oid,
  node_dsn text,
  apply_delay integer,
  replication_sets text[]
)
RETURNS void LANGUAGE plpgsql VOLATILE
SET search_path = pgactive, pg_catalog
AS
$body$
DECLARE
  status "char";
BEGIN
  LOCK TABLE pgactive.pgactive_connections IN EXCLUSIVE MODE;
  LOCK TABLE pg_catalog.pg_shseclabel IN EXCLUSIVE MODE;

  -- Assert that the joining node has a pgactive_nodes entry with state = i on this join-target node
  --
  -- node_status must be named in the select list: with an empty select list
  -- "status" would always be NULL and the status check below could never fire.
  SELECT n.node_status INTO status
  FROM pgactive.pgactive_nodes AS n
  WHERE n.node_sysid = sysid
    AND n.node_timeline = timeline
    AND n.node_dboid = dboid;

  IF NOT FOUND THEN
    RAISE object_not_in_prerequisite_state
      USING MESSAGE = format('pgactive.pgactive_nodes entry for (%s,%s,%s) not found',
                             sysid, timeline, dboid);
  END IF;

  IF status <> 'i' THEN
    RAISE object_not_in_prerequisite_state
      USING MESSAGE = format('pgactive.pgactive_nodes entry for (%s,%s,%s) has unexpected status %L (expected ''i'')',
                             sysid, timeline, dboid, status);
  END IF;

  -- Insert or Update the connection info on this node, which we must be
  -- initing from.
  -- No need to care about concurrency here as we hold EXCLUSIVE LOCK.
  BEGIN
    INSERT INTO pgactive.pgactive_connections
      (conn_sysid, conn_timeline, conn_dboid,
       conn_dsn,
       conn_apply_delay, conn_replication_sets)
    VALUES
      (sysid, timeline, dboid, node_dsn,
       CASE WHEN apply_delay = -1 THEN NULL ELSE apply_delay END,
       replication_sets);
  EXCEPTION WHEN unique_violation THEN
    UPDATE pgactive.pgactive_connections
    SET conn_dsn = node_dsn,
        conn_apply_delay = CASE WHEN apply_delay = -1 THEN NULL ELSE apply_delay END,
        conn_replication_sets = replication_sets
    WHERE conn_sysid = sysid
      AND conn_timeline = timeline
      AND conn_dboid = dboid;
  END;

  -- Schedule the apply worker launch for commit time
  PERFORM pgactive.pgactive_connections_changed();
END;
$body$;
-- Internal helper: marks the current database as pgactive-enabled by setting
-- the "pgactive" key of the database's 'pgactive' security label (a JSON
-- document) to true. Any other keys already present in the label are kept.
CREATE FUNCTION _pgactive_update_seclabel_private()
RETURNS void LANGUAGE plpgsql
SET search_path = pgactive, pg_catalog
-- SET pgactive.permit_unsafe_ddl_commands = on is removed for now
SET pgactive.skip_ddl_replication = on
-- SET pgactive.skip_ddl_locking = on is removed for now
AS $body$
DECLARE
  db_label json;
BEGIN
  -- Step 1: fetch the label currently attached to this database, if any.
  -- (Right now there's not much point in merging with it, but later the label
  -- may carry more information.)
  SELECT sl.label::json
  INTO db_label
  FROM pg_catalog.pg_shseclabel AS sl
  WHERE sl.objoid = (SELECT d.oid FROM pg_database AS d WHERE d.datname = current_database())
    AND sl.classoid = 'pg_database'::regclass
    AND sl.provider = 'pgactive';

  -- Step 2: rebuild the label, keeping every key except 'pgactive' and then
  -- adding 'pgactive' => true.
  SELECT json_object_agg(kv.key, kv.value)
  INTO db_label
  FROM (
    SELECT e.key, e.value
    FROM json_each(db_label) AS e
    WHERE e.key <> 'pgactive'
    UNION ALL
    SELECT 'pgactive', to_json(true)
  ) AS kv;

  -- Step 3: install the new label. Doing this early is safe: it won't take
  -- effect until commit.
  EXECUTE format('SECURITY LABEL FOR pgactive ON DATABASE %I IS %L',
                 current_database(), db_label);
END;
$body$;
CREATE FUNCTION _pgactive_begin_join_private (
caller text,
node_name text,
node_dsn text,
remote_dsn text,
remote_sysid OUT text,
remote_timeline OUT oid,
remote_dboid OUT oid,
bypass_collation_check boolean,
bypass_node_identifier_creation boolean,
bypass_user_tables_check boolean
)
RETURNS record LANGUAGE plpgsql VOLATILE
SET search_path = pgactive, pg_catalog
-- SET pgactive.permit_unsafe_ddl_commands = on is removed for now
SET pgactive.skip_ddl_replication = on
-- SET pgactive.skip_ddl_locking = on is removed for now
AS $body$
DECLARE
localid RECORD;
localid_from_dsn RECORD;
remote_nodeinfo RECORD;
remote_nodeinfo_r RECORD;
cur_node RECORD;
local_max_node_value integer;
local_skip_ddl_replication_value boolean;
local_db_collation_info_r RECORD;
collation_errmsg text;
collation_hintmsg text;
data_dir text;
temp_dump_dir text;
same_file_system_mount_point boolean;
free_disk_space1 int8;
free_disk_space1_p text;
free_disk_space2 int8;
free_disk_space2_p text;
remote_dbsize_p text;
BEGIN
-- Only one tx can be adding connections
LOCK TABLE pgactive.pgactive_connections IN EXCLUSIVE MODE;
LOCK TABLE pgactive.pgactive_nodes IN EXCLUSIVE MODE;
LOCK TABLE pg_catalog.pg_shseclabel IN EXCLUSIVE MODE;
-- Generate pgactive node identifier if asked
IF bypass_node_identifier_creation THEN
RAISE WARNING USING
MESSAGE = 'skipping creation of pgactive node identifier for this node',
HINT = 'The ''bypass_node_identifier_creation'' option is only available for pgactive_init_copy tool.';
ELSE
PERFORM pgactive._pgactive_generate_node_identifier_private();
END IF;
SELECT sysid, timeline, dboid INTO localid
FROM pgactive.pgactive_get_local_nodeid();
RAISE LOG USING MESSAGE = format('node identity of node being created is (%s,%s,%s)', localid.sysid, localid.timeline, localid.dboid);
-- If there's already an entry for ourselves in pgactive.pgactive_connections then we
-- know this node is part of an active pgactive group and cannot be joined to
-- another group.
PERFORM 1 FROM pgactive_connections
WHERE conn_sysid = localid.sysid
AND conn_timeline = localid.timeline
AND conn_dboid = localid.dboid;
IF FOUND THEN
RAISE USING
MESSAGE = 'this node is already a member of a pgactive group',
HINT = 'Connect to the node you wish to add and run '||caller||' from it instead.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
-- Validate that the local connection is usable and matches the node
-- identity of the node we're running on.
--
-- For pgactive this will NOT check the 'dsn' if 'node_dsn' gets supplied.
-- We don't know if 'dsn' is even valid for loopback connections and can't
-- assume it is. That'll get checked later by pgactive specific code.
--
-- We'll get a null node name back at this point since we haven't inserted
-- our nodes record (and it wouldn't have committed yet if we had).
--
SELECT * INTO localid_from_dsn
FROM _pgactive_get_node_info_private(node_dsn);
IF localid_from_dsn.sysid <> localid.sysid
OR localid_from_dsn.timeline <> localid.timeline
OR localid_from_dsn.dboid <> localid.dboid
THEN
RAISE USING
MESSAGE = 'node identity for local dsn does not match current node',
DETAIL = format($$The dsn '%s' connects to a node with identity (%s,%s,%s) but the local node is (%s,%s,%s)$$,
node_dsn, localid_from_dsn.sysid, localid_from_dsn.timeline,
localid_from_dsn.dboid, localid.sysid, localid.timeline, localid.dboid),
HINT = 'The node_dsn parameter must refer to the node you''re running this function from.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF NOT localid_from_dsn.has_required_privs THEN
RAISE USING
MESSAGE = 'node_dsn does not have required rights',
DETAIL = format($$The dsn '%s' connects successfully but does not have required rights.$$, node_dsn),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF NOT bypass_user_tables_check THEN
PERFORM 1 FROM pg_class r
INNER JOIN pg_namespace n ON r.relnamespace = n.oid
WHERE n.nspname NOT IN ('pg_catalog', 'pgactive', 'information_schema')
AND relkind = 'r' AND relpersistence = 'p';
IF FOUND THEN
RAISE USING
MESSAGE = 'database joining pgactive group has existing user tables',
HINT = 'Ensure no user tables in the database.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
END IF;
-- Now interrogate the remote node, if specified, and sanity check its
-- connection too. The discovered node identity is returned if found.
--
-- This will error out if there are issues with the remote node.
IF remote_dsn IS NOT NULL THEN
SELECT * INTO remote_nodeinfo
FROM _pgactive_get_node_info_private(remote_dsn);
remote_sysid := remote_nodeinfo.sysid;
remote_timeline := remote_nodeinfo.timeline;
remote_dboid := remote_nodeinfo.dboid;
IF NOT remote_nodeinfo.has_required_privs THEN
RAISE USING
MESSAGE = 'connection to remote node does not have required rights',
DETAIL = format($$The dsn '%s' connects successfully but does not have required rights.$$, remote_dsn),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF remote_nodeinfo.version_num < pgactive_min_remote_version_num() THEN
RAISE USING
MESSAGE = 'remote node''s pgactive version is too old',
DETAIL = format($$The dsn '%s' connects successfully but the remote node version %s is less than the required version %s.$$,
remote_dsn, remote_nodeinfo.version_num, pgactive_min_remote_version_num()),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF remote_nodeinfo.min_remote_version_num > pgactive_version_num() THEN
RAISE USING
MESSAGE = 'remote node''s pgactive version is too new or this node''s version is too old',
DETAIL = format($$The dsn '%s' connects successfully but the remote node version %s requires this node to run at least pgactive %s, not the current %s.$$,
remote_dsn, remote_nodeinfo.version_num, remote_nodeinfo.min_remote_version_num,
pgactive_min_remote_version_num()),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF remote_nodeinfo.node_status IS NULL THEN
RAISE USING
MESSAGE = 'remote node does not appear to be a fully running pgactive node',
DETAIL = format($$The dsn '%s' connects successfully but the target node has no entry in pgactive.pgactive_nodes.$$, remote_dsn),
ERRCODE = 'object_not_in_prerequisite_state';
ELSIF remote_nodeinfo.node_status IS DISTINCT FROM pgactive.pgactive_node_status_to_char('pgactive_NODE_STATUS_READY') THEN
RAISE USING
MESSAGE = 'remote node does not appear to be a fully running pgactive node',
DETAIL = format($$The dsn '%s' connects successfully but the target node has pgactive.pgactive_nodes node_status=%s instead of expected 'r'.$$, remote_dsn, remote_nodeinfo.node_status),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
SELECT setting::integer INTO local_max_node_value FROM pg_settings
WHERE name = 'pgactive.max_nodes';
IF local_max_node_value <> remote_nodeinfo.max_nodes THEN
RAISE USING
MESSAGE = 'joining node and pgactive group have different values for pgactive.max_nodes parameter',
DETAIL = format('pgactive.max_nodes value for joining node is ''%s'' and remote node is ''%s''.',
local_max_node_value, remote_nodeinfo.max_nodes),
HINT = 'The parameter must be set to the same value on all pgactive members.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
SELECT setting FROM pg_settings
WHERE name = 'data_directory' INTO data_dir;
SELECT pgactive.get_free_disk_space(data_dir) INTO free_disk_space1;
SELECT pg_size_pretty(free_disk_space1) INTO free_disk_space1_p;
SELECT pg_size_pretty(remote_nodeinfo.dbsize) INTO remote_dbsize_p;
-- We estimate that postgres needs 20% more disk space as temporary
-- workspace while restoring database for running queries or building
-- indexes. Note that it is just an estimation, the actual disk space
-- needed depends on various factors. Hence we emit a warning to inform
-- early, not an error.
IF free_disk_space1 < (1.2 * remote_nodeinfo.dbsize) THEN
RAISE WARNING USING
MESSAGE = 'node might fail to join pgactive group as disk space is likely to be insufficient',
DETAIL = format('joining node data directory file system mount point has %s free disk space and remote database is %s in size.',
free_disk_space1_p, remote_dbsize_p),
HINT = 'Ensure enough free space on joining node file system.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
SELECT setting FROM pg_settings
WHERE name = 'pgactive.temp_dump_directory' INTO temp_dump_dir;
SELECT pgactive.get_free_disk_space(temp_dump_dir) INTO free_disk_space2;
SELECT pg_size_pretty(free_disk_space2) INTO free_disk_space2_p;
-- We estimate that pg_dump needs at least 50% of database size
-- excluding total size of indexes on the database. Note that it is
-- just an estimation, the actual disk space needed depends on various
-- factors. Hence we emit a warning to inform early, not an error.
IF free_disk_space2 < ((remote_nodeinfo.dbsize - remote_nodeinfo.indexessize)/2) THEN
RAISE WARNING USING
MESSAGE = 'node might fail to join pgactive group as disk space required to store temporary dump is likely to be insufficient',
DETAIL = format('pgactive.temp_dump_directory file system mount point has %s free disk space and remote database is %s in size.',
free_disk_space2_p, remote_dbsize_p),
HINT = 'Ensure enough free space on pgactive.temp_dump_directory file system.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
SELECT pgactive.check_file_system_mount_points(data_dir, temp_dump_dir)
INTO same_file_system_mount_point;
IF same_file_system_mount_point THEN
IF free_disk_space1 <
((1.2 * remote_nodeinfo.dbsize) + ((remote_nodeinfo.dbsize - remote_nodeinfo.indexessize)/2)) THEN
RAISE WARNING USING
MESSAGE = 'node might fail to join pgactive group as disk space required to store both remote database and temporary dump is likely to be insufficient',
HINT = 'Ensure enough free space on joining node file system.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
END IF;
-- using pg_file_settings here as pgactive.skip_ddl_replication is SET to on when entering
-- the function.
SELECT COALESCE((SELECT setting::boolean
FROM pg_file_settings
WHERE name = 'pgactive.skip_ddl_replication' ORDER BY seqno DESC LIMIT 1),
true) INTO local_skip_ddl_replication_value;
IF local_skip_ddl_replication_value <> remote_nodeinfo.skip_ddl_replication THEN
RAISE USING
MESSAGE = 'joining node and pgactive group have different values for pgactive.skip_ddl_replication parameter',
DETAIL = format('pgactive.skip_ddl_replication value for joining node is ''%s'' and remote node is ''%s''.',
local_skip_ddl_replication_value, remote_nodeinfo.skip_ddl_replication),
HINT = 'The parameter must be set to the same value on all pgactive members.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF local_max_node_value = remote_nodeinfo.cur_nodes THEN
RAISE USING
MESSAGE = 'cannot allow more than pgactive.max_nodes number of nodes in a pgactive group',
HINT = 'Increase pgactive.max_nodes parameter value on joining node as well as on all other pgactive members.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
SELECT datcollate, datctype FROM pg_database
WHERE datname = current_database() INTO local_db_collation_info_r;
IF local_db_collation_info_r.datcollate <> remote_nodeinfo.datcollate OR
local_db_collation_info_r.datctype <> remote_nodeinfo.datctype THEN
collation_errmsg := 'joining node and remote node have different database collation settings';
collation_hintmsg := 'Use the same database collation settings for both nodes.';
IF bypass_collation_check THEN
RAISE WARNING USING
MESSAGE = collation_errmsg,
HINT = collation_hintmsg,
ERRCODE = 'object_not_in_prerequisite_state';
ELSE
RAISE EXCEPTION USING
MESSAGE = collation_errmsg,
HINT = collation_hintmsg,
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
END IF;
END IF;
-- Create local node record so the apply worker knows to start initializing
-- this node with pgactive_init_replica when it's started.
--
-- pgactive_init_copy might've created a node entry in catchup mode already, in
-- which case we can skip this.
SELECT * FROM pgactive_nodes
WHERE node_sysid = localid.sysid
AND node_timeline = localid.timeline
AND node_dboid = localid.dboid
INTO cur_node;
IF NOT FOUND THEN
INSERT INTO pgactive_nodes (
node_name,
node_sysid, node_timeline, node_dboid,
node_status, node_dsn, node_init_from_dsn
) VALUES (
node_name,
localid.sysid, localid.timeline, localid.dboid,
pgactive.pgactive_node_status_to_char('pgactive_NODE_STATUS_BEGINNING_INIT'),
node_dsn, remote_dsn
);
ELSIF pgactive.pgactive_node_status_from_char(cur_node.node_status) = 'pgactive_NODE_STATUS_CATCHUP' THEN
RAISE DEBUG 'starting node join in pgactive_NODE_STATUS_CATCHUP';
ELSE
RAISE USING
MESSAGE = 'a pgactive_nodes entry for this node already exists',
DETAIL = format('pgactive.pgactive_nodes entry for (%s,%s,%s) named ''%s'' with status %s exists.',
cur_node.node_sysid, cur_node.node_timeline, cur_node.node_dboid,
cur_node.node_name, pgactive.pgactive_node_status_from_char(cur_node.node_status)),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
PERFORM pgactive._pgactive_update_seclabel_private();
END;
$body$;
--
-- The public interface for node join/addition, to be run to join a currently
-- unconnected node with a blank database to a pgactive group.
--
CREATE FUNCTION pgactive_join_group (
node_name text,
node_dsn text,
join_using_dsn text,
apply_delay integer DEFAULT NULL,
replication_sets text[] DEFAULT ARRAY['default'],
bypass_collation_check boolean DEFAULT false,
bypass_node_identifier_creation boolean DEFAULT false,
bypass_user_tables_check boolean DEFAULT false
)
RETURNS void LANGUAGE plpgsql VOLATILE
SET search_path = pgactive, pg_catalog
-- SET pgactive.permit_unsafe_ddl_commands = on is removed for now
SET pgactive.skip_ddl_replication = on
-- SET pgactive.skip_ddl_locking = on is removed for now
AS $body$
DECLARE
localid record;
connectback_nodeinfo record;
remoteinfo record;
BEGIN
-- Prohibit enabling pgactive where pglogical is installed
IF (
SELECT count(1)
FROM pg_extension
WHERE extname = 'pglogical'
) > 0
THEN
RAISE USING
MESSAGE = 'pgactive can''t be enabled because an external logical replication extension is installed',
ERRCODE = 'object_not_in_prerequisite_state',
DETAIL = 'pgactive doesn''t allow a node to pull in changes from more than one logical replication sources';
END IF;
-- Prohibit enabling pgactive where a subscription exists
IF (
SELECT count(1)
FROM pg_subscription
WHERE subdbid = (SELECT oid
FROM pg_database
WHERE datname = current_database()
)
) > 0
THEN
RAISE USING
MESSAGE = 'pgactive can''t be enabled because a logical replication subscription is created',
ERRCODE = 'object_not_in_prerequisite_state',
DETAIL = 'pgactive doesn''t allow a node to pull in changes from more than one logical replication sources';
END IF;
IF node_dsn IS NULL THEN
RAISE USING
MESSAGE = 'node_dsn can not be null',
ERRCODE = 'invalid_parameter_value';
END IF;
PERFORM pgactive._pgactive_begin_join_private(
caller := '',
node_name := node_name,
node_dsn := node_dsn,
remote_dsn := join_using_dsn,
bypass_collation_check := bypass_collation_check,
bypass_node_identifier_creation := bypass_node_identifier_creation,
bypass_user_tables_check := bypass_user_tables_check);
SELECT sysid, timeline, dboid INTO localid
FROM pgactive.pgactive_get_local_nodeid();
-- Request additional connection tests to determine that the remote is
-- reachable in both replication and non-replication modes, and that the
-- remote can connect back to us via node_dsn in both non-replication and
-- replication modes.
--
-- This cannot be checked for the first node since there's no peer to ask
-- for help.
IF join_using_dsn IS NOT NULL THEN
SELECT * INTO connectback_nodeinfo
FROM pgactive._pgactive_get_node_info_private(node_dsn, join_using_dsn);
-- The connectback must actually match our local node identity and must
-- provide a connection that has required rights.
IF NOT connectback_nodeinfo.has_required_privs THEN
RAISE USING
MESSAGE = 'node_dsn does not have required rights when connecting via remote node',
DETAIL = format($$The dsn '%s' connects successfully but does not have required rights.$$, dsn),
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF (connectback_nodeinfo.sysid, connectback_nodeinfo.timeline, connectback_nodeinfo.dboid)
IS DISTINCT FROM
(localid.sysid, localid.timeline, localid.dboid)
AND
(connectback_nodeinfo.sysid, connectback_nodeinfo.timeline, connectback_nodeinfo.dboid)
IS DISTINCT FROM