Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@

# ignore cloud credentials
/bot/cloud-credentials.json

__pycache__/
57 changes: 57 additions & 0 deletions cve-jira-processing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# CVE Jira Processing

Tools for detecting and handling duplicate CVE issues in Jira.

## Setup

1. Install dependencies:
```bash
pip install -r requirements.txt
```

2. Set environment variables:
```bash
export JIRA_API_TOKEN="your-api-token"
export JIRA_SERVER="https://issues.redhat.com" # optional, this is the default
```

## Usage

### Detect and Process Duplicate CVEs

```bash
# Dry run - see what would be done without making changes
python dup_cve.py bugs.txt --dry-run

# Process duplicates for real
python dup_cve.py bugs.txt

# Verbose output for debugging
python dup_cve.py bugs.txt --dry-run -v
```

### Input File Format

The input file should contain one Jira issue key per line:
```
OCPBUGS-12345
OCPBUGS-12346
OCPBUGS-12347
```

## What It Does

1. Reads a list of CVE-related bug IDs from a file
2. Fetches issue details from Jira
3. Groups issues by component, version, and CVE ID to detect duplicates
4. For each duplicate group:
- Creates a tracking bug with the target version set to the next minor release
- Links the main issue to the tracking bug
- Marks duplicate issues as duplicates of the main issue
- Closes duplicates with resolution "Duplicate"

## Files

- `dup_cve.py` - Main script for duplicate detection and processing
- `jira_client.py` - Jira API client wrapper
- `jira_formatter.py` - Field formatting utilities for Jira API requests
222 changes: 222 additions & 0 deletions cve-jira-processing/dup_cve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""CVE duplicate detection and handling for Jira issues."""

import argparse
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List

from jira_client import JiraTool


logger = logging.getLogger(__name__)


# Maps the "Downstream Component Name" field of a Jira issue (the shipped
# container-image repository) to the upstream component it is built from.
# detect_duplicates() skips any issue whose downstream name is not listed here.
REPO_TO_COMPONENT: Dict[str, str] = {
    # cloud-provider-openstack
    'openshift4/ose-openstack-cloud-controller-manager-rhel9': 'openshift/cloud-provider-openstack',
    'openshift4/ose-openstack-cinder-csi-driver-rhel9': 'openshift/cloud-provider-openstack',
    'openshift4/ose-openstack-cinder-csi-driver-rhel8': 'openshift/cloud-provider-openstack',
    'openshift4/ose-csi-driver-manila-rhel8': 'openshift/cloud-provider-openstack',
    'openshift4/ose-csi-driver-manila-rhel9': 'openshift/cloud-provider-openstack',
    # csi-driver-manila-operator
    'openshift4/ose-csi-driver-manila-rhel9-operator': 'openshift/csi-driver-manila-operator',
    'openshift4/ose-csi-driver-manila-rhel8-operator': 'openshift/csi-driver-manila-operator',
    # openstack-cinder-csi-driver-operator
    'openshift4/ose-openstack-cinder-csi-driver-rhel9-operator': 'openshift/openstack-cinder-csi-driver-operator',
    'openshift4/ose-openstack-cinder-csi-driver-rhel8-operator': 'openshift/openstack-cinder-csi-driver-operator',
}


@dataclass
class ComplexBug:
    """A Jira issue reduced to the fields needed for duplicate grouping."""
    # Jira issue key, e.g. "OCPBUGS-12345".
    key: str
    # Raw "Component/s" field entries (dicts carrying at least a "name" key).
    components: List[dict]
    # Raw "Affects Version/s" field entries (dicts carrying at least a "name" key).
    affected_version: List[dict]


@dataclass
class ProcessedGroup:
    """Outcome of handling one duplicate group in process_duplicates()."""
    # Key of the newly created tracking issue ("[DRY RUN]" in dry-run mode).
    tracking_bug: str
    # Key of the issue kept open as the canonical report for the group.
    main_issue: str
    # Keys of the issues closed as duplicates of the main issue.
    duplicates_closed: List[str]
    # Version assigned to the tracking bug (next minor after the affected one).
    target_version: str


def get_next_version(version: str) -> str:
    """
    Calculate the next version after the given version.

    Examples:
        "4.14" -> "4.15"
        "4.9" -> "4.10"

    Anything without a parseable "<major>.<minor>" prefix is returned
    unchanged (with a warning when the minor part is non-numeric).
    """
    pieces = version.split(".")
    if len(pieces) < 2:
        # No minor component to bump.
        return version
    try:
        bumped_minor = int(pieces[1]) + 1
    except ValueError:
        logger.warning("Could not parse version: %s", version)
        return version
    return f"{pieces[0]}.{bumped_minor}"


def detect_duplicates(issue_map: Dict) -> Dict[str, List[ComplexBug]]:
    """
    Group issues by CVE, component, and version to detect duplicates.

    Args:
        issue_map: mapping of issue key -> field dict (must carry
            "Downstream Component Name", "Affects Version/s", "CVE ID"
            and "Component/s").

    Returns a dict where keys are "component:version:cve_id" and values
    are lists of ComplexBug instances that share those attributes.

    Issues with an unknown downstream component, an empty/missing
    "Affects Version/s", or a missing "CVE ID" are skipped with a warning
    instead of raising KeyError/IndexError.
    """
    grouped = defaultdict(list)

    for bug, issue in issue_map.items():
        downstream_component = issue.get('Downstream Component Name')
        if downstream_component not in REPO_TO_COMPONENT:
            logger.warning("Skipping %s: unknown component %s", bug, downstream_component)
            continue

        component = REPO_TO_COMPONENT[downstream_component]

        # Guard against partially populated issues: without an affected
        # version and a CVE ID there is no meaningful grouping key.
        affects = issue.get("Affects Version/s") or []
        if not affects:
            logger.warning("Skipping %s: missing Affects Version/s", bug)
            continue

        cve_id = issue.get("CVE ID")
        if not cve_id:
            logger.warning("Skipping %s: missing CVE ID", bug)
            continue

        version = affects[0]["name"]
        key = f'{component}:{version}:{cve_id}'
        grouped[key].append(ComplexBug(
            key=bug,
            components=issue.get("Component/s") or [],
            affected_version=affects,
        ))

    return dict(grouped)


def process_duplicates(client: JiraTool, dups: Dict[str, List[ComplexBug]], dry_run: bool = False) -> List[ProcessedGroup]:
    """Process duplicate issues: create tracking bug and close duplicates.

    For each group with two or more issues the first one is treated as the
    main issue; a tracking bug targeting the next minor release is created
    and linked to it, and every remaining issue is linked and closed as a
    duplicate. With dry_run=True nothing is written to Jira.

    Returns one ProcessedGroup per handled group.
    """
    processed: List[ProcessedGroup] = []

    for group_id, members in dups.items():
        if len(members) < 2:
            logger.debug("Skipping group %s: only %d issue(s)", group_id, len(members))
            continue

        primary, *rest = members
        dup_keys = [member.key for member in rest]
        logger.info("Processing group: %s", group_id)
        logger.info(" Main issue: %s", primary.key)
        logger.info(" Duplicates: %s", dup_keys)

        affects = primary.affected_version[0]["name"]
        target = get_next_version(affects)

        if dry_run:
            logger.info(" [DRY RUN] Would create tracking bug (target: %s) and close duplicates", target)
            processed.append(ProcessedGroup(
                tracking_bug="[DRY RUN]",
                main_issue=primary.key,
                duplicates_closed=dup_keys,
                target_version=target,
            ))
            continue

        logger.debug(" Affects version: %s, Target version: %s", affects, target)

        tracker = client.create_jira_issue(
            {
                "project": "OCPBUGS",
                "summary": f"Tracking bug for {primary.key} - {group_id}",
                "component/s": [c["name"] for c in primary.components],
                "affects version/s": [v["name"] for v in primary.affected_version],
                "target version": [target],
            },
            "Bug"
        )
        logger.info(" Created tracking issue: %s", tracker.key)

        # The main issue cannot proceed until the tracking bug is resolved.
        client.link_issue('is blocked by', primary.key, tracker.key)

        for duplicate in rest:
            client.link_issue('duplicates', primary.key, duplicate.key)
            client.transition_issue_status(duplicate.key, "closed", "Duplicate")
            logger.info(" Closed duplicate: %s", duplicate.key)

        processed.append(ProcessedGroup(
            tracking_bug=tracker.key,
            main_issue=primary.key,
            duplicates_closed=dup_keys,
            target_version=target,
        ))

    return processed


def print_summary(results: List[ProcessedGroup]):
    """Log a human-readable summary of all processed duplicate groups."""
    if not results:
        logger.info("No duplicate groups were processed")
        return

    closed_count = sum(len(group.duplicates_closed) for group in results)
    rule = "=" * 60

    logger.info("")
    logger.info(rule)
    logger.info("SUMMARY")
    logger.info(rule)
    logger.info("Tracking bugs created: %d", len(results))
    logger.info("Duplicates closed: %d", closed_count)
    logger.info("")

    for group in results:
        logger.info(" %s (target: %s)", group.tracking_bug, group.target_version)
        logger.info(" Main issue: %s", group.main_issue)
        logger.info(" Closed: %s", ", ".join(group.duplicates_closed))

    logger.info(rule)


def setup_logging(verbose: bool = False):
    """Configure root logging: DEBUG when verbose, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


def main():
    """CLI entry point: read bug IDs, detect duplicate CVEs, process them."""
    parser = argparse.ArgumentParser(description="Detect and handle duplicate CVE issues in Jira")
    parser.add_argument("input_file", help="File containing bug IDs (one per line)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without making changes")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose/debug logging")
    args = parser.parse_args()

    setup_logging(args.verbose)

    # One issue key per line; blank lines are ignored.
    with open(args.input_file, 'r') as handle:
        bug_ids = [entry.strip() for entry in handle if entry.strip()]

    logger.info("Read %d bug IDs from %s", len(bug_ids), args.input_file)

    client = JiraTool()

    logger.info("Fetching %d issues...", len(bug_ids))
    wanted_fields = ["Affects Version/s", "CVE ID", "Downstream Component Name", "Component/s"]
    issue_map = {
        bug_id: client.get_jira_issue(bug_id, field_filter=wanted_fields)
        for bug_id in bug_ids
    }

    dups = detect_duplicates(issue_map)
    logger.info("Found %d duplicate groups", len(dups))

    results = process_duplicates(client, dups, dry_run=args.dry_run)
    print_summary(results)
    logger.info("Done")


if __name__ == "__main__":
main()
Loading