From 39f0c331a37f0cb815778b4d4ba809dcaa8ef5eb Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Mon, 13 Apr 2026 02:50:24 +0200 Subject: [PATCH 1/4] Add new `load-test` command This new command launches queries in parallel against the specified endpoint, and shows statistics about the response times, success rates, and so on. There are options `--request-rate` (how many queries per second to send), `--duration` (how long to run the test), and `--log-frequency` (with what frequency to print the statistics). Queries can be read from a TSV or YAML file (same format as for the `benchmark-queries` command), or from a directory that contains one JSON file per query (format used for our wikidata-query-log dataset). The option `--query-selection` determines how queries are selected (`random`, which is the default, or `cycle`). --- src/qlever/commands/load_test.py | 461 +++++++++++++++++++++++++++++++ 1 file changed, 461 insertions(+) create mode 100644 src/qlever/commands/load_test.py diff --git a/src/qlever/commands/load_test.py b/src/qlever/commands/load_test.py new file mode 100644 index 00000000..d8823e99 --- /dev/null +++ b/src/qlever/commands/load_test.py @@ -0,0 +1,461 @@ +from __future__ import annotations + +import json +import os +import random +import re +import shlex +import statistics +import subprocess +import threading +import time + +import yaml + +from qlever.command import QleverCommand +from qlever.log import log + + +def parse_duration(s: str) -> float: + """ + Parse a duration string like "10s", "34min", "2h", "1.5h", "90" (seconds + by default) and return the duration in seconds. 
+ """ + s = s.strip() + match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(s|sec|min|m|h|hr)?", s) + if not match: + raise ValueError(f"Cannot parse duration: \"{s}\" (examples: " + f"\"10s\", \"34min\", \"2h\", \"90\")") + value = float(match.group(1)) + unit = match.group(2) or "s" + if unit in ("s", "sec"): + return value + elif unit in ("min", "m"): + return value * 60 + elif unit in ("h", "hr"): + return value * 3600 + else: + raise ValueError(f"Unknown duration unit: \"{unit}\"") + + +def format_duration(seconds: float) -> str: + """ + Format a duration in seconds as a human-readable string like "1h 23min" + or "45s" or "2min 30s". + """ + if seconds >= 3600: + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + return f"{h}h {m}min" if m > 0 else f"{h}h" + elif seconds >= 60: + m = int(seconds // 60) + s = int(seconds % 60) + return f"{m}min {s}s" if s > 0 else f"{m}min" + else: + return f"{seconds:.0f}s" + + +class QuerySource: + """ + Unified query source that supports three formats: TSV, YAML, and a + directory of JSON files. For TSV and YAML, all queries are read into + memory upfront. For a directory, only the file names are read upfront + and individual queries are read on demand. 
+ """ + + def __init__(self, queries: list[str] | None = None, + query_dir: str | None = None, + query_files: list[str] | None = None): + self._queries = queries + self._query_dir = query_dir + self._query_files = query_files + self._cycle_index = 0 + + @staticmethod + def from_tsv(path: str) -> QuerySource: + queries = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t", 1) + queries.append(parts[1].strip() if len(parts) == 2 + else parts[0].strip()) + return QuerySource(queries=queries) + + @staticmethod + def from_yml(path: str) -> QuerySource: + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + if not isinstance(data, dict) or "queries" not in data: + raise ValueError("YAML file must contain a top-level" + " 'queries' key") + if not isinstance(data["queries"], list): + raise ValueError("'queries' key must hold a list") + queries = [] + for entry in data["queries"]: + if not isinstance(entry, dict) or "query" not in entry: + raise ValueError("Each query entry must contain" + " a 'query' key") + queries.append(entry["query"].strip()) + return QuerySource(queries=queries) + + @staticmethod + def from_dir(path: str) -> QuerySource: + query_files = sorted(f for f in os.listdir(path) + if f.endswith(".json")) + return QuerySource(query_dir=path, query_files=query_files) + + def __len__(self): + if self._queries is not None: + return len(self._queries) + return len(self._query_files) + + @property + def description(self): + if self._query_dir is not None: + return "JSON files" + return "queries" + + def get_query(self, selection: str) -> str: + """ + Pick the next query according to the selection strategy ("random" + or "cycle") and return the SPARQL query string. 
+ """ + n = len(self) + if selection == "random": + idx = random.randint(0, n - 1) + else: + idx = self._cycle_index % n + self._cycle_index += 1 + + if self._queries is not None: + return self._queries[idx] + + # Directory mode: read the JSON file on demand. If the file does + # not contain a valid query, try the next one. + for attempt in range(min(10, n)): + actual_idx = (idx + attempt) % n + path = os.path.join(self._query_dir, + self._query_files[actual_idx]) + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + output = data.get("output") + if not output: + continue + query = output.get("sparql_fixed") or output.get("sparql") + if query: + return query + except (KeyError, TypeError, json.JSONDecodeError, AttributeError): + continue + raise ValueError("Could not find a valid query in the directory") + + +class LoadTestCommand(QleverCommand): + """ + Class for executing the `load-test` command. + """ + + def __init__(self): + pass + + def description(self) -> str: + return ("Send many concurrent queries to a SPARQL endpoint") + + def should_have_qleverfile(self) -> bool: + return False + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return {"server": ["host_name", "port"]} + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "--queries-tsv", + type=str, + default=None, + help="Path to a TSV file with queries" + " (short_query_namesparql_query)", + ) + subparser.add_argument( + "--queries-yml", + type=str, + default=None, + help="Path to a YAML file with queries" + " (same format as for benchmark-queries)", + ) + subparser.add_argument( + "--queries-dir", + type=str, + default=None, + help="Path to a directory with one JSON file per query" + " (query in output.sparql)", + ) + subparser.add_argument( + "--query-selection", + type=str, + choices=["random", "cycle"], + default="random", + help="How to pick the next query (default: random)", + ) + subparser.add_argument( + "--request-rate", 
+ type=float, + default=10.0, + help="Number of queries per second (default: 10)", + ) + subparser.add_argument( + "--duration", + type=str, + default="30s", + help="How long to run (e.g., 10s, 5min, 2h; default: 30s)", + ) + subparser.add_argument( + "--log-frequency", + type=str, + default="5s", + help="How often to print a status line (default: 5s)", + ) + subparser.add_argument( + "--sparql-endpoint", + type=str, + help="URL of the SPARQL endpoint", + ) + subparser.add_argument( + "--show-queries", + type=int, + nargs="?", + const=10, + default=None, + metavar="N", + help="Show the first N queries (default: 10) and exit" + " without running the load test", + ) + + def execute(self, args) -> bool: + # Parse duration and log frequency. + try: + duration_secs = parse_duration(args.duration) + log_frequency_secs = parse_duration(args.log_frequency) + except ValueError as e: + log.error(e) + return False + + # Exactly one of the three query sources must be specified. + sources = [args.queries_tsv, args.queries_yml, args.queries_dir] + if sum(s is not None for s in sources) != 1: + log.error("Please specify exactly one of --queries-tsv," + " --queries-yml, or --queries-dir") + return False + + # Build the query source. + try: + if args.queries_tsv: + source = QuerySource.from_tsv(args.queries_tsv) + source_path = args.queries_tsv + elif args.queries_yml: + source = QuerySource.from_yml(args.queries_yml) + source_path = args.queries_yml + else: + source = QuerySource.from_dir(args.queries_dir) + source_path = args.queries_dir + except Exception as e: + log.error(f"Could not read queries: {e}") + return False + if len(source) == 0: + log.error(f"No queries found in \"{source_path}\"") + return False + + # If --show-queries is given, show queries and exit. 
+ if args.show_queries is not None: + n = min(args.show_queries, len(source)) + log.info(f"Showing {n} of {len(source)}" + f" {source.description} from \"{source_path}\":") + log.info("") + for i in range(n): + query = source.get_query(args.query_selection) + log.info(f"Query {i + 1}: {query[:200]}" + f"{'...' if len(query) > 200 else ''}") + return True + + # Determine the SPARQL endpoint. + sparql_endpoint = ( + args.sparql_endpoint + if args.sparql_endpoint + else f"{args.host_name}:{args.port}" + ) + + # Show what the command will do. + self.show( + f"Send queries from \"{source_path}\"" + f" ({len(source)} {source.description}," + f" {args.query_selection} selection)" + f" to {sparql_endpoint} at {args.request_rate} queries/s" + f" for {format_duration(duration_secs)}" + f" (log every {format_duration(log_frequency_secs)})", + only_show=args.show, + ) + if args.show: + return True + + # Shared state for the worker threads. + lock = threading.Lock() + num_launched = 0 + num_done = 0 + num_errors = 0 + completed_times: list[float] = [] + status_codes: dict[str, int] = {} + + def send_query(query: str): + nonlocal num_done, num_errors + curl_cmd = ( + f"curl -s {sparql_endpoint}" + f" -H \"Accept: application/qlever-results+json\"" + f" --data-urlencode query={shlex.quote(query)}" + f" -o /dev/null -w \"%{{http_code}}\"" + ) + start = time.time() + try: + result = subprocess.run( + curl_cmd, shell=True, capture_output=True, + text=True, timeout=max(duration_secs * 2, 300), + ) + elapsed = time.time() - start + http_code = result.stdout.strip() + with lock: + num_done += 1 + completed_times.append(elapsed) + status_codes[http_code] = \ + status_codes.get(http_code, 0) + 1 + if http_code != "200": + num_errors += 1 + except Exception: + elapsed = time.time() - start + with lock: + num_done += 1 + num_errors += 1 + completed_times.append(elapsed) + status_codes["timeout"] = \ + status_codes.get("timeout", 0) + 1 + + # Print status header and status lines. 
+ header = ( + f"{'Elapsed':>8s} {'Launched':>8s} {'Done':>8s}" + f" {'Running':>8s} {'Errors':>8s}" + f" {'Median':>8s} {'Mean':>8s}" + f" {'95p':>8s} {'Max':>8s}" + ) + separator = " ".join(["--------"] * 9) + + def print_table_header(): + log.info(header) + log.info(separator) + + log.info("") + print_table_header() + + def print_status(elapsed: float): + with lock: + n_launched = num_launched + n_done = num_done + n_errors = num_errors + times = list(completed_times) + n_running = n_launched - n_done + if times: + sorted_times = sorted(times) + median = statistics.median(sorted_times) + mean = statistics.mean(sorted_times) + p95_idx = int(len(sorted_times) * 0.95) + p95 = sorted_times[min(p95_idx, len(sorted_times) - 1)] + max_t = sorted_times[-1] + time_stats = ( + f" {median:>7.2f}s {mean:>7.2f}s" + f" {p95:>7.2f}s {max_t:>7.2f}s" + ) + else: + time_stats = ( + f" {'':>8s} {'':>8s}" + f" {'':>8s} {'':>8s}" + ) + log.info( + f"{format_duration(elapsed):>8s}" + f" {n_launched:>8d} {n_done:>8d}" + f" {n_running:>8d} {n_errors:>8d}" + f"{time_stats}" + ) + + # Main loop: launch queries at the specified rate. + try: + start_time = time.time() + next_log_time = start_time + log_frequency_secs + interval = 1.0 / args.request_rate + next_launch_time = start_time + interval + + while True: + now = time.time() + elapsed = now - start_time + if elapsed >= duration_secs: + break + + # Launch queries that are due. + while next_launch_time <= now: + query = source.get_query(args.query_selection) + thread = threading.Thread( + target=send_query, args=(query,), daemon=True, + ) + thread.start() + with lock: + num_launched += 1 + next_launch_time += interval + + # Print status line if due. + if now >= next_log_time: + print_status(elapsed) + next_log_time += log_frequency_secs + + # Sleep briefly to avoid busy waiting. 
+ sleep_until = min(next_launch_time, next_log_time, + start_time + duration_secs) + sleep_time = sleep_until - time.time() + if sleep_time > 0: + time.sleep(min(sleep_time, 0.1)) + + interrupted = False + except KeyboardInterrupt: + interrupted = True + log.warning("\rCtrl+C pressed, stopping load test") + + # Helper to print header + status (used after interruption + # and after waiting for stragglers). + def print_final_status(): + print_table_header() + print_status(time.time() - start_time) + + # Print final status line. After Ctrl+C, reprint the header + # since the warning message breaks the table flow. After normal + # completion, just print a separator + status line. + if interrupted: + print_final_status() + else: + log.info(separator) + print_status(time.time() - start_time) + + # Show HTTP status code histogram. + http_status_names = { + "200": "OK", "400": "Bad Request", + "429": "Too Many Requests", + "500": "Internal Server Error", "502": "Bad Gateway", + } + with lock: + codes = dict(status_codes) + if codes: + parts = [] + for code, count in sorted(codes.items()): + name = http_status_names.get(code) + label = f"{code} ({name})" if name else code + parts.append(f"{count} x {label}") + log.info("") + log.info(f"HTTP status codes: {', '.join(parts)}") + + return True From 4d856fdcd051de2cbdf490cd779dbd4720ea0dd3 Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Sun, 19 Apr 2026 15:56:46 +0200 Subject: [PATCH 2/4] Major revision of `load-test` command `--queries-dir` replaced by `--queries-jsonl` -> reads a single JSONL file instead of a directory of JSON files; all three formats (TSV, YAML, JSONL) now fully read queries into memory upfront `--duration` replaced by `--num-queries` (default 1000) -> specify how many queries to send, not how long to run `--query-selection` added -> `random` (default) or `cycle` `--show-queries` added -> print full queries in purple and exit `--show-error-messages` added -> show query and error message for failed queries 
Refactor code and various minor improvements --- src/qlever/commands/load_test.py | 298 ++++++++++++++----------------- 1 file changed, 138 insertions(+), 160 deletions(-) diff --git a/src/qlever/commands/load_test.py b/src/qlever/commands/load_test.py index d8823e99..ca553c54 100644 --- a/src/qlever/commands/load_test.py +++ b/src/qlever/commands/load_test.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import os import random import re import shlex @@ -12,6 +11,8 @@ import yaml +from termcolor import colored + from qlever.command import QleverCommand from qlever.log import log @@ -38,119 +39,53 @@ def parse_duration(s: str) -> float: raise ValueError(f"Unknown duration unit: \"{unit}\"") -def format_duration(seconds: float) -> str: - """ - Format a duration in seconds as a human-readable string like "1h 23min" - or "45s" or "2min 30s". - """ - if seconds >= 3600: - h = int(seconds // 3600) - m = int((seconds % 3600) // 60) - return f"{h}h {m}min" if m > 0 else f"{h}h" - elif seconds >= 60: - m = int(seconds // 60) - s = int(seconds % 60) - return f"{m}min {s}s" if s > 0 else f"{m}min" - else: - return f"{seconds:.0f}s" - - -class QuerySource: - """ - Unified query source that supports three formats: TSV, YAML, and a - directory of JSON files. For TSV and YAML, all queries are read into - memory upfront. For a directory, only the file names are read upfront - and individual queries are read on demand. 
- """ - - def __init__(self, queries: list[str] | None = None, - query_dir: str | None = None, - query_files: list[str] | None = None): - self._queries = queries - self._query_dir = query_dir - self._query_files = query_files - self._cycle_index = 0 - - @staticmethod - def from_tsv(path: str) -> QuerySource: - queries = [] - with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - parts = line.split("\t", 1) - queries.append(parts[1].strip() if len(parts) == 2 - else parts[0].strip()) - return QuerySource(queries=queries) - - @staticmethod - def from_yml(path: str) -> QuerySource: - with open(path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - if not isinstance(data, dict) or "queries" not in data: - raise ValueError("YAML file must contain a top-level" - " 'queries' key") - if not isinstance(data["queries"], list): - raise ValueError("'queries' key must hold a list") - queries = [] - for entry in data["queries"]: - if not isinstance(entry, dict) or "query" not in entry: - raise ValueError("Each query entry must contain" - " a 'query' key") - queries.append(entry["query"].strip()) - return QuerySource(queries=queries) - - @staticmethod - def from_dir(path: str) -> QuerySource: - query_files = sorted(f for f in os.listdir(path) - if f.endswith(".json")) - return QuerySource(query_dir=path, query_files=query_files) - - def __len__(self): - if self._queries is not None: - return len(self._queries) - return len(self._query_files) - - @property - def description(self): - if self._query_dir is not None: - return "JSON files" - return "queries" - - def get_query(self, selection: str) -> str: - """ - Pick the next query according to the selection strategy ("random" - or "cycle") and return the SPARQL query string. 
- """ - n = len(self) - if selection == "random": - idx = random.randint(0, n - 1) - else: - idx = self._cycle_index % n - self._cycle_index += 1 - - if self._queries is not None: - return self._queries[idx] - - # Directory mode: read the JSON file on demand. If the file does - # not contain a valid query, try the next one. - for attempt in range(min(10, n)): - actual_idx = (idx + attempt) % n - path = os.path.join(self._query_dir, - self._query_files[actual_idx]) +def read_queries_tsv(path: str) -> list[str]: + """Read queries from a TSV file (namequery per line).""" + queries = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t", 1) + queries.append(parts[1].strip() if len(parts) == 2 + else parts[0].strip()) + return queries + + +def read_queries_yml(path: str) -> list[str]: + """Read queries from a YAML file (benchmark-queries format).""" + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + if not isinstance(data, dict) or "queries" not in data: + raise ValueError("YAML file must contain a top-level 'queries' key") + if not isinstance(data["queries"], list): + raise ValueError("'queries' key must hold a list") + queries = [] + for entry in data["queries"]: + if not isinstance(entry, dict) or "query" not in entry: + raise ValueError("Each query entry must contain a 'query' key") + queries.append(entry["query"].strip()) + return queries + + +def read_queries_jsonl(path: str) -> list[str]: + """Read queries from a JSONL file (one JSON object per line with a + 'sparql' field).""" + queries = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) - output = data.get("output") - if not output: - continue - query = output.get("sparql_fixed") or output.get("sparql") + data = json.loads(line) + query = 
data.get("sparql") if query: - return query - except (KeyError, TypeError, json.JSONDecodeError, AttributeError): + queries.append(query.strip()) + except (json.JSONDecodeError, AttributeError): continue - raise ValueError("Could not find a valid query in the directory") + return queries class LoadTestCommand(QleverCommand): @@ -186,11 +121,11 @@ def additional_arguments(self, subparser) -> None: " (same format as for benchmark-queries)", ) subparser.add_argument( - "--queries-dir", + "--queries-jsonl", type=str, default=None, - help="Path to a directory with one JSON file per query" - " (query in output.sparql)", + help="Path to a JSONL file with queries" + " (one JSON object per line with a 'sparql' field)", ) subparser.add_argument( "--query-selection", @@ -206,10 +141,10 @@ def additional_arguments(self, subparser) -> None: help="Number of queries per second (default: 10)", ) subparser.add_argument( - "--duration", - type=str, - default="30s", - help="How long to run (e.g., 10s, 5min, 2h; default: 30s)", + "--num-queries", + type=int, + default=1000, + help="Total number of queries to send (default: 1000)", ) subparser.add_argument( "--log-frequency", @@ -222,6 +157,18 @@ def additional_arguments(self, subparser) -> None: type=str, help="URL of the SPARQL endpoint", ) + subparser.add_argument( + "--show-error-messages", + action="store_true", + default=False, + help="Show query and error message for failed queries", + ) + subparser.add_argument( + "--width-error-message", + type=int, + default=150, + help="Truncate error messages to this width (default: 150)", + ) subparser.add_argument( "--show-queries", type=int, @@ -234,49 +181,64 @@ def additional_arguments(self, subparser) -> None: ) def execute(self, args) -> bool: - # Parse duration and log frequency. + # Parse log frequency. 
try: - duration_secs = parse_duration(args.duration) log_frequency_secs = parse_duration(args.log_frequency) except ValueError as e: log.error(e) return False # Exactly one of the three query sources must be specified. - sources = [args.queries_tsv, args.queries_yml, args.queries_dir] - if sum(s is not None for s in sources) != 1: + sources = { + "--queries-tsv": args.queries_tsv, + "--queries-yml": args.queries_yml, + "--queries-jsonl": args.queries_jsonl, + } + specified = {k: v for k, v in sources.items() if v is not None} + if len(specified) != 1: log.error("Please specify exactly one of --queries-tsv," - " --queries-yml, or --queries-dir") + " --queries-yml, or --queries-jsonl") return False - # Build the query source. + # Read all queries into memory. + source_option, source_path = next(iter(specified.items())) try: - if args.queries_tsv: - source = QuerySource.from_tsv(args.queries_tsv) - source_path = args.queries_tsv - elif args.queries_yml: - source = QuerySource.from_yml(args.queries_yml) - source_path = args.queries_yml + if source_option == "--queries-tsv": + queries = read_queries_tsv(source_path) + elif source_option == "--queries-yml": + queries = read_queries_yml(source_path) else: - source = QuerySource.from_dir(args.queries_dir) - source_path = args.queries_dir + queries = read_queries_jsonl(source_path) except Exception as e: log.error(f"Could not read queries: {e}") return False - if len(source) == 0: + if not queries: log.error(f"No queries found in \"{source_path}\"") return False + # Query selection helper. + cycle_index = 0 + + def next_query() -> str: + nonlocal cycle_index + if args.query_selection == "random": + return random.choice(queries) + else: + query = queries[cycle_index % len(queries)] + cycle_index += 1 + return query + # If --show-queries is given, show queries and exit. 
if args.show_queries is not None: - n = min(args.show_queries, len(source)) - log.info(f"Showing {n} of {len(source)}" - f" {source.description} from \"{source_path}\":") + n = min(args.show_queries, len(queries)) + log.info(f"Showing {n} of {len(queries)} queries" + f" from \"{source_path}\":") log.info("") for i in range(n): - query = source.get_query(args.query_selection) - log.info(f"Query {i + 1}: {query[:200]}" - f"{'...' if len(query) > 200 else ''}") + query = next_query() + log.info(f"Query {i + 1}:") + log.info(colored(query, "magenta")) + log.info("") return True # Determine the SPARQL endpoint. @@ -288,12 +250,10 @@ def execute(self, args) -> bool: # Show what the command will do. self.show( - f"Send queries from \"{source_path}\"" - f" ({len(source)} {source.description}," - f" {args.query_selection} selection)" + f"Send {args.num_queries} queries from \"{source_path}\"" + f" ({len(queries)} queries, {args.query_selection} selection)" f" to {sparql_endpoint} at {args.request_rate} queries/s" - f" for {format_duration(duration_secs)}" - f" (log every {format_duration(log_frequency_secs)})", + f" (log every {log_frequency_secs:.0f}s)", only_show=args.show, ) if args.show: @@ -307,19 +267,23 @@ def execute(self, args) -> bool: completed_times: list[float] = [] status_codes: dict[str, int] = {} + show_errors = args.show_error_messages + max_error_width = args.width_error_message + def send_query(query: str): nonlocal num_done, num_errors + result_file = f"/tmp/qlever.load-test.{threading.get_ident()}" curl_cmd = ( f"curl -s {sparql_endpoint}" f" -H \"Accept: application/qlever-results+json\"" f" --data-urlencode query={shlex.quote(query)}" - f" -o /dev/null -w \"%{{http_code}}\"" + f" -o {result_file} -w \"%{{http_code}}\"" ) start = time.time() try: result = subprocess.run( curl_cmd, shell=True, capture_output=True, - text=True, timeout=max(duration_secs * 2, 300), + text=True, timeout=300, ) elapsed = time.time() - start http_code = result.stdout.strip() 
@@ -330,6 +294,19 @@ def send_query(query: str): status_codes.get(http_code, 0) + 1 if http_code != "200": num_errors += 1 + if show_errors: + try: + with open(result_file) as f: + body = f.read().strip() + msg = re.sub(r"\s+", " ", body) + if len(msg) > max_error_width: + msg = msg[:max_error_width] + "..." + except Exception: + msg = "(could not read error response)" + log.info("") + log.info(colored(re.sub(r"\s+", " ", query), + "magenta")) + log.error(f"HTTP {http_code}: {msg}") except Exception: elapsed = time.time() - start with lock: @@ -338,6 +315,11 @@ def send_query(query: str): completed_times.append(elapsed) status_codes["timeout"] = \ status_codes.get("timeout", 0) + 1 + if show_errors: + log.info("") + log.info(colored(re.sub(r"\s+", " ", query), + "magenta")) + log.error("Request timed out or failed") # Print status header and status lines. header = ( @@ -379,12 +361,16 @@ def print_status(elapsed: float): f" {'':>8s} {'':>8s}" ) log.info( - f"{format_duration(elapsed):>8s}" + f"{elapsed:>7.0f}s" f" {n_launched:>8d} {n_done:>8d}" f" {n_running:>8d} {n_errors:>8d}" f"{time_stats}" ) + def print_final_status(): + print_table_header() + print_status(time.time() - start_time) + # Main loop: launch queries at the specified rate. try: start_time = time.time() @@ -392,15 +378,14 @@ def print_status(elapsed: float): interval = 1.0 / args.request_rate next_launch_time = start_time + interval - while True: + while num_launched < args.num_queries: now = time.time() elapsed = now - start_time - if elapsed >= duration_secs: - break # Launch queries that are due. - while next_launch_time <= now: - query = source.get_query(args.query_selection) + while next_launch_time <= now \ + and num_launched < args.num_queries: + query = next_query() thread = threading.Thread( target=send_query, args=(query,), daemon=True, ) @@ -415,8 +400,7 @@ def print_status(elapsed: float): next_log_time += log_frequency_secs # Sleep briefly to avoid busy waiting. 
- sleep_until = min(next_launch_time, next_log_time, - start_time + duration_secs) + sleep_until = min(next_launch_time, next_log_time) sleep_time = sleep_until - time.time() if sleep_time > 0: time.sleep(min(sleep_time, 0.1)) @@ -426,12 +410,6 @@ def print_status(elapsed: float): interrupted = True log.warning("\rCtrl+C pressed, stopping load test") - # Helper to print header + status (used after interruption - # and after waiting for stragglers). - def print_final_status(): - print_table_header() - print_status(time.time() - start_time) - # Print final status line. After Ctrl+C, reprint the header # since the warning message breaks the table flow. After normal # completion, just print a separator + status line. From 221368d74963500ccd93d4104a906b14290821a8 Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Mon, 25 May 2026 01:26:51 +0200 Subject: [PATCH 3/4] Various improvements --- src/qlever/commands/load_test.py | 181 +++++++++++++++++-------------- 1 file changed, 101 insertions(+), 80 deletions(-) diff --git a/src/qlever/commands/load_test.py b/src/qlever/commands/load_test.py index ca553c54..91dea9e0 100644 --- a/src/qlever/commands/load_test.py +++ b/src/qlever/commands/load_test.py @@ -6,11 +6,11 @@ import shlex import statistics import subprocess +import tempfile import threading import time import yaml - from termcolor import colored from qlever.command import QleverCommand @@ -25,8 +25,10 @@ def parse_duration(s: str) -> float: s = s.strip() match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(s|sec|min|m|h|hr)?", s) if not match: - raise ValueError(f"Cannot parse duration: \"{s}\" (examples: " - f"\"10s\", \"34min\", \"2h\", \"90\")") + raise ValueError( + f'Cannot parse duration: "{s}" (examples: ' + f'"10s", "34min", "2h", "90")' + ) value = float(match.group(1)) unit = match.group(2) or "s" if unit in ("s", "sec"): @@ -36,7 +38,7 @@ def parse_duration(s: str) -> float: elif unit in ("h", "hr"): return value * 3600 else: - raise ValueError(f"Unknown duration 
unit: \"{unit}\"") + raise ValueError(f'Unknown duration unit: "{unit}"') def read_queries_tsv(path: str) -> list[str]: @@ -48,8 +50,9 @@ def read_queries_tsv(path: str) -> list[str]: if not line or line.startswith("#"): continue parts = line.split("\t", 1) - queries.append(parts[1].strip() if len(parts) == 2 - else parts[0].strip()) + queries.append( + parts[1].strip() if len(parts) == 2 else parts[0].strip() + ) return queries @@ -97,7 +100,10 @@ def __init__(self): pass def description(self) -> str: - return ("Send many concurrent queries to a SPARQL endpoint") + return ( + "Send many concurrent queries (from a TSV, YAML, or JSONL " + "file) to a SPARQL endpoint" + ) def should_have_qleverfile(self) -> bool: return False @@ -106,26 +112,27 @@ def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: return {"server": ["host_name", "port"]} def additional_arguments(self, subparser) -> None: - subparser.add_argument( + queries_group = subparser.add_mutually_exclusive_group(required=True) + queries_group.add_argument( "--queries-tsv", type=str, default=None, help="Path to a TSV file with queries" - " (short_query_namesparql_query)", + " (short_query_namesparql_query)", ) - subparser.add_argument( + queries_group.add_argument( "--queries-yml", type=str, default=None, help="Path to a YAML file with queries" - " (same format as for benchmark-queries)", + " (same format as for benchmark-queries)", ) - subparser.add_argument( + queries_group.add_argument( "--queries-jsonl", type=str, default=None, help="Path to a JSONL file with queries" - " (one JSON object per line with a 'sparql' field)", + " (one JSON object per line with a 'sparql' field)", ) subparser.add_argument( "--query-selection", @@ -146,6 +153,12 @@ def additional_arguments(self, subparser) -> None: default=1000, help="Total number of queries to send (default: 1000)", ) + subparser.add_argument( + "--query-timeout", + type=str, + default="300s", + help="Per-query timeout (default: 300s)", + ) 
subparser.add_argument( "--log-frequency", type=str, @@ -164,7 +177,7 @@ def additional_arguments(self, subparser) -> None: help="Show query and error message for failed queries", ) subparser.add_argument( - "--width-error-message", + "--error-message-width", type=int, default=150, help="Truncate error messages to this width (default: 150)", @@ -177,43 +190,34 @@ def additional_arguments(self, subparser) -> None: default=None, metavar="N", help="Show the first N queries (default: 10) and exit" - " without running the load test", + " without running the load test (with" + " --query-selection=random, a fresh random sample)", ) def execute(self, args) -> bool: - # Parse log frequency. + # Parse durations. try: log_frequency_secs = parse_duration(args.log_frequency) + query_timeout_secs = parse_duration(args.query_timeout) except ValueError as e: log.error(e) return False - # Exactly one of the three query sources must be specified. - sources = { - "--queries-tsv": args.queries_tsv, - "--queries-yml": args.queries_yml, - "--queries-jsonl": args.queries_jsonl, - } - specified = {k: v for k, v in sources.items() if v is not None} - if len(specified) != 1: - log.error("Please specify exactly one of --queries-tsv," - " --queries-yml, or --queries-jsonl") - return False - - # Read all queries into memory. - source_option, source_path = next(iter(specified.items())) + # Read all queries into memory. The mutually exclusive group with + # `required=True` guarantees that exactly one source is specified. 
+ if args.queries_tsv is not None: + source_path, reader = args.queries_tsv, read_queries_tsv + elif args.queries_yml is not None: + source_path, reader = args.queries_yml, read_queries_yml + else: + source_path, reader = args.queries_jsonl, read_queries_jsonl try: - if source_option == "--queries-tsv": - queries = read_queries_tsv(source_path) - elif source_option == "--queries-yml": - queries = read_queries_yml(source_path) - else: - queries = read_queries_jsonl(source_path) + queries = reader(source_path) except Exception as e: log.error(f"Could not read queries: {e}") return False if not queries: - log.error(f"No queries found in \"{source_path}\"") + log.error(f'No queries found in "{source_path}"') return False # Query selection helper. @@ -228,11 +232,12 @@ def next_query() -> str: cycle_index += 1 return query - # If --show-queries is given, show queries and exit. + # If `--show-queries` is given, show queries and exit. if args.show_queries is not None: n = min(args.show_queries, len(queries)) - log.info(f"Showing {n} of {len(queries)} queries" - f" from \"{source_path}\":") + log.info( + f'Showing {n} of {len(queries)} queries from "{source_path}":' + ) log.info("") for i in range(n): query = next_query() @@ -250,7 +255,7 @@ def next_query() -> str: # Show what the command will do. 
self.show( - f"Send {args.num_queries} queries from \"{source_path}\"" + f'Send {args.num_queries} queries from "{source_path}"' f" ({len(queries)} queries, {args.query_selection} selection)" f" to {sparql_endpoint} at {args.request_rate} queries/s" f" (log every {log_frequency_secs:.0f}s)", @@ -268,44 +273,55 @@ def next_query() -> str: status_codes: dict[str, int] = {} show_errors = args.show_error_messages - max_error_width = args.width_error_message + max_error_width = args.error_message_width def send_query(query: str): nonlocal num_done, num_errors - result_file = f"/tmp/qlever.load-test.{threading.get_ident()}" - curl_cmd = ( - f"curl -s {sparql_endpoint}" - f" -H \"Accept: application/qlever-results+json\"" - f" --data-urlencode query={shlex.quote(query)}" - f" -o {result_file} -w \"%{{http_code}}\"" - ) + query_oneline = re.sub(r"\s+", " ", query) start = time.time() try: - result = subprocess.run( - curl_cmd, shell=True, capture_output=True, - text=True, timeout=300, - ) - elapsed = time.time() - start - http_code = result.stdout.strip() + with tempfile.NamedTemporaryFile( + mode="r", prefix="qlever.load-test.", delete=True + ) as result_file: + curl_cmd = ( + f"curl -s {sparql_endpoint}" + f' -H "Accept: application/qlever-results+json"' + f" --data-urlencode query={shlex.quote(query)}" + f" --max-time {query_timeout_secs:g}" + f" -o {result_file.name}" + f' -w "%{{http_code}}"' + ) + result = subprocess.run( + curl_cmd, + shell=True, + capture_output=True, + text=True, + timeout=query_timeout_secs + 10, + ) + elapsed = time.time() - start + http_code = result.stdout.strip() or "000" + body = ( + result_file.read() + if show_errors and http_code != "200" + else "" + ) with lock: num_done += 1 completed_times.append(elapsed) - status_codes[http_code] = \ + status_codes[http_code] = ( status_codes.get(http_code, 0) + 1 + ) if http_code != "200": num_errors += 1 if show_errors: - try: - with open(result_file) as f: - body = f.read().strip() - msg = 
re.sub(r"\s+", " ", body) - if len(msg) > max_error_width: - msg = msg[:max_error_width] + "..." - except Exception: - msg = "(could not read error response)" + msg = ( + re.sub(r"\s+", " ", body).strip() + or "(empty response)" + ) + if len(msg) > max_error_width: + msg = msg[:max_error_width] + "..." log.info("") - log.info(colored(re.sub(r"\s+", " ", query), - "magenta")) + log.info(colored(query_oneline, "magenta")) log.error(f"HTTP {http_code}: {msg}") except Exception: elapsed = time.time() - start @@ -313,12 +329,12 @@ def send_query(query: str): num_done += 1 num_errors += 1 completed_times.append(elapsed) - status_codes["timeout"] = \ + status_codes["timeout"] = ( status_codes.get("timeout", 0) + 1 + ) if show_errors: log.info("") - log.info(colored(re.sub(r"\s+", " ", query), - "magenta")) + log.info(colored(query_oneline, "magenta")) log.error("Request timed out or failed") # Print status header and status lines. @@ -356,10 +372,7 @@ def print_status(elapsed: float): f" {p95:>7.2f}s {max_t:>7.2f}s" ) else: - time_stats = ( - f" {'':>8s} {'':>8s}" - f" {'':>8s} {'':>8s}" - ) + time_stats = f" {'':>8s} {'':>8s} {'':>8s} {'':>8s}" log.info( f"{elapsed:>7.0f}s" f" {n_launched:>8d} {n_done:>8d}" @@ -382,16 +395,22 @@ def print_final_status(): now = time.time() elapsed = now - start_time - # Launch queries that are due. - while next_launch_time <= now \ - and num_launched < args.num_queries: + # Launch queries that are due. Bump `num_launched` BEFORE + # starting the thread so that a fast-completing query cannot + # make `num_done > num_launched` and yield a negative + # `Running` count in the status line. + while ( + next_launch_time <= now and num_launched < args.num_queries + ): query = next_query() thread = threading.Thread( - target=send_query, args=(query,), daemon=True, + target=send_query, + args=(query,), + daemon=True, ) - thread.start() with lock: num_launched += 1 + thread.start() next_launch_time += interval # Print status line if due. 
@@ -421,9 +440,11 @@ def print_final_status(): # Show HTTP status code histogram. http_status_names = { - "200": "OK", "400": "Bad Request", + "200": "OK", + "400": "Bad Request", "429": "Too Many Requests", - "500": "Internal Server Error", "502": "Bad Gateway", + "500": "Internal Server Error", + "502": "Bad Gateway", } with lock: codes = dict(status_codes) From 8603b4760bc93edf3781a7ee3c8e335227fde48a Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Wed, 27 May 2026 14:16:42 +0200 Subject: [PATCH 4/4] Fix bugs and reduce duplication in `load-test` command So far, the command had several rough edges: a race in the launched/done counter that could briefly show a negative `Running` count under fast-completing queries, a hardcoded 300s timeout that was neither configurable nor passed to curl as `--max-time`, result files accumulating in `/tmp`, a manual "exactly one of" check for the `--queries-*` flags, and duplicated success-path and exception-path outcome handling in `send_query`. Now, `num_launched` is incremented under the lock before the worker thread starts; `--query-timeout` exposes the per-query timeout (default `300s`, passed both to curl as `--max-time` and to `subprocess.run`); result files use `tempfile.NamedTemporaryFile(delete=True)`; the `--queries-*` flags use `add_mutually_exclusive_group(required=True)`; and `send_query` records the outcome in a single block, distinguishing `TimeoutExpired` (bucket `timeout`) from other exceptions (bucket `error`). While at it, validate `--request-rate > 0` and `--query-timeout > 0`, rename `--width-error-message` to `--error-message-width`, and clarify the `--show-queries` help text.
--- src/qlever/commands/load_test.py | 87 ++++++++++++++++---------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/src/qlever/commands/load_test.py b/src/qlever/commands/load_test.py index 91dea9e0..073a3f4c 100644 --- a/src/qlever/commands/load_test.py +++ b/src/qlever/commands/load_test.py @@ -189,7 +189,7 @@ def additional_arguments(self, subparser) -> None: const=10, default=None, metavar="N", - help="Show the first N queries (default: 10) and exit" + help="Show up to N queries (N=10 if not specified) and exit" " without running the load test (with" " --query-selection=random, a fresh random sample)", ) @@ -202,9 +202,18 @@ def execute(self, args) -> bool: except ValueError as e: log.error(e) return False + if args.request_rate <= 0: + log.error( + f"--request-rate must be positive (got {args.request_rate})" + ) + return False + if query_timeout_secs <= 0: + log.error( + f"--query-timeout must be positive (got {args.query_timeout})" + ) + return False - # Read all queries into memory. The mutually exclusive group with - # `required=True` guarantees that exactly one source is specified. + # Read all queries into memory. 
if args.queries_tsv is not None: source_path, reader = args.queries_tsv, read_queries_tsv elif args.queries_yml is not None: @@ -277,7 +286,7 @@ def next_query() -> str: def send_query(query: str): nonlocal num_done, num_errors - query_oneline = re.sub(r"\s+", " ", query) + error_message: str | None = None start = time.time() try: with tempfile.NamedTemporaryFile( @@ -299,43 +308,37 @@ def send_query(query: str): timeout=query_timeout_secs + 10, ) elapsed = time.time() - start - http_code = result.stdout.strip() or "000" - body = ( - result_file.read() - if show_errors and http_code != "200" - else "" - ) - with lock: - num_done += 1 - completed_times.append(elapsed) - status_codes[http_code] = ( - status_codes.get(http_code, 0) + 1 - ) - if http_code != "200": - num_errors += 1 - if show_errors: - msg = ( - re.sub(r"\s+", " ", body).strip() - or "(empty response)" - ) - if len(msg) > max_error_width: - msg = msg[:max_error_width] + "..." - log.info("") - log.info(colored(query_oneline, "magenta")) - log.error(f"HTTP {http_code}: {msg}") - except Exception: + bucket = result.stdout.strip() or "000" + if bucket != "200" and show_errors: + msg = ( + re.sub(r"\s+", " ", result_file.read()).strip() + or "(empty response)" + ) + if len(msg) > max_error_width: + msg = msg[:max_error_width] + "..." + error_message = f"HTTP {bucket}: {msg}" + except Exception as exc: elapsed = time.time() - start - with lock: - num_done += 1 - num_errors += 1 - completed_times.append(elapsed) - status_codes["timeout"] = ( - status_codes.get("timeout", 0) + 1 - ) + if isinstance(exc, subprocess.TimeoutExpired): + bucket = "timeout" + if show_errors: + error_message = "Request timed out" + else: + bucket = "error" if show_errors: - log.info("") - log.info(colored(query_oneline, "magenta")) - log.error("Request timed out or failed") + error_message = f"Request failed: {exc}" + + # Record outcome. 
+ with lock: + num_done += 1 + completed_times.append(elapsed) + status_codes[bucket] = status_codes.get(bucket, 0) + 1 + if bucket != "200": + num_errors += 1 + if error_message is not None: + log.info("") + log.info(colored(re.sub(r"\s+", " ", query), "magenta")) + log.error(error_message) # Print status header and status lines. header = ( @@ -395,10 +398,8 @@ def print_final_status(): now = time.time() elapsed = now - start_time - # Launch queries that are due. Bump `num_launched` BEFORE - # starting the thread so that a fast-completing query cannot - # make `num_done > num_launched` and yield a negative - # `Running` count in the status line. + # Launch queries that are due. Increment `num_launched` + # before starting the thread, so `num_done` cannot exceed it. while ( next_launch_time <= now and num_launched < args.num_queries ):