Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
/** The persistence provider for writing partial results */
protected final IPersistenceProvider persistenceProvider;

/** Throttler for progress updates. Owned and managed by this class. */
private final ProgressThrottler<Document> progressThrottler;

/**
* Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into
* a future object such that we can handle timeouts properly.
Expand All @@ -70,6 +73,10 @@ protected BulkScanWorker(
this.bulkScanId = bulkScanId;
this.scanConfig = scanConfig;
this.persistenceProvider = persistenceProvider;
this.progressThrottler =
new ProgressThrottler<>(
scanConfig.getPartialResultThrottleMs(),
"partial-result-throttle (bulk scan " + bulkScanId + ")");

timeoutExecutor =
new CanceallableThreadPoolExecutor(
Expand Down Expand Up @@ -105,7 +112,7 @@ public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
ProgressableFuture<Document> progressableFuture = new ProgressableFuture<>();

// Compose a consumer that both updates the future and persists partial results
Consumer<Document> progressConsumer =
Consumer<Document> rawConsumer =
partialResult -> {
progressableFuture.updateResult(partialResult);
try {
Expand All @@ -115,10 +122,24 @@ public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
}
};

// Wrap with throttling if partial results are enabled; otherwise pass a no-op
// consumer so subclasses can call accept() unconditionally
Consumer<Document> progressConsumer;
if (scanConfig.isPartialResultsEnabled()) {
progressConsumer =
partialResult ->
progressThrottler.submit(
partialResult, rawConsumer, System.currentTimeMillis());
} else {
progressConsumer = partialResult -> {};
}

timeoutExecutor.submit(
() -> {
try {
progressThrottler.reset();
Document result = scan(jobDescription, progressConsumer);
progressThrottler.flush();
progressableFuture.complete(result);
} catch (Exception e) {
progressableFuture.completeExceptionally(e);
Expand Down Expand Up @@ -155,6 +176,7 @@ public final boolean init() {
if (!initialized.get()) {
synchronized (initializationLock) {
if (!initialized.getAndSet(true)) {
progressThrottler.init();
initInternal();
return true;
}
Expand Down Expand Up @@ -183,6 +205,7 @@ public final boolean cleanup() {
}
if (initialized.getAndSet(false)) {
cleanupInternal();
progressThrottler.shutdown();
shouldCleanupSelf.set(false);
return true;
}
Expand Down
149 changes: 149 additions & 0 deletions src/main/java/de/rub/nds/crawler/core/ProgressThrottler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner
*
* Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH
*
* Licensed under Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0.txt
*/
package de.rub.nds.crawler.core;

import de.rub.nds.scanner.core.execution.NamedThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Generic throttler for progress updates. Ensures that updates are published at most once per
 * throttle window. When an update arrives during the throttle period, it is saved as pending and
 * scheduled to be published after the period ends. If another update arrives before the scheduled
 * publish, it replaces the pending update, ensuring the latest state is always published.
 *
 * <p>Lifecycle: call {@link #init()} before submitting updates and {@link #shutdown()} when done.
 *
 * <p>Thread-safety: every state-changing method is {@code synchronized} on this instance. The
 * consumer is invoked while that monitor is held, either on the thread calling {@link #submit}
 * or {@link #flush()}, or on the internal scheduler thread for deferred updates.
 *
 * <p>Time base: deferred and flushed updates are stamped with {@link System#currentTimeMillis()},
 * so callers should pass wall-clock milliseconds to {@link #submit}. Because the last-publish
 * timestamp starts at {@code 0}, the first update after construction or {@link #reset()} is
 * published immediately only if {@code currentTime >= throttleMs}.
 *
 * @param <T> the type of the progress payload
 */
public class ProgressThrottler<T> {

    private static final Logger LOGGER = LogManager.getLogger();

    /** Default throttle interval in milliseconds. */
    public static final long DEFAULT_THROTTLE_MS = 5000;

    private final long throttleMs;
    private final String threadName;

    /** Timestamp (ms) of the most recent publish; 0 means nothing was published since reset. */
    private long lastUpdateTime = 0;

    /** Single-threaded scheduler for deferred publishes; {@code null} while not initialized. */
    private ScheduledExecutorService scheduler;

    /** Handle of the currently scheduled deferred publish, if any. */
    private ScheduledFuture<?> pendingTask;

    /** Latest throttled payload waiting to be published, or {@code null} if none. */
    private T pendingPayload;

    /** Consumer that will receive {@link #pendingPayload}. */
    private Consumer<T> pendingConsumer;

    /**
     * Creates a new throttler.
     *
     * @param throttleMs minimum interval between published updates in milliseconds
     * @param threadName name for the scheduler thread
     */
    public ProgressThrottler(long throttleMs, String threadName) {
        this.throttleMs = throttleMs;
        this.threadName = threadName;
    }

    /**
     * Initializes the internal scheduler. Must be called before {@link #submit}.
     *
     * @throws IllegalStateException if the throttler is already initialized
     */
    public synchronized void init() {
        if (scheduler != null) {
            throw new IllegalStateException("ProgressThrottler already initialized");
        }
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(threadName));
        // submit() cancels and reschedules on every throttled update. Without this policy the
        // cancelled tasks would stay in the work queue until their delay elapses, so a burst of
        // updates with a long throttle window would pile up dead tasks.
        executor.setRemoveOnCancelPolicy(true);
        scheduler = executor;
    }

    /**
     * Shuts down the scheduler and clears all pending state. A pending update that has not been
     * published yet is discarded; call {@link #flush()} first to avoid losing it.
     */
    public synchronized void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
        pendingPayload = null;
        pendingConsumer = null;
        pendingTask = null;
    }

    /**
     * Submits a progress update through the throttle mechanism. If enough time has elapsed since
     * the last update, publishes immediately. Otherwise, saves as pending and schedules publication
     * after the throttle period.
     *
     * @param payload the progress payload to publish
     * @param consumer the consumer to publish to
     * @param currentTime the current timestamp in milliseconds
     * @throws IllegalStateException if {@link #init()} has not been called
     */
    public synchronized void submit(T payload, Consumer<T> consumer, long currentTime) {
        if (scheduler == null) {
            throw new IllegalStateException(
                    "ProgressThrottler not initialized. Call init() first.");
        }
        long elapsed = currentTime - lastUpdateTime;
        if (elapsed >= throttleMs) {
            publish(payload, consumer, currentTime);
            return;
        }

        // Still inside the throttle window: remember only the newest payload and (re)schedule
        // its publication for the remainder of the window.
        pendingPayload = payload;
        pendingConsumer = consumer;
        cancelPendingTask();

        long delay = throttleMs - elapsed;
        pendingTask = scheduler.schedule(this::publishPending, delay, TimeUnit.MILLISECONDS);
        LOGGER.debug("Progress update throttled, scheduled for {}ms", delay);
    }

    /**
     * Flushes any pending throttled update immediately. Should be called after a scan completes to
     * ensure the last update is not lost when the scheduler is shut down.
     */
    public synchronized void flush() {
        if (pendingPayload != null && pendingConsumer != null) {
            publish(pendingPayload, pendingConsumer, System.currentTimeMillis());
        }
        // The deferred task has nothing left to publish; drop it and the stale reference.
        cancelPendingTask();
    }

    /** Resets throttle state between scans. Any pending update is discarded unpublished. */
    public synchronized void reset() {
        lastUpdateTime = 0;
        pendingPayload = null;
        pendingConsumer = null;
        cancelPendingTask();
    }

    /**
     * Returns the configured throttle interval in milliseconds.
     *
     * @return the throttle interval passed to the constructor
     */
    public long getThrottleMs() {
        return throttleMs;
    }

    /**
     * Hands the payload to the consumer, records the publish time and clears pending state. If
     * the consumer throws, the exception propagates and the state is left untouched. Callers must
     * hold this instance's monitor.
     */
    private void publish(T payload, Consumer<T> consumer, long currentTime) {
        consumer.accept(payload);
        lastUpdateTime = currentTime;
        pendingPayload = null;
        pendingConsumer = null;
        LOGGER.debug("Progress update published");
    }

    /**
     * Scheduler callback: publishes the pending update, if one is still waiting. A consumer
     * failure is logged here because an exception escaping a scheduled task would only be stored
     * in its future, which nothing reads. The pending payload is kept in that case so that a
     * later {@link #flush()} can retry it.
     */
    private synchronized void publishPending() {
        if (pendingPayload == null || pendingConsumer == null) {
            return;
        }
        try {
            publish(pendingPayload, pendingConsumer, System.currentTimeMillis());
        } catch (RuntimeException e) {
            LOGGER.error("Failed to publish throttled progress update", e);
        }
    }

    /**
     * Cancels the scheduled deferred publish (a no-op if it already ran) and forgets its handle.
     * Callers must hold this instance's monitor.
     */
    private void cancelPendingTask() {
        if (pendingTask != null) {
            pendingTask.cancel(false);
            pendingTask = null;
        }
    }
}
21 changes: 21 additions & 0 deletions src/main/java/de/rub/nds/crawler/data/ScanConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package de.rub.nds.crawler.data;

import de.rub.nds.crawler.core.BulkScanWorker;
import de.rub.nds.crawler.core.ProgressThrottler;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.scanner.core.config.ScannerDetail;
import de.rub.nds.scanner.core.probe.ProbeType;
Expand All @@ -29,6 +30,10 @@ public abstract class ScanConfig implements Serializable {

private List<ProbeType> excludedProbes;

private boolean partialResultsEnabled = true;

private long partialResultThrottleMs = ProgressThrottler.DEFAULT_THROTTLE_MS;

@SuppressWarnings("unused")
private ScanConfig() {}

Expand Down Expand Up @@ -116,6 +121,22 @@ public void setExcludedProbes(List<ProbeType> excludedProbes) {
this.excludedProbes = excludedProbes;
}

/**
 * Returns whether partial scan results should be reported while a scan is running. The flag
 * defaults to {@code true}.
 *
 * @return {@code true} if partial results are enabled
 */
public boolean isPartialResultsEnabled() {
    return this.partialResultsEnabled;
}

/**
 * Enables or disables reporting of partial scan results.
 *
 * @param partialResultsEnabled {@code true} to enable partial results, {@code false} to
 *     disable them
 */
public void setPartialResultsEnabled(final boolean partialResultsEnabled) {
    this.partialResultsEnabled = partialResultsEnabled;
}

/**
 * Returns the minimum interval between two published partial results. The value defaults to
 * {@link ProgressThrottler#DEFAULT_THROTTLE_MS}.
 *
 * @return the throttle interval in milliseconds
 */
public long getPartialResultThrottleMs() {
    return this.partialResultThrottleMs;
}

/**
 * Sets the minimum interval between two published partial results.
 *
 * @param partialResultThrottleMs the throttle interval in milliseconds
 */
public void setPartialResultThrottleMs(final long partialResultThrottleMs) {
    this.partialResultThrottleMs = partialResultThrottleMs;
}

/**
* Creates a worker for this scan configuration. Each implementation must provide a factory
* method to create the appropriate worker type.
Expand Down
84 changes: 84 additions & 0 deletions src/test/java/de/rub/nds/crawler/core/ProgressThrottlerIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner
*
* Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH
*
* Licensed under Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0.txt
*/
package de.rub.nds.crawler.core;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that exercise the full throttler lifecycle: init, rapid updates, flush,
 * shutdown.
 *
 * <p>Both tests use throttle windows far longer than the test runtime, so the deferred publish
 * never fires on its own and every second delivery observed here is caused by {@code flush()}.
 */
@Tag("IntegrationTest")
public class ProgressThrottlerIT {

    /**
     * Sends a burst of updates inside one throttle window and verifies that only the first is
     * published immediately, that flush delivers the latest pending one, and that a second flush
     * does nothing.
     */
    @Test
    public void testFullLifecycleFlushPublishesPendingBeforeShutdown() {
        long highThrottleMs = 180_000;
        ProgressThrottler<String> throttler =
                new ProgressThrottler<>(highThrottleMs, "it-throttler");
        throttler.init();

        List<String> received = Collections.synchronizedList(new ArrayList<>());
        // baseTime must be >= throttleMs so first update publishes immediately
        long baseTime = 200_000L;

        try {
            int totalUpdates = 20;
            for (int i = 1; i <= totalUpdates; i++) {
                throttler.submit("update-" + i, received::add, baseTime + i);
            }

            assertEquals(1, received.size(), "Only the first update should publish immediately");
            assertEquals("update-1", received.get(0));

            throttler.flush();

            assertEquals(2, received.size(), "Flush should have published the pending update");
            assertEquals("update-20", received.get(1));

            throttler.flush();
            assertEquals(2, received.size(), "Second flush should be a no-op");
        } finally {
            throttler.shutdown();
        }
    }

    /**
     * Verifies that flushing before shutdown delivers the most recent throttled update instead of
     * dropping it.
     */
    @Test
    public void testFlushBeforeShutdownPreventsDataloss() {
        long throttleMs = 60_000;
        ProgressThrottler<String> throttler = new ProgressThrottler<>(throttleMs, "it-throttler-2");
        throttler.init();

        List<String> received = Collections.synchronizedList(new ArrayList<>());
        // baseTime must be >= throttleMs so first update publishes immediately
        long baseTime = 100_000L;

        try {
            throttler.submit("first", received::add, baseTime);
            assertEquals(1, received.size());

            throttler.submit("middle", received::add, baseTime + 100);
            throttler.submit("last", received::add, baseTime + 200);
            assertEquals(1, received.size(), "Throttled updates should not publish yet");

            throttler.flush();
            assertEquals(2, received.size());
            assertEquals("last", received.get(1));
        } finally {
            throttler.shutdown();
        }

        // Checked after the try/finally rather than inside finally: an assertion failing inside
        // finally would replace, and thereby hide, any earlier failure from the try block.
        assertFalse(received.isEmpty(), "Published updates should still be present after shutdown");
    }
}
Loading