Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/Interpreters/DDLWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,6 @@ bool DDLWorker::initializeMainThread()
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initializeReplication();
markReplicasActive(true);
initialized = true;
return true;
}
Expand Down Expand Up @@ -1212,6 +1211,14 @@ void DDLWorker::runMainThread()
}

cleanup_event->set();
try
{
markReplicasActive(reinitialized);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred when markReplicasActive: ");
}
scheduleTasks(reinitialized);
subsequent_errors_count = 0;

Expand Down Expand Up @@ -1290,20 +1297,23 @@ void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet
zookeeper->createAncestors(fs::path(replicas_dir) / host_id / "");
}

void DDLWorker::markReplicasActive(bool /*reinitialized*/)
void DDLWorker::markReplicasActive(bool reinitialized)
{
auto zookeeper = getZooKeeper();

// Reset all active_node_holders
for (auto & it : active_node_holders)
if (reinitialized)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}
// Reset all active_node_holders
for (auto & it : active_node_holders)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}

active_node_holders.clear();
active_node_holders.clear();
}

for (auto it = active_node_holders.begin(); it != active_node_holders.end();)
{
Expand Down Expand Up @@ -1384,12 +1394,7 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
{
zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
}
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(active_path, active_id, zkutil::CreateMode::Ephemeral));
/// To bump node mtime
ops.emplace_back(zkutil::makeSetRequest(fs::path(replicas_dir) / host_id, "", -1));
zookeeper->multi(ops);

zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
auto active_node_holder_zookeeper = zookeeper;
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
Expand Down
Loading