From 123413dfcc3bbd51c2dce02c1391dec74774effa Mon Sep 17 00:00:00 2001 From: Florian Roth Date: Tue, 14 Apr 2026 23:33:49 +0200 Subject: [PATCH] fix: clarify CLI rule feed selection --- README.md | 23 +++-- tests/test_cli_flags.py | 181 ++++++++++++++++++++++++++++++++++++ valhallaAPI/valhalla.py | 10 ++ valhallaAPI/valhalla_cli.py | 102 ++++++++++++++++---- 4 files changed, 290 insertions(+), 26 deletions(-) create mode 100644 tests/test_cli_flags.py diff --git a/README.md b/README.md index 71e6fc4..6434d7f 100644 --- a/README.md +++ b/README.md @@ -459,8 +459,14 @@ or just download the precompiled `valhalla-cli.exe` from the latest release in t ## Usage ``` -usage: valhalla-cli [-h] [-k apikey] [-c config-file] [-o output-file] [--check] [--debug] [-p proxy-url] [-pu proxy-user] [-pp proxy-pass] [-fp product] [-fv yara-version] [-fm modules [modules ...]] - [-ft tags [tags ...]] [-fs score] [-fq query] [--nocrypto] [-lr lookup-rule] [-lh lookup-hash] [-lk lookup-keyword] [-lkm lookup-keyword] [-lo lookup-output] +usage: valhalla-cli [-h] [-k apikey] [-c config-file] [-o output-file] + [--check] [--debug] [--feed {yara,sigma}] [-s] + [-p proxy-url] [-pu proxy-user] [-pp proxy-pass] + [-fp product] [-fpo] [-fv yara-version] + [-fm modules [modules ...]] [-ft tags [tags ...]] + [-fs score] [-fq query] [--nocrypto] [-lr lookup-rule] + [-lh lookup-hash] [-lk lookup-keyword] + [-lkm lookup-keyword] [-lo lookup-output] Valhalla-CLI @@ -469,9 +475,10 @@ optional arguments: -k apikey API KEY -c config-file Config file (see README for details) -o output-file output file - -s Load sigma rules - --check Check subscription info and total rule count + --check Check account status and subscription details --debug Debug output + --feed {yara,sigma} select rule feed: yara (default) or sigma + -s, --sigma retrieve Sigma rules (shortcut for --feed sigma) ======================================================================= Proxy: @@ -511,6 +518,8 @@ Check the status of your subscription valhalla-cli -k YOUR-API-KEY --check ``` +The default rule feed is YARA. If you want to retrieve Sigma rules, use `--feed sigma` or `--sigma` (the legacy `-s` flag still works). + Get all subscribed rules and save them to `valhalla-rules.yar` ```bash valhalla-cli -k YOUR-API-KEY @@ -518,7 +527,7 @@ valhalla-cli -k YOUR-API-KEY Get all sigma rules and save them to `valhalla-rules.zip` ```bash -valhalla-cli -k YOUR-API-KEY -s +valhalla-cli -k YOUR-API-KEY --sigma ``` Get rules with score higher than 75 and save them to `valhalla-rules.yar` @@ -674,10 +683,10 @@ It will return a JSON structure. } ```` -To query sigma rules, add `-s`: +To query Sigma rules, add `--sigma` (or keep using `-s`): ```bash -./valhalla-cli -s -lr 06d71506-7beb-4f22-8888-e2e5e2ca7fd8 +./valhalla-cli --sigma -lr 06d71506-7beb-4f22-8888-e2e5e2ca7fd8 ``` # Scores diff --git a/tests/test_cli_flags.py b/tests/test_cli_flags.py new file mode 100644 index 0000000..c6e1fed --- /dev/null +++ b/tests/test_cli_flags.py @@ -0,0 +1,181 @@ +import io +import json +import sys +import zipfile + +import pytest + +import valhallaAPI.valhalla as valhalla_module +import valhallaAPI.valhalla_cli as valhalla_cli +from valhallaAPI.filters import ApiError +from valhallaAPI.valhalla import ValhallaAPI + + +class MockResponse(object): + def __init__(self, payload): + self.text = json.dumps(payload) + + +def test_cli_check_mentions_sigma_feed_flag(monkeypatch, capsys): + monkeypatch.setattr(sys, "argv", ["valhalla-cli", "--check"]) + monkeypatch.setattr(valhalla_cli.os.path, "exists", lambda path: False) + monkeypatch.setattr( + valhalla_cli.ValhallaAPI, + "get_subscription", + lambda self: {"active": True}, + ) + + with pytest.raises(SystemExit) as exc: + valhalla_cli.main() + + captured = capsys.readouterr() + + assert exc.value.code == 0 + assert "Account is active" in captured.err + assert "YARA is the default rule feed" in captured.err + assert "--feed sigma or --sigma/-s" in captured.err + + +def test_cli_feed_sigma_retrieves_sigma_rules(monkeypatch, capsys, tmp_path): + output_file = tmp_path / "sigma-rules.zip" + + monkeypatch.setattr(sys, "argv", [ + "valhalla-cli", + "--feed", + "sigma", + "-o", + str(output_file), + ]) + monkeypatch.setattr(valhalla_cli.os.path, "exists", lambda path: False) + + def fake_get_sigma_rules_zip(self, search="", private_only=False): + self.last_retrieved_rules_count = 2 + return b"zip-bytes" + + def unexpected_yara_fetch(self, **kwargs): + raise AssertionError("YARA retrieval should not be used for --feed sigma") + + monkeypatch.setattr( + valhalla_cli.ValhallaAPI, + "get_sigma_rules_zip", + fake_get_sigma_rules_zip, + ) + monkeypatch.setattr( + valhalla_cli.ValhallaAPI, + "get_rules_text", + unexpected_yara_fetch, + ) + + valhalla_cli.main() + + captured = capsys.readouterr() + + assert output_file.read_bytes() == b"zip-bytes" + assert "Selected rule feed: SIGMA" in captured.err + assert "Retrieving Sigma rules with params" in captured.err + + +def test_cli_yara_feed_access_error_suggests_sigma(monkeypatch, capsys): + monkeypatch.setattr(sys, "argv", ["valhalla-cli"]) + monkeypatch.setattr(valhalla_cli.os.path, "exists", lambda path: False) + + def fake_get_rules_text(self, **kwargs): + raise ApiError("user has no rule feed access") + + monkeypatch.setattr(valhalla_cli.ValhallaAPI, "get_rules_text", fake_get_rules_text) + + with pytest.raises(SystemExit) as exc: + valhalla_cli.main() + + captured = capsys.readouterr() + + assert exc.value.code == 1 + assert "user has no rule feed access" in captured.err + assert "This request targets the YARA feed" in captured.err + assert "--feed sigma or --sigma/-s" in captured.err + + +def test_cli_sigma_warns_about_ignored_yara_flags(monkeypatch, capsys, tmp_path): + output_file = tmp_path / "sigma-rules.zip" + + monkeypatch.setattr(sys, "argv", [ + "valhalla-cli", + "--sigma", + "-fp", + "CarbonBlack", + "-fs", + "75", + "-o", + str(output_file), + ]) + monkeypatch.setattr(valhalla_cli.os.path, "exists", lambda path: False) + monkeypatch.setattr( + valhalla_cli.ValhallaAPI, + "get_sigma_rules_zip", + lambda self, search="", private_only=False: b"zip-bytes", + ) + + valhalla_cli.main() + + captured = capsys.readouterr() + + assert output_file.read_bytes() == b"zip-bytes" + assert "Ignoring YARA-only flags for Sigma retrieval: -fp, -fs" in captured.err + + +def test_sigma_zip_updates_retrieved_rule_count(monkeypatch): + def fake_post(url, data=None, proxies=None, headers=None): + assert url.endswith("/getsigma") + return MockResponse( + { + "rules": [ + { + "signature_type": "sigma", + "type": "Process Creation", + "filename": "first.yml", + "content": "title: first", + }, + { + "signature_type": "sigma", + "type": "Network Connection", + "filename": "second.yml", + "content": "title: second", + }, + ] + } + ) + + monkeypatch.setattr(valhalla_module.requests, "post", fake_post) + + v = ValhallaAPI(api_key=ValhallaAPI.DEMO_KEY) + archive = v.get_sigma_rules_zip() + + assert v.last_retrieved_rules_count == 2 + + with zipfile.ZipFile(io.BytesIO(archive), "r") as zip_file: + names = sorted(zip_file.namelist()) + + assert names == [ + "sigma/NetworkConnection/second.yml", + "sigma/ProcessCreation/first.yml", + ] + + +def test_sigma_zip_raises_api_error(monkeypatch): + def fake_post(url, data=None, proxies=None, headers=None): + assert url.endswith("/getsigma") + return MockResponse( + { + "status": "error", + "message": "user has no sigma rule feed access", + } + ) + + monkeypatch.setattr(valhalla_module.requests, "post", fake_post) + + v = ValhallaAPI(api_key="invalid") + + with pytest.raises(ApiError) as exc: + v.get_sigma_rules_zip() + + assert exc.value.message == "user has no sigma rule feed access" diff --git a/valhallaAPI/valhalla.py b/valhallaAPI/valhalla.py index 7d6cb0e..f2ce321 100644 --- a/valhallaAPI/valhalla.py +++ b/valhallaAPI/valhalla.py @@ -317,6 +317,12 @@ def get_sigma_rules_json(self, search="", private_only=False): rules_response['rules'] = filter_search(rules_response['rules'], query=search) if private_only: rules_response['rules'] = filter_privateonly(rules_response['rules']) + + if 'rules' in rules_response: + self.last_retrieved_rules_count = len(rules_response['rules']) + else: + self.last_retrieved_rules_count = 0 + # Return filtered set return rules_response @@ -326,6 +332,10 @@ def get_sigma_rules_zip(self, search="", private_only=False): :return: """ rules_response = self.get_sigma_rules_json(search, private_only) + + if 'status' in rules_response: + if rules_response['status'] == "error": + raise ApiError(rules_response['message']) zip_buffer = io.BytesIO() with zipfile.ZipFile(file=zip_buffer, mode='w') as zip_file: diff --git a/valhallaAPI/valhalla_cli.py b/valhallaAPI/valhalla_cli.py index da01218..271ee0c 100755 --- a/valhallaAPI/valhalla_cli.py +++ b/valhallaAPI/valhalla_cli.py @@ -15,6 +15,54 @@ from packaging import version from valhallaAPI.valhalla import ValhallaAPI, UnknownProductError, ApiError +YARA_FEED = "yara" +SIGMA_FEED = "sigma" + + +def resolve_rule_feed(args, logger): + if args.sigma and args.feed == YARA_FEED: + logger.warning("Ignoring --feed yara because --sigma/-s was also set") + if args.sigma: + return SIGMA_FEED + if args.feed: + return args.feed + return YARA_FEED + + +def get_feed_guidance(rule_feed): + if rule_feed == SIGMA_FEED: + return "Current rule feed selection: Sigma (--feed sigma or --sigma/-s)." + return "YARA is the default rule feed; use --feed sigma or --sigma/-s to retrieve Sigma rules." + + +def get_feed_error_message(message, rule_feed): + if "no rule feed access" not in message.lower(): + return message + + if rule_feed == SIGMA_FEED: + hint = " This request targets the Sigma feed. If your account only has YARA access, rerun without --sigma/-s or use --feed yara." + else: + hint = " This request targets the YARA feed. If your account has Sigma access, rerun with --feed sigma or --sigma/-s." + + return "%s%s" % (message, hint) + + +def get_ignored_sigma_flags(args): + ignored = [] + if args.fp: + ignored.append("-fp") + if args.fv: + ignored.append("-fv") + if args.fm: + ignored.append("-fm") + if args.ft: + ignored.append("-ft") + if str(args.fs) != "0": + ignored.append("-fs") + if args.nocrypto is False: + ignored.append("--nocrypto") + return ignored + def main(): """ @@ -28,9 +76,12 @@ def main(): default=os.path.join(str(Path.home()), ".valhalla")) parser.add_argument('-o', help='output file', metavar='output-file', default=ValhallaAPI.DEFAULT_OUTPUT_FILE) parser.add_argument('--check', action='store_true', default=False, - help='Check subscription info and total rule count') + help='Check account status and subscription details') parser.add_argument('--debug', action='store_true', default=False, help='Debug output') - parser.add_argument('-s', action='store_true', default=False, help='Load Sigma rules') + parser.add_argument('--feed', choices=[YARA_FEED, SIGMA_FEED], default=None, + help='select rule feed: yara (default) or sigma') + parser.add_argument('-s', '--sigma', dest='sigma', action='store_true', default=False, + help='retrieve Sigma rules (shortcut for --feed sigma)') group_proxy = parser.add_argument_group( '=======================================================================\nProxy') @@ -90,6 +141,7 @@ def main(): logFormatterRemote = logging.Formatter("{0} [%(levelname)-5.5s] %(message)s".format(platform.uname()[1])) Log = logging.getLogger(__name__) Log.setLevel(logging.INFO) + Log.handlers.clear() # Console Handler consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(logFormatter) @@ -127,6 +179,7 @@ def main(): # Create the ValhallaAPI object v = ValhallaAPI(api_key=apikey) + rule_feed = resolve_rule_feed(args, Log) # Subscription check if args.check: @@ -134,6 +187,7 @@ def main(): if 'active' in status: if status['active']: Log.info("Account is active: %s" % status) + Log.info("The account check confirms the API key status. %s" % get_feed_guidance(rule_feed)) sys.exit(0) else: Log.error("Account is inactive: %s" % status) @@ -162,7 +216,7 @@ def main(): if args.lr or args.lh or args.lk or args.lkm: # Rule Lookup if args.lr != "": - if args.s: + if rule_feed == SIGMA_FEED: r = v.get_sigma_rule_info(args.lr) else: r = v.get_rule_info(args.lr) @@ -186,25 +240,35 @@ def main(): sys.exit(0) # Score warning - if args.fs == 0: + if rule_feed == YARA_FEED and args.fs == 0: Log.warning("Note that an unfiltered set (-fs 0) contains low scoring rules used for threat hunting purposes") # Info output - Log.info("Retrieving rules with params PRODUCT: %s MAX_VERSION: %s MODULES: %s WITH_CRYPTO: %s TAGS: %s " - "SCORE: %s PRIVATE_ONLY: %s QUERY: %s" % ( - args.fp, - args.fv, - ", ".join(modules), - str(args.nocrypto), - ", ".join(tags), - str(args.fs), - str(args.fpo), - args.fq - )) + Log.info("Selected rule feed: %s" % rule_feed.upper()) + if rule_feed == SIGMA_FEED: + ignored_flags = get_ignored_sigma_flags(args) + if ignored_flags: + Log.warning("Ignoring YARA-only flags for Sigma retrieval: %s" % ", ".join(ignored_flags)) + Log.info("Retrieving Sigma rules with params PRIVATE_ONLY: %s QUERY: %s" % ( + str(args.fpo), + args.fq + )) + else: + Log.info("Retrieving YARA rules with params PRODUCT: %s MAX_VERSION: %s MODULES: %s WITH_CRYPTO: %s TAGS: %s " + "SCORE: %s PRIVATE_ONLY: %s QUERY: %s" % ( + args.fp, + args.fv, + ", ".join(modules), + str(args.nocrypto), + ", ".join(tags), + str(args.fs), + str(args.fpo), + args.fq + )) # Retrieve rules try: - if args.s: + if rule_feed == SIGMA_FEED: response = v.get_sigma_rules_zip( search=args.fq, private_only=args.fpo, @@ -224,7 +288,7 @@ def main(): Log.error("Unknown product identifier - please use one of these: %s", ", ".join(ValhallaAPI.PRODUCT_IDENTIFIER)) sys.exit(1) except ApiError as e: - Log.error(e.message) + Log.error(get_feed_error_message(e.message, rule_feed)) sys.exit(1) # Response information @@ -235,11 +299,11 @@ def main(): # Tanium accepts only the ".yara" extension for imports if args.fp == "Tanium" and output_file == ValhallaAPI.DEFAULT_OUTPUT_FILE: output_file = "valhalla-rules.yara" - if args.s and output_file == ValhallaAPI.DEFAULT_OUTPUT_FILE: + if rule_feed == SIGMA_FEED and output_file == ValhallaAPI.DEFAULT_OUTPUT_FILE: output_file = "valhalla-rules.zip" # Write to the output file Log.info("Writing retrieved rules into: %s" % output_file) - if args.s: + if rule_feed == SIGMA_FEED: with open(output_file, 'wb') as fh: fh.write(response) else: