Skip to content

Commit 93dfdf8

Browse files
Add logic to support ranged GTID updates from proxysql_mysqlbinlog.
1 parent 831d5fd commit 93dfdf8

File tree

4 files changed

+173
-47
lines changed

4 files changed

+173
-47
lines changed

include/proxysql_gtid.h

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,38 @@
22
#define PROXYSQL_GTID
33
// highly inspired by libslave
44
// https://github.com/vozbu/libslave/
5-
#include <unordered_map>
65
#include <list>
6+
#include <string>
7+
#include <unordered_map>
78
#include <utility>
89

910
typedef std::pair<std::string, int64_t> gtid_t;
10-
typedef std::pair<int64_t, int64_t> gtid_interval_t;
11+
12+
class Gtid_Interval {
13+
public:
14+
int64_t start;
15+
int64_t end;
16+
17+
private:
18+
void init(const int64_t _start, const int64_t _end);
19+
void init(const char* s);
20+
21+
public:
22+
explicit Gtid_Interval(const int64_t _start, const int64_t _end);
23+
explicit Gtid_Interval(const char* s);
24+
explicit Gtid_Interval(const std::string& s);
25+
26+
const std::string to_string(void);
27+
const bool contains(int64_t gtid);
28+
const bool merge(const Gtid_Interval& other);
29+
30+
const int cmp(const Gtid_Interval& other);
31+
const bool operator<(const Gtid_Interval& other);
32+
const bool operator==(const Gtid_Interval& other);
33+
};
34+
typedef Gtid_Interval gtid_interval_t;
35+
36+
// TODO: make me a proper class.
1137
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;
1238

1339
/*
@@ -30,4 +56,4 @@ class Gtid_Server_Info {
3056
};
3157
*/
3258

33-
#endif /* PROXYSQL_GTID */
59+
#endif /* PROXYSQL_GTID */

lib/GTID_Server_Data.cpp

Lines changed: 32 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
220220
return false;
221221
}
222222
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
223-
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
223+
if (itr->contains((int64_t)gtid_trxid)) {
224224
// fprintf(stderr,"YES\n");
225225
return true;
226226
}
@@ -375,12 +375,7 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
375375
s.insert(23,"-");
376376
s = s + ":";
377377
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
378-
std::string s2 = s;
379-
s2 = s2 + std::to_string(itr->first);
380-
s2 = s2 + "-";
381-
s2 = s2 + std::to_string(itr->second);
382-
s2 = s2 + ",";
383-
gtid_set = gtid_set + s2;
378+
gtid_set += s + itr->to_string() + ",";
384379
}
385380
}
386381
// Extract latest comma only in case 'gtid_executed' isn't empty
@@ -391,54 +386,47 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
391386
}
392387

393388

394-
395-
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
396-
auto it = gtid_executed.find(gtid.first);
397-
if (it == gtid_executed.end())
398-
{
399-
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
389+
// Merges a GTID interval into a gitd_executed instance.
390+
void addGtid(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed) {
391+
auto it = gtid_executed.find(uuid);
392+
if (it == gtid_executed.end()) {
393+
// new UUID entry
394+
gtid_executed[uuid].emplace_back(iv);
400395
return;
401396
}
402397

403-
bool flag = true;
404-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
405-
{
406-
if (gtid.second >= itr->first && gtid.second <= itr->second)
407-
return;
408-
if (gtid.second + 1 == itr->first)
409-
{
410-
--itr->first;
411-
flag = false;
412-
break;
413-
}
414-
else if (gtid.second == itr->second + 1)
415-
{
416-
++itr->second;
417-
flag = false;
398+
// insert/merge GTID interval
399+
auto pos = it->second.begin();
400+
for (; pos != it->second.end(); ++pos) {
401+
if (pos->merge(iv))
418402
break;
419-
}
420-
else if (gtid.second < itr->first)
421-
{
422-
it->second.emplace(itr, gtid.second, gtid.second);
423-
return;
424-
}
403+
}
404+
if (pos == it->second.end()) {
405+
it->second.emplace_back(iv);
425406
}
426407

427-
if (flag)
428-
it->second.emplace_back(gtid.second, gtid.second);
429-
430-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
431-
{
432-
auto next_itr = std::next(itr);
433-
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
434-
{
435-
itr->second = next_itr->second;
436-
it->second.erase(next_itr);
408+
// merge overlapping GTID ranges, if any
409+
it->second.sort();
410+
auto a = it->second.begin();
411+
while (a != it->second.end()) {
412+
auto b = std::next(a);
413+
if (b == it->second.end()) {
437414
break;
438415
}
416+
if (a->merge(*b)) {
417+
it->second.erase(b);
418+
continue;
419+
}
420+
a++;
439421
}
440422
}
441423

424+
// Merges a single GTID into a gitd_executed instance.
425+
inline void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
426+
gtid_interval_t iv = Gtid_Interval(gtid.second, gtid.second);
427+
addGtid(gtid.first, iv, gtid_executed);
428+
}
429+
442430
void * GTID_syncer_run() {
443431
//struct ev_loop * gtid_ev_loop;
444432
//gtid_ev_loop = NULL;

lib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
7171
ProxySQL_Admin_Tests.oo ProxySQL_Admin_Tests2.oo ProxySQL_Admin_Scheduler.oo ProxySQL_Admin_Disk_Upgrade.oo ProxySQL_Admin_Stats.oo \
7272
Admin_Handler.oo Admin_FlushVariables.oo Admin_Bootstrap.oo \
7373
Base_Session.oo Base_Thread.oo \
74+
proxysql_gtid.oo \
7475
proxy_protocol_info.oo \
7576
proxysql_find_charset.oo ProxySQL_Poll.oo \
7677
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \

lib/proxysql_gtid.cpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#include <cstdint>
2+
#include <cstdio>
3+
#include <cstdlib>
4+
#include <string>
5+
6+
#include "proxysql_gtid.h"
7+
8+
9+
// Initializes a GTID interval.
10+
// Implemented via private method as C++03 does not support delegated constructors :(
11+
void Gtid_Interval::init(const int64_t _start, const int64_t _end) {
12+
start = _start;
13+
end = _end;
14+
15+
if (start > end) {
16+
std::swap(start, end);
17+
}
18+
}
19+
20+
// Initializes a GTID interval from a string buffer, in [gtid]{-[gtid]} format.
21+
void Gtid_Interval::init(const char *s) {
22+
uint64_t _start, _end;
23+
24+
if (sscanf(s, "%lu-%lu", &_start, &_end) == 2) {
25+
init((int64_t)_start, (int64_t)_end);
26+
return;
27+
}
28+
if (sscanf(s, "%lu", &_start) == 1) {
29+
init((int64_t)_start, (int64_t)_start);
30+
return;
31+
}
32+
33+
init(0, 0);
34+
}
35+
36+
37+
Gtid_Interval::Gtid_Interval(const int64_t _start, const int64_t _end) {
38+
init(_start, _end);
39+
}
40+
41+
Gtid_Interval::Gtid_Interval(const char *s) {
42+
init(s);
43+
}
44+
45+
Gtid_Interval::Gtid_Interval(const std::string& s) {
46+
init(s.c_str());
47+
}
48+
49+
// Checks if a given GTID is contained in this interval.
50+
const bool Gtid_Interval::contains(int64_t gtid) {
51+
return (gtid >= start && gtid <= end);
52+
}
53+
54+
// Yields a string representation for a GTID interval.
55+
const std::string Gtid_Interval::to_string(void) {
56+
if (start == end) {
57+
return std::to_string(start);
58+
}
59+
return std::to_string(start) + "-" + std::to_string(end);
60+
}
61+
62+
// Attempts to merge two GTID intervals. Returns true if the intervals were merged (and potentially modified), false otherwise.
63+
const bool Gtid_Interval::merge(const Gtid_Interval& other) {
64+
if (other.start >= start && other.end <= end) {
65+
// other is contained by interval
66+
return true;
67+
}
68+
if (other.start <= start && other.end >= end) {
69+
// other contains whole of existing interval
70+
start = other.start;
71+
end = other.end;
72+
return true;
73+
}
74+
if (other.start <= start && other.end >= (start-1)) {
75+
// other overlaps interval at start
76+
start = other.start;
77+
return true;
78+
}
79+
if (other.end >= end && other.start <= (end+1)) {
80+
// other overlaps interval at end
81+
end = other.end;
82+
return true;
83+
}
84+
85+
return false;
86+
}
87+
88+
// Comapres two GTID intervals, by strict weak ordering.
89+
const int Gtid_Interval::cmp(const Gtid_Interval& other) {
90+
if (start < other.start) {
91+
return -1;
92+
}
93+
if (start > other.start) {
94+
return 1;
95+
}
96+
if (end < other.end) {
97+
return -1;
98+
}
99+
if (end > other.end) {
100+
return 1;
101+
}
102+
return 0;
103+
}
104+
105+
const bool Gtid_Interval::operator<(const Gtid_Interval& other) {
106+
return cmp(other) == -1;
107+
}
108+
109+
const bool Gtid_Interval::operator==(const Gtid_Interval& other) {
110+
return cmp(other) == 0;
111+
}

0 commit comments

Comments
 (0)