Skip to content

Commit dd126a2

Browse files
Add logic to support ranged GTID updates from proxysql_mysqlbinlog.
1 parent 831d5fd commit dd126a2

File tree

4 files changed

+125
-47
lines changed

4 files changed

+125
-47
lines changed

include/proxysql_gtid.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,31 @@
22
#define PROXYSQL_GTID
33
// highly inspired by libslave
44
// https://github.com/vozbu/libslave/
5-
#include <unordered_map>
65
#include <list>
6+
#include <string>
7+
#include <unordered_map>
78
#include <utility>
89

910
typedef std::pair<std::string, int64_t> gtid_t;
10-
typedef std::pair<int64_t, int64_t> gtid_interval_t;
11+
12+
class Gtid_Interval {
13+
public:
14+
int64_t start;
15+
int64_t end;
16+
17+
public:
18+
Gtid_Interval(int64_t _start, int64_t _end);
19+
20+
const std::string to_string(void);
21+
bool merge(const Gtid_Interval& other);
22+
23+
const int cmp(const Gtid_Interval& other);
24+
const bool operator<(const Gtid_Interval& other);
25+
const bool operator==(const Gtid_Interval& other);
26+
};
27+
typedef Gtid_Interval gtid_interval_t;
28+
29+
// TODO: make me a proper class.
1130
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;
1231

1332
/*
@@ -30,4 +49,4 @@ class Gtid_Server_Info {
3049
};
3150
*/
3251

33-
#endif /* PROXYSQL_GTID */
52+
#endif /* PROXYSQL_GTID */

lib/GTID_Server_Data.cpp

Lines changed: 32 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
220220
return false;
221221
}
222222
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
223-
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
223+
if ((int64_t)gtid_trxid >= itr->start && (int64_t)gtid_trxid <= itr->end) {
224224
// fprintf(stderr,"YES\n");
225225
return true;
226226
}
@@ -375,12 +375,7 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
375375
s.insert(23,"-");
376376
s = s + ":";
377377
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
378-
std::string s2 = s;
379-
s2 = s2 + std::to_string(itr->first);
380-
s2 = s2 + "-";
381-
s2 = s2 + std::to_string(itr->second);
382-
s2 = s2 + ",";
383-
gtid_set = gtid_set + s2;
378+
gtid_set += s + itr->to_string() + ",";
384379
}
385380
}
386381
// Extract latest comma only in case 'gtid_executed' isn't empty
@@ -391,54 +386,47 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
391386
}
392387

393388

394-
395-
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
396-
auto it = gtid_executed.find(gtid.first);
397-
if (it == gtid_executed.end())
398-
{
399-
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
389+
// Merges a GTID interval into a gitd_executed instance.
390+
void addGtid(std::string uuid, gtid_interval_t &iv, gtid_set_t& gtid_executed) {
391+
auto it = gtid_executed.find(uuid);
392+
if (it == gtid_executed.end()) {
393+
// new UUID entry
394+
gtid_executed[uuid].emplace_back(iv);
400395
return;
401396
}
402397

403-
bool flag = true;
404-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
405-
{
406-
if (gtid.second >= itr->first && gtid.second <= itr->second)
407-
return;
408-
if (gtid.second + 1 == itr->first)
409-
{
410-
--itr->first;
411-
flag = false;
412-
break;
413-
}
414-
else if (gtid.second == itr->second + 1)
415-
{
416-
++itr->second;
417-
flag = false;
398+
// insert/merge GTID interval
399+
auto pos = it->second.begin();
400+
for (; pos != it->second.end(); ++pos) {
401+
if (pos->merge(iv))
418402
break;
419-
}
420-
else if (gtid.second < itr->first)
421-
{
422-
it->second.emplace(itr, gtid.second, gtid.second);
423-
return;
424-
}
403+
}
404+
if (pos == it->second.end()) {
405+
it->second.emplace_back(iv);
425406
}
426407

427-
if (flag)
428-
it->second.emplace_back(gtid.second, gtid.second);
429-
430-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
431-
{
432-
auto next_itr = std::next(itr);
433-
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
434-
{
435-
itr->second = next_itr->second;
436-
it->second.erase(next_itr);
408+
// merge overlapping GTID ranges, if any
409+
it->second.sort();
410+
auto a = it->second.begin();
411+
while (a != it->second.end()) {
412+
auto b = std::next(a);
413+
if (b == it->second.end()) {
437414
break;
438415
}
416+
if (a->merge(*b)) {
417+
it->second.erase(b);
418+
continue;
419+
}
420+
a++;
439421
}
440422
}
441423

424+
// Merges a single GTID into a gitd_executed instance.
425+
inline void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
426+
gtid_interval_t iv = Gtid_Interval(gtid.second, gtid.second);
427+
addGtid(gtid.first, iv, gtid_executed);
428+
}
429+
442430
void * GTID_syncer_run() {
443431
//struct ev_loop * gtid_ev_loop;
444432
//gtid_ev_loop = NULL;

lib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
7171
ProxySQL_Admin_Tests.oo ProxySQL_Admin_Tests2.oo ProxySQL_Admin_Scheduler.oo ProxySQL_Admin_Disk_Upgrade.oo ProxySQL_Admin_Stats.oo \
7272
Admin_Handler.oo Admin_FlushVariables.oo Admin_Bootstrap.oo \
7373
Base_Session.oo Base_Thread.oo \
74+
proxysql_gtid.oo \
7475
proxy_protocol_info.oo \
7576
proxysql_find_charset.oo ProxySQL_Poll.oo \
7677
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \

lib/proxysql_gtid.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#include <string>
2+
3+
#include "proxysql_gtid.h"
4+
5+
6+
// Initializes a GTID interval
7+
Gtid_Interval::Gtid_Interval(int64_t _start, int64_t _end) : start(_start), end(_end) {
8+
if (start > end) {
9+
std::swap(start, end);
10+
}
11+
}
12+
13+
// Yields a string representation for a GTID interval
14+
const std::string Gtid_Interval::to_string(void) {
15+
if (start == end) {
16+
return std::to_string(start);
17+
}
18+
return std::to_string(start) + "-" + std::to_string(end);
19+
}
20+
21+
// Attempts to merge two GTID intervals. Returns true if the intervals were merged (and potentially modified), false otherwise.
22+
bool Gtid_Interval::merge(const Gtid_Interval& other) {
23+
if (other.start >= start && other.end <= end) {
24+
// other is contained by interval
25+
return true;
26+
}
27+
if (other.start <= start && other.end >= end) {
28+
// other contains whole of existing interval
29+
start = other.start;
30+
end = other.end;
31+
return true;
32+
}
33+
if (other.start <= start && other.end >= (start-1)) {
34+
// other overlaps interval at start
35+
start = other.start;
36+
return true;
37+
}
38+
if (other.end >= end && other.start <= (end+1)) {
39+
// other overlaps interval at end
40+
end = other.end;
41+
return true;
42+
}
43+
44+
return false;
45+
}
46+
47+
// Comapres two GTID intervals, by strict weak ordering.
48+
const int Gtid_Interval::cmp(const Gtid_Interval& other) {
49+
if (start < other.start) {
50+
return -1;
51+
}
52+
if (start > other.start) {
53+
return 1;
54+
}
55+
if (end < other.end) {
56+
return -1;
57+
}
58+
if (end > other.end) {
59+
return 1;
60+
}
61+
return 0;
62+
}
63+
64+
const bool Gtid_Interval::operator<(const Gtid_Interval& other) {
65+
return cmp(other) == -1;
66+
}
67+
68+
const bool Gtid_Interval::operator==(const Gtid_Interval& other) {
69+
return cmp(other) == 0;
70+
}

0 commit comments

Comments
 (0)