diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index e436961b..7a801278 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -46,6 +46,9 @@ public abstract class BulkScanWorker { /** The persistence provider for writing partial results */ protected final IPersistenceProvider persistenceProvider; + /** Throttler for progress updates. Owned and managed by this class. */ + private final ProgressThrottler progressThrottler; + /** * Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into * a future object such that we can handle timeouts properly. @@ -70,6 +73,10 @@ protected BulkScanWorker( this.bulkScanId = bulkScanId; this.scanConfig = scanConfig; this.persistenceProvider = persistenceProvider; + this.progressThrottler = + new ProgressThrottler<>( + scanConfig.getPartialResultThrottleMs(), + "partial-result-throttle (bulk scan " + bulkScanId + ")"); timeoutExecutor = new CanceallableThreadPoolExecutor( @@ -105,7 +112,7 @@ public ProgressableFuture handle(ScanJobDescription jobDescription) { ProgressableFuture progressableFuture = new ProgressableFuture<>(); // Compose a consumer that both updates the future and persists partial results - Consumer progressConsumer = + Consumer rawConsumer = partialResult -> { progressableFuture.updateResult(partialResult); try { @@ -115,10 +122,24 @@ public ProgressableFuture handle(ScanJobDescription jobDescription) { } }; + // Wrap with throttling if partial results are enabled; otherwise pass a no-op + // consumer so subclasses can call accept() unconditionally + Consumer progressConsumer; + if (scanConfig.isPartialResultsEnabled()) { + progressConsumer = + partialResult -> + progressThrottler.submit( + partialResult, rawConsumer, System.currentTimeMillis()); + } else { + progressConsumer = partialResult -> {}; + } + timeoutExecutor.submit( () -> { try { + progressThrottler.reset(); Document result = scan(jobDescription, progressConsumer); + progressThrottler.flush(); progressableFuture.complete(result); } catch (Exception e) { progressableFuture.completeExceptionally(e); @@ -155,6 +176,7 @@ public final boolean init() { if (!initialized.get()) { synchronized (initializationLock) { if (!initialized.getAndSet(true)) { + progressThrottler.init(); initInternal(); return true; } @@ -183,6 +205,7 @@ public final boolean cleanup() { } if (initialized.getAndSet(false)) { cleanupInternal(); + progressThrottler.shutdown(); shouldCleanupSelf.set(false); return true; } diff --git a/src/main/java/de/rub/nds/crawler/core/ProgressThrottler.java b/src/main/java/de/rub/nds/crawler/core/ProgressThrottler.java new file mode 100644 index 00000000..28c77d68 --- /dev/null +++ b/src/main/java/de/rub/nds/crawler/core/ProgressThrottler.java @@ -0,0 +1,149 @@ +/* + * TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner + * + * Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH + * + * Licensed under Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0.txt + */ +package de.rub.nds.crawler.core; + +import de.rub.nds.scanner.core.execution.NamedThreadFactory; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Generic throttler for progress updates. Ensures that updates are published at most once per + * throttle window. When an update arrives during the throttle period, it is saved as pending and + * scheduled to be published after the period ends. If another update arrives before the scheduled + * publish, it replaces the pending update, ensuring the latest state is always published. + * + *

Lifecycle: call {@link #init()} before submitting updates and {@link #shutdown()} when done. + * + * @param the type of the progress payload + */ +public class ProgressThrottler { + + private static final Logger LOGGER = LogManager.getLogger(); + + /** Default throttle interval in milliseconds. */ + public static final long DEFAULT_THROTTLE_MS = 5000; + + private final long throttleMs; + private final String threadName; + + private long lastUpdateTime = 0; + private ScheduledExecutorService scheduler; + private ScheduledFuture pendingTask; + private T pendingPayload; + private Consumer pendingConsumer; + + /** + * Creates a new throttler. + * + * @param throttleMs minimum interval between published updates in milliseconds + * @param threadName name for the scheduler thread + */ + public ProgressThrottler(long throttleMs, String threadName) { + this.throttleMs = throttleMs; + this.threadName = threadName; + } + + /** Initializes the internal scheduler. Must be called before {@link #submit}. */ + public synchronized void init() { + if (scheduler != null) { + throw new IllegalStateException("ProgressThrottler already initialized"); + } + scheduler = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(threadName)); + } + + /** Shuts down the scheduler and clears all pending state. */ + public synchronized void shutdown() { + if (scheduler != null) { + scheduler.shutdownNow(); + scheduler = null; + } + pendingPayload = null; + pendingConsumer = null; + pendingTask = null; + } + + /** + * Submits a progress update through the throttle mechanism. If enough time has elapsed since + * the last update, publishes immediately. Otherwise, saves as pending and schedules publication + * after the throttle period. + * + * @param payload the progress payload to publish + * @param consumer the consumer to publish to + * @param currentTime the current timestamp in milliseconds + */ + public synchronized void submit(T payload, Consumer consumer, long currentTime) { + if (scheduler == null) { + throw new IllegalStateException( + "ProgressThrottler not initialized. Call init() first."); + } + if (currentTime - lastUpdateTime >= throttleMs) { + publish(payload, consumer, currentTime); + } else { + pendingPayload = payload; + pendingConsumer = consumer; + + if (pendingTask != null && !pendingTask.isDone()) { + pendingTask.cancel(false); + } + + long delay = throttleMs - (currentTime - lastUpdateTime); + pendingTask = scheduler.schedule(this::publishPending, delay, TimeUnit.MILLISECONDS); + LOGGER.debug("Progress update throttled, scheduled for {}ms", delay); + } + } + + /** + * Flushes any pending throttled update immediately. Should be called after a scan completes to + * ensure the last update is not lost when the scheduler is shut down. + */ + public synchronized void flush() { + if (pendingPayload != null && pendingConsumer != null) { + publish(pendingPayload, pendingConsumer, System.currentTimeMillis()); + } + if (pendingTask != null && !pendingTask.isDone()) { + pendingTask.cancel(false); + pendingTask = null; + } + } + + /** Resets throttle state between scans. */ + public synchronized void reset() { + lastUpdateTime = 0; + pendingPayload = null; + pendingConsumer = null; + if (pendingTask != null) { + pendingTask.cancel(false); + pendingTask = null; + } + } + + /** Returns the configured throttle interval in milliseconds. */ + public long getThrottleMs() { + return throttleMs; + } + + private void publish(T payload, Consumer consumer, long currentTime) { + consumer.accept(payload); + lastUpdateTime = currentTime; + pendingPayload = null; + pendingConsumer = null; + LOGGER.debug("Progress update published"); + } + + private synchronized void publishPending() { + if (pendingPayload != null && pendingConsumer != null) { + publish(pendingPayload, pendingConsumer, System.currentTimeMillis()); + } + } +} diff --git a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java index 3cc5a87d..d98d246a 100644 --- a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java +++ b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java @@ -9,6 +9,7 @@ package de.rub.nds.crawler.data; import de.rub.nds.crawler.core.BulkScanWorker; +import de.rub.nds.crawler.core.ProgressThrottler; import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import de.rub.nds.scanner.core.probe.ProbeType; @@ -29,6 +30,10 @@ public abstract class ScanConfig implements Serializable { private List excludedProbes; + private boolean partialResultsEnabled = true; + + private long partialResultThrottleMs = ProgressThrottler.DEFAULT_THROTTLE_MS; + @SuppressWarnings("unused") private ScanConfig() {} @@ -116,6 +121,22 @@ public void setExcludedProbes(List excludedProbes) { this.excludedProbes = excludedProbes; } + public boolean isPartialResultsEnabled() { + return partialResultsEnabled; + } + + public void setPartialResultsEnabled(boolean partialResultsEnabled) { + this.partialResultsEnabled = partialResultsEnabled; + } + + public long getPartialResultThrottleMs() { + return partialResultThrottleMs; + } + + public void setPartialResultThrottleMs(long partialResultThrottleMs) { + this.partialResultThrottleMs = partialResultThrottleMs; + } + /** * Creates a worker for this scan configuration. Each implementation must provide a factory * method to create the appropriate worker type. diff --git a/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerIT.java b/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerIT.java new file mode 100644 index 00000000..520e5f40 --- /dev/null +++ b/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerIT.java @@ -0,0 +1,84 @@ +/* + * TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner + * + * Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH + * + * Licensed under Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0.txt + */ +package de.rub.nds.crawler.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +/** + * Integration tests that exercise the full throttler lifecycle: init, rapid updates, flush, + * shutdown. + */ +@Tag("IntegrationTest") +public class ProgressThrottlerIT { + + @Test + public void testFullLifecycleFlushPublishesPendingBeforeShutdown() { + long highThrottleMs = 180_000; + ProgressThrottler throttler = + new ProgressThrottler<>(highThrottleMs, "it-throttler"); + throttler.init(); + + List received = Collections.synchronizedList(new ArrayList<>()); + // baseTime must be >= throttleMs so first update publishes immediately + long baseTime = 200_000L; + + try { + int totalUpdates = 20; + for (int i = 1; i <= totalUpdates; i++) { + throttler.submit("update-" + i, received::add, baseTime + i); + } + + assertEquals(1, received.size(), "Only the first update should publish immediately"); + assertEquals("update-1", received.get(0)); + + throttler.flush(); + + assertEquals(2, received.size(), "Flush should have published the pending update"); + assertEquals("update-20", received.get(1)); + + throttler.flush(); + assertEquals(2, received.size(), "Second flush should be a no-op"); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testFlushBeforeShutdownPreventsDataloss() { + long throttleMs = 60_000; + ProgressThrottler throttler = new ProgressThrottler<>(throttleMs, "it-throttler-2"); + throttler.init(); + + List received = Collections.synchronizedList(new ArrayList<>()); + long baseTime = 100_000L; + + try { + throttler.submit("first", received::add, baseTime); + assertEquals(1, received.size()); + + throttler.submit("middle", received::add, baseTime + 100); + throttler.submit("last", received::add, baseTime + 200); + assertEquals(1, received.size(), "Throttled updates should not publish yet"); + + throttler.flush(); + assertEquals(2, received.size()); + assertEquals("last", received.get(1)); + } finally { + throttler.shutdown(); + assertFalse(received.isEmpty()); + } + } +} diff --git a/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerTest.java b/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerTest.java new file mode 100644 index 00000000..28e472cc --- /dev/null +++ b/src/test/java/de/rub/nds/crawler/core/ProgressThrottlerTest.java @@ -0,0 +1,219 @@ +/* + * TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner + * + * Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH + * + * Licensed under Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0.txt + */ +package de.rub.nds.crawler.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; + +public class ProgressThrottlerTest { + + private static final long THROTTLE_MS = 5000; + + private static ProgressThrottler createThrottler() { + ProgressThrottler throttler = + new ProgressThrottler<>(THROTTLE_MS, "test-throttler"); + throttler.init(); + return throttler; + } + + @Test + public void testFirstUpdatePublishesImmediately() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + throttler.submit("update-1", received::add, THROTTLE_MS); + + assertEquals(1, received.size()); + assertEquals("update-1", received.get(0)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testUpdateWithinThrottleWindowIsDeferred() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + throttler.submit("update-2", received::add, baseTime + 1000); + assertEquals(1, received.size()); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testUpdateAfterThrottleWindowPublishesImmediately() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + long afterThrottle = baseTime + THROTTLE_MS; + throttler.submit("update-2", received::add, afterThrottle); + assertEquals(2, received.size()); + assertEquals("update-2", received.get(1)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testScheduledPublishFiresAfterDelay() throws Exception { + ProgressThrottler throttler = createThrottler(); + try { + CountDownLatch latch = new CountDownLatch(1); + List received = Collections.synchronizedList(new ArrayList<>()); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + Consumer latchConsumer = + s -> { + received.add(s); + latch.countDown(); + }; + long nearEndOfWindow = baseTime + THROTTLE_MS - 1; + throttler.submit("update-2", latchConsumer, nearEndOfWindow); + assertEquals(1, received.size()); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + assertEquals(2, received.size()); + assertEquals("update-2", received.get(1)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testNewerPendingUpdateReplacesPrevious() throws Exception { + ProgressThrottler throttler = createThrottler(); + try { + CountDownLatch latch = new CountDownLatch(1); + List received = Collections.synchronizedList(new ArrayList<>()); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + Consumer latchConsumer = + s -> { + received.add(s); + latch.countDown(); + }; + + throttler.submit("update-2", latchConsumer, baseTime + 2000); + long nearEnd = baseTime + THROTTLE_MS - 1; + throttler.submit("update-3", latchConsumer, nearEnd); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + + assertEquals(2, received.size()); + assertEquals("update-1", received.get(0)); + assertEquals("update-3", received.get(1)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testFlushPublishesPendingUpdate() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + throttler.submit("update-2", received::add, baseTime + 1000); + assertEquals(1, received.size()); + + throttler.flush(); + assertEquals(2, received.size()); + assertEquals("update-2", received.get(1)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testFlushWithNoPendingUpdateIsNoOp() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + throttler.flush(); + assertEquals(1, received.size()); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testResetAllowsImmediatePublishAgain() { + ProgressThrottler throttler = createThrottler(); + try { + List received = new ArrayList<>(); + long baseTime = 10000L; + + throttler.submit("update-1", received::add, baseTime); + assertEquals(1, received.size()); + + throttler.submit("update-2", received::add, baseTime + 100); + assertEquals(1, received.size()); + + throttler.reset(); + + throttler.submit("update-3", received::add, baseTime + 200); + assertEquals(2, received.size()); + assertEquals("update-3", received.get(1)); + } finally { + throttler.shutdown(); + } + } + + @Test + public void testSubmitWithoutInitThrows() { + ProgressThrottler throttler = new ProgressThrottler<>(THROTTLE_MS, "test"); + assertThrows(IllegalStateException.class, () -> throttler.submit("x", s -> {}, 10000L)); + } + + @Test + public void testDoubleInitThrows() { + ProgressThrottler throttler = createThrottler(); + try { + assertThrows(IllegalStateException.class, throttler::init); + } finally { + throttler.shutdown(); + } + } +}