Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -368,6 +369,16 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return future;
}

private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles,
NamespaceBundle bundleRange) {
try {
namespaceBundles.validateBundle(bundleRange);
return true;
} catch (IllegalArgumentException e) {
return false;
}
}

// Attempt to local the data for the given bundle in metadata store
// If it cannot be found, return the default bundle data.
@Override
Expand Down Expand Up @@ -762,8 +773,14 @@ public void checkNamespaceBundleSplit() {
try {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
if (!namespaceBundleFactory
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
NamespaceBundle bundle = namespaceBundleFactory.getBundle(namespaceName, bundleRange);
if (!namespaceBundleFactory.canSplitBundle(bundle)) {
continue;
}

NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName));
if (!checkBundleDataExistInNamespaceBundles(bundles, bundle)) {
log.warn("Bundle {} has been removed, skip split this bundle ", bundleName);
continue;
}

Expand Down Expand Up @@ -1113,7 +1130,7 @@ public void writeBrokerDataOnZooKeeper() {

@Override
public void writeBrokerDataOnZooKeeper(boolean force) {
lock.lock();
// lock.lock();
try {
updateLocalBrokerData();

Expand All @@ -1137,7 +1154,7 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
throw (ConcurrentModificationException) e;
}
} finally {
lock.unlock();
// lock.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be modified. Small problem.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public int size() {
return bundles.size();
}

public void validateBundle(NamespaceBundle nsBundle) throws Exception {
public void validateBundle(NamespaceBundle nsBundle) throws IllegalArgumentException {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData()
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}

@Test
public void testRepeatSplitBundle() throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "repeat-split-bundle";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;

admin1.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);

LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData");

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();

// create a lot of topic to fully distributed among bundles.
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment

pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
Comment on lines +1159 to +1160
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test creates consumers but never closes them, which can lead to resource leaks. Each consumer created in the loop should be closed after use, or stored in a list and closed in a cleanup step.

Copilot uses AI. Check for mistakes.
}

String topicToFindBundle = topicName + 0;
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
String bundleKey = realBundle.toString();
log.info("Before bundle={}", bundleKey);

NamespaceBundleStats stats = new NamespaceBundleStats();
stats.msgRateIn = 100000.0;
localData.getLastStats().put(bundleKey, stats);
pulsar1.getBrokerService().updateRates();

primaryLoadManager.updateAll();

primaryLoadManager.updateAll();
Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
Comment on lines +1173 to +1176
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name 'testRepeatSplitBundle' and the setup with updateAll() called twice suggests this test should verify that repeated splits don't cause errors. However, the test only asserts that the bundle data was removed (line 1176), but doesn't verify that no exceptions were thrown during the second updateAll() call. Consider adding assertions to verify that both updateAll() calls complete successfully without throwing the PreconditionFailedException mentioned in the PR description.

Copilot uses AI. Check for mistakes.
}

}
Loading