Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"tqdm>=4.65.0",
"litellm>=1.83.0",
"tokenizers>=0.20.0",
"tiktoken",
]

[project.scripts]
Expand Down Expand Up @@ -89,6 +90,10 @@ ignore_missing_imports = true
module = "litellm.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "tiktoken.*"
ignore_missing_imports = true

[tool.ruff]
line-length = 100
target-version = "py310"
Expand Down
86 changes: 66 additions & 20 deletions src/cordon/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,30 @@ def parse_args() -> argparse.Namespace:
default=None,
help="Batch size for k-NN scoring queries (default: auto-detect based on GPU memory)",
)
# Output-limiting options, registered in the order they should appear in --help.
_output_limit_options = (
    (
        "--token-budget",
        {
            "type": int,
            "default": None,
            "help": (
                "Maximum token budget for output; dynamically adjusts percentile to fit "
                "(overrides --anomaly-percentile)"
            ),
        },
    ),
    (
        "--tokenizer-encoding",
        {
            "type": str,
            "default": "cl100k_base",
            "help": "tiktoken encoding for token counting (default: cl100k_base)",
        },
    ),
    (
        "--max-blocks",
        {
            "type": int,
            "default": None,
            "help": "Maximum number of anomaly blocks to output (keeps highest scoring)",
        },
    ),
    (
        "--min-score",
        {
            "type": float,
            "default": None,
            "help": "Minimum anomaly score threshold for output blocks",
        },
    ),
)
for _flag, _options in _output_limit_options:
    config_group.add_argument(_flag, **_options)

# output options
output_group = parser.add_argument_group("output options")
Expand Down Expand Up @@ -160,9 +184,8 @@ def parse_args() -> argparse.Namespace:
"--quiet",
"-q",
action="store_true",
help="Suppress progress bars (useful for CI or library usage)",
help="Suppress all human-readable banners and progress bars, keeping only formatted output on stdout",
)

return parser.parse_args()


Expand Down Expand Up @@ -212,6 +235,7 @@ def _display_results(
detailed: bool,
output_path: Path | None,
force: bool,
quiet: bool,
) -> None:
"""Display analysis results, optionally with detailed statistics.

Expand All @@ -220,8 +244,9 @@ def _display_results(
detailed: Whether to print detailed statistics before the output.
output_path: Optional path to save anomalous blocks (None prints to stdout).
force: If True, overwrite an existing output file.
quiet: If True, suppress human-readable banners and stats.
"""
if detailed:
if detailed and not quiet:
print(f"Total lines: {result.total_lines:,}")
print("\nAnalysis Statistics:")
print(f" Total windows created: {result.total_windows:,}")
Expand All @@ -243,7 +268,8 @@ def _display_results(
else:
print(result.output)

print()
if not quiet:
print()


def analyze_file(
Expand All @@ -252,6 +278,7 @@ def analyze_file(
detailed: bool,
output_path: Path | None = None,
force: bool = False,
quiet: bool = False,
) -> None:
"""Analyze a single log file and print results.

Expand All @@ -261,28 +288,31 @@ def analyze_file(
detailed: Whether to show detailed statistics.
output_path: Optional path to save anomalous blocks (None prints to stdout).
force: If True, overwrite an existing output file.
quiet: If True, suppress human-readable banners and stats.
"""
if not _validate_file(log_path):
return

print("=" * 80)
print(f"Analyzing: {log_path}")
print("=" * 80)
if not quiet:
print("=" * 80)
print(f"Analyzing: {log_path}")
print("=" * 80)

try:
result = analyzer.analyze_file_detailed(log_path)
except Exception as error:
print(f"Error analyzing {log_path}: {error}", file=sys.stderr)
return

_display_results(result, detailed, output_path, force)
_display_results(result, detailed, output_path, force, quiet)


def analyze_stdin(
analyzer: SemanticLogAnalyzer,
detailed: bool,
output_path: Path | None = None,
force: bool = False,
quiet: bool = False,
) -> None:
"""Analyze log data from stdin and print results.

Expand All @@ -291,20 +321,22 @@ def analyze_stdin(
detailed: Whether to show detailed statistics.
output_path: Optional path to save anomalous blocks (None prints to stdout).
force: If True, overwrite an existing output file.
quiet: If True, suppress human-readable banners and stats.
"""
text = sys.stdin.read()

print("=" * 80)
print("Analyzing: <stdin>")
print("=" * 80)
if not quiet:
print("=" * 80)
print("Analyzing: <stdin>")
print("=" * 80)

try:
result = analyzer.analyze_text_detailed(text)
except Exception as error:
print(f"Error analyzing <stdin>: {error}", file=sys.stderr)
return

_display_results(result, detailed, output_path, force)
_display_results(result, detailed, output_path, force, quiet)


def _print_backend_info(config: AnalysisConfig) -> None:
Expand Down Expand Up @@ -355,6 +387,12 @@ def _main_impl() -> None:
file=sys.stderr,
)

# Tell the user when a token budget will replace the percentile they asked for.
if args.token_budget is not None:
    # NOTE(review): 0.1 is presumably the parser default for --anomaly-percentile,
    # so an explicit "--anomaly-percentile 0.1" would not trigger this warning — confirm.
    if not isclose(args.anomaly_percentile, 0.1):
        print("Warning: --anomaly-percentile is overridden by --token-budget", file=sys.stderr)

# create configuration from arguments
try:
config = AnalysisConfig(
Expand All @@ -376,17 +414,21 @@ def _main_impl() -> None:
api_key=args.api_key,
endpoint=args.endpoint,
show_progress=not args.quiet,
token_budget=args.token_budget,
tokenizer_encoding=args.tokenizer_encoding,
output_format=args.output_format,
max_blocks=args.max_blocks,
min_score=args.min_score,
)
except ValueError as error:
print(f"Configuration error: {error}", file=sys.stderr)
sys.exit(1)

# create analyzer
print("Initializing analyzer...")
_print_backend_info(config)
_print_filtering_mode(config)
print()
if not args.quiet:
print("Initializing analyzer...")
_print_backend_info(config)
_print_filtering_mode(config)
print()

try:
analyzer = SemanticLogAnalyzer(config)
Expand All @@ -399,14 +441,18 @@ def _main_impl() -> None:
except Exception as error:
print(f"Initialization error: {error}", file=sys.stderr)
sys.exit(1)
print()

if not args.quiet:
print()

# analyze each log file
for log_path in args.logfiles:
if str(log_path) == "-":
analyze_stdin(analyzer, args.detailed, args.output, args.force)
analyze_stdin(analyzer, args.detailed, args.output, args.force, quiet=args.quiet)
else:
analyze_file(log_path, analyzer, args.detailed, args.output, args.force)
analyze_file(
log_path, analyzer, args.detailed, args.output, args.force, quiet=args.quiet
)


def main() -> None:
Expand Down
19 changes: 19 additions & 0 deletions src/cordon/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,16 @@ class AnalysisConfig:
request_timeout: HTTP request timeout in seconds (remote backend).
show_progress: Whether to display tqdm progress bars during
embedding and scoring. Set to False for CI or library use.
token_budget: Maximum token count for output. When set, dynamically
computes anomaly_percentile from the input size to fit
the output within this budget. Overrides anomaly_percentile.
tokenizer_encoding: tiktoken encoding name for token counting
(default: cl100k_base, used by GPT-4/GPT-3.5).
output_format: Output format for anomaly blocks.
max_blocks: Maximum number of anomaly blocks to include in output.
Keeps the highest-scoring blocks. None disables the limit.
min_score: Minimum anomaly score threshold. Blocks below this
score are excluded from output. None disables the threshold.
"""

window_size: int = 4
Expand All @@ -91,7 +100,11 @@ class AnalysisConfig:
endpoint: str | None = None
request_timeout: float = 60.0
show_progress: bool = True
token_budget: int | None = None
tokenizer_encoding: str = "cl100k_base"
output_format: Literal["xml", "json"] = "xml"
max_blocks: int | None = None
min_score: float | None = None

def __post_init__(self) -> None:
"""Validate configuration parameters."""
Expand Down Expand Up @@ -122,6 +135,12 @@ def _validate_core_params(self) -> None:
raise ValueError(
f"output_format must be one of {_ALLOWED_FORMATS}, got {self.output_format!r}"
)
# Optional output limits: each may be None (disabled), otherwise it must
# reach its lower bound. Checked in declaration order so the first
# offending field is the one reported.
for _field_name, _lower_bound in (("token_budget", 1), ("max_blocks", 1), ("min_score", 0)):
    _value = getattr(self, _field_name)
    if _value is not None and _value < _lower_bound:
        raise ValueError(f"{_field_name} must be >= {_lower_bound} if set")

def _validate_anomaly_range(self) -> None:
"""Validate anomaly range parameters."""
Expand Down
40 changes: 38 additions & 2 deletions src/cordon/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import time
from collections.abc import Sequence
from dataclasses import replace
from pathlib import Path

import numpy as np
Expand All @@ -23,6 +25,8 @@
from cordon.postprocess.merger import IntervalMerger
from cordon.segmentation.windower import SlidingWindowSegmenter

logger = logging.getLogger(__name__)


class SemanticLogAnalyzer:
"""High-level API for semantic log analysis.
Expand Down Expand Up @@ -153,14 +157,46 @@ def _analyze_lines(self, lines_list: list[tuple[int, str]]) -> AnalysisResult:
del embedded

# stage 5: thresholding
significant = self._thresholder.select_significant(scored, self.config)
# Threshold with the configured percentile unless a token budget is set, in
# which case the percentile is derived from the size of the input.
thresholding_config = self.config
if self.config.token_budget is not None:
    # Imported here so tiktoken is only loaded when a budget is requested.
    import tiktoken

    enc = tiktoken.get_encoding(self.config.tokenizer_encoding)
    # Log lines are arbitrary text and may literally contain special-token
    # markers such as "<|endoftext|>". Plain encode() rejects those with a
    # ValueError by default; encode_ordinary() counts them as normal text,
    # which is what a size estimate needs.
    total_tokens = sum(len(enc.encode_ordinary(text)) for _, text in lines_list)

    # Share of the input that fits in the budget, capped at 1.0. An input
    # with no tokens keeps everything.
    if total_tokens > 0:
        budget_percentile = min(self.config.token_budget / total_tokens, 1.0)
    else:
        budget_percentile = 1.0

    # Work on a copy so the analyzer's own configuration is left untouched.
    thresholding_config = replace(self.config, anomaly_percentile=budget_percentile)

    logger.info(
        "Token budget: %d/%d tokens (%.1f%% percentile)",
        self.config.token_budget,
        total_tokens,
        budget_percentile * 100,
    )

significant = self._thresholder.select_significant(scored, thresholding_config)
significant_windows = len(significant)

# stage 6: merging
merged = self._merger.merge_windows(significant)
merged_blocks_count = len(merged)
del significant

# stage 6b: post-merge filters
# Drop blocks whose peak score falls below the configured floor.
score_floor = self.config.min_score
if score_floor is not None:
    merged = [block for block in merged if block.max_score >= score_floor]

# Keep only the highest-scoring blocks when a cap is set, then restore
# line order for output.
block_limit = self.config.max_blocks
if block_limit is not None and len(merged) > block_limit:
    by_score = sorted(merged, key=lambda block: block.max_score, reverse=True)
    merged = sorted(by_score[:block_limit], key=lambda block: block.start_line)

# Count what remains after both filters.
merged_blocks_count = len(merged)

# stage 7: formatting
output = self._formatter.format_blocks(merged, lines_list)

Expand Down
Loading
Loading