Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 1 addition & 30 deletions src/qlever/commands/benchmark_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,8 @@
from qlever.command import QleverCommand
from qlever.commands.clear_cache import ClearCacheCommand
from qlever.commands.ui import dict_to_yaml
from qlever.containerize import Containerize
from qlever.log import log, mute_log
from qlever.util import run_command, run_curl_command


def pretty_printed_query(
query: str, show_prefixes: bool, system: str = "docker"
) -> str:
"""
Pretty-print a SPARQL query using the sparql-formatter Docker image.
Optionally strips PREFIX declarations from the output.
Argument `system` can either be docker or podman.
"""
if system not in Containerize.supported_systems():
system = "docker"
remove_prefixes_cmd = " | sed '/^PREFIX /Id'" if not show_prefixes else ""
pretty_print_query_cmd = (
f"echo {shlex.quote(query)}"
f" | {system} run -i --rm docker.io/sparqling/sparql-formatter"
f"{remove_prefixes_cmd} | grep -v '^$'"
)
try:
query_pretty_printed = run_command(
pretty_print_query_cmd, return_output=True
)
return query_pretty_printed.rstrip()
except Exception as e:
log.debug(
f"Failed to pretty-print query, returning original query: {e}"
)
return query.rstrip()
from qlever.util import pretty_printed_query, run_command, run_curl_command


def sparql_query_type(query: str) -> str:
Expand Down
160 changes: 160 additions & 0 deletions src/qlever/commands/monitor_queries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from __future__ import annotations

import json
import re
import subprocess
import textwrap
import time

from qlever.command import QleverCommand
from qlever.log import log
from qlever.util import pretty_printed_query


def render_queries(monitor_queries_cmd, args) -> bool:
try:
monitored_queries = subprocess.check_output(
monitor_queries_cmd, shell=True
)
monitored_queries_dict = json.loads(monitored_queries)
except Exception as e:
log.error(f"Failed to get active queries: {e}")
return False

if not monitored_queries_dict:
log.info("No active queries on the server")
return True

queries = list(monitored_queries_dict.items())

# Show the full SPARQL for a specific query.
if args.query_id:
# Try as a table index first, then as a server query ID.
try:
idx = int(args.query_id)
if 1 <= idx <= len(queries):
sparql_query = queries[idx - 1][1]
else:
sparql_query = None
except ValueError:
sparql_query = monitored_queries_dict.get(args.query_id)
if not sparql_query:
log.error("No active query found for the given ID")
return False
log.info(pretty_printed_query(sparql_query, False, args.system))
return True

# Table header.
col_index = 3
col_qid = max(len(qid) for qid, _ in queries)
indent = " " * (2 + col_index + 2 + col_qid + 2)
log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL")

for i, (qid, sparql) in enumerate(queries, 1):
# Collapse whitespace for compact display.
sparql_oneline = re.sub(r"\s+", " ", sparql).strip()
if args.detailed:
wrapped = textwrap.fill(
sparql_oneline,
width=100,
initial_indent="",
subsequent_indent=indent,
)
log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}")
else:
short_sparql = (
sparql_oneline[:80] + "..."
if len(sparql_oneline) > 80
else sparql_oneline
)
log.info(
f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}"
)

return True


class MonitorQueriesCommand(QleverCommand):
"""
Class for executing the `monitor-queries` command.
"""

def __init__(self):
pass

def description(self) -> str:
return "Show the currently active queries on the server"

def should_have_qleverfile(self) -> bool:
return False

def relevant_qleverfile_arguments(self) -> dict[str, list[str]]:
return {
"server": ["access_token", "host_name", "port"],
"runtime": ["system"],
}

def additional_arguments(self, subparser) -> None:
subparser.add_argument(
"--sparql-endpoint",
help="URL of the SPARQL endpoint, default is {host_name}:{port}",
)
subparser.add_argument(
"--detailed",
action="store_true",
default=False,
help="Show the full SPARQL text for each active query",
)
subparser.add_argument(
"--query-id",
help="Show the full SPARQL text for a specific query,"
" either by its index (#) or server query ID",
)
subparser.add_argument(
"--watch",
action="store_true",
default=False,
help="Continuously refresh the list of active queries"
" until interrupted with Ctrl-C",
)
subparser.add_argument(
"--interval",
type=float,
default=2.0,
help="Refresh interval in seconds when using --watch"
" (default: 2.0)",
)

def execute(self, args) -> bool:
sparql_endpoint = (
args.sparql_endpoint
if args.sparql_endpoint
else f"{args.host_name}:{args.port}"
)
monitor_queries_cmd = (
f'curl -s {sparql_endpoint} --data-urlencode "cmd=dump-active-queries" '
f'--data-urlencode access-token="{args.access_token}"'
)

# Show them.
self.show(monitor_queries_cmd, only_show=args.show)
if args.show:
return True

if args.watch and args.interval < 0.5:
log.error("--interval must be at least 0.5 seconds")
return False
if args.watch and args.query_id:
log.error("--watch cannot be combined with --query-id")
return False

if args.watch:
try:
while True:
print("\033[H\033[2J", end="", flush=True)
render_queries(monitor_queries_cmd, args)
time.sleep(args.interval)
except KeyboardInterrupt:
return True

return render_queries(monitor_queries_cmd, args)
30 changes: 30 additions & 0 deletions src/qlever/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,36 @@ def format_size(bytes, suffix="B"):
bytes /= factor


def pretty_printed_query(
query: str, show_prefixes: bool, system: str = "docker"
) -> str:
"""
Pretty-print a SPARQL query using the sparql-formatter Docker image.
Optionally strips PREFIX declarations from the output.
Argument `system` can either be docker or podman.
"""
from qlever.containerize import Containerize

if system not in Containerize.supported_systems():
system = "docker"
remove_prefixes_cmd = " | sed '/^PREFIX /Id'" if not show_prefixes else ""
pretty_print_query_cmd = (
f"echo {shlex.quote(query)}"
f" | {system} run -i --rm docker.io/sparqling/sparql-formatter"
f"{remove_prefixes_cmd} | grep -v '^$'"
)
try:
query_pretty_printed = run_command(
pretty_print_query_cmd, return_output=True
)
return query_pretty_printed.rstrip()
except Exception as e:
log.debug(
f"Failed to pretty-print query, returning original query: {e}"
)
return query.rstrip()


def stop_process(proc: psutil.Process, pinfo: dict[str, Any]) -> bool:
"""
Try to kill the given process, return True iff it was killed
Expand Down
Loading