Skip to content
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
from vulnerabilities.pipelines.v2_importers import ruby_importer as ruby_importer_v2
from vulnerabilities.pipelines.v2_importers import tuxcare_importer as tuxcare_importer_v2
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
from vulnerabilities.utils import create_registry
Expand Down Expand Up @@ -98,6 +99,7 @@
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
tuxcare_importer_v2.TuxCareImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
221 changes: 221 additions & 0 deletions vulnerabilities/pipelines/v2_importers/tuxcare_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
from typing import Iterable

from dateutil.parser import parse
from packageurl import PackageURL
from pytz import UTC
from univers.version_range import AlpineLinuxVersionRange
from univers.version_range import DebianVersionRange
from univers.version_range import GenericVersionRange
from univers.version_range import RpmVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import GENERIC
from vulnerabilities.utils import fetch_response

logger = logging.getLogger(__name__)

# See https://docs.tuxcare.com/els-for-os/#cve-status-definition
NON_AFFECTED_STATUSES = ["Not Vulnerable"]
AFFECTED_STATUSES = ["Ignored", "Needs Triage", "In Testing", "In Progress", "In Rollout"]
FIXED_STATUSES = ["Released", "Already Fixed"]

VERSION_RANGE_BY_PURL_TYPE = {
"rpm": RpmVersionRange,
"deb": DebianVersionRange,
"apk": AlpineLinuxVersionRange,
"generic": GenericVersionRange,
}


class TuxCareImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
pipeline_id = "tuxcare_importer_v2"
spdx_license_expression = "Apache-2.0"
license_url = "https://tuxcare.com/legal"

@classmethod
def steps(cls):
return (
cls.fetch,
cls.collect_and_store_advisories,
)

def fetch(self) -> None:
url = "https://cve.tuxcare.com/els/download-json?orderBy=updated-desc"
self.log(f"Fetching `{url}`")
response = fetch_response(url)
self.response = response.json() if response else []
self._grouped = self._group_records_by_cve()

def _group_records_by_cve(self) -> dict:
grouped = {}
skipped_invalid = 0
skipped_non_affected = 0

for record in self.response:
cve_id = record.get("cve", "").strip()
if not cve_id or not cve_id.startswith("CVE-"):
logger.warning(f"Skipping invalid CVE ID: {cve_id}")
skipped_invalid += 1
continue

os_name = record.get("os_name", "").strip()
project_name = record.get("project_name", "").strip()
version = record.get("version", "").strip()
status = record.get("status", "").strip()

if not all([os_name, project_name, version, status]):
logger.warning(f"Skipping {cve_id}: missing required fields")
skipped_invalid += 1
continue

# Skip records with non-affected statuses
if status in NON_AFFECTED_STATUSES:
skipped_non_affected += 1
continue

if status not in AFFECTED_STATUSES and status not in FIXED_STATUSES:
logger.warning(f"Skipping {cve_id}: unrecognized status '{status}'")
skipped_invalid += 1
continue

if cve_id not in grouped:
grouped[cve_id] = []
grouped[cve_id].append(record)

total_skipped = skipped_invalid + skipped_non_affected
self.log(
f"Grouped {len(self.response):,d} records into {len(grouped):,d} unique CVEs "
f"(skipped {total_skipped:,d}: {skipped_invalid:,d} invalid, "
f"{skipped_non_affected:,d} non-affected)"
)
return grouped

def advisories_count(self) -> int:
return len(self._grouped)

def _create_purl(self, project_name: str, os_name: str) -> PackageURL:
normalized_os = os_name.lower().replace(" ", "-")
os_lower = os_name.lower()

os_mapping = {
"ubuntu": ("deb", "ubuntu"),
"debian": ("deb", "debian"),
"centos": ("rpm", "centos"),
"almalinux": ("rpm", "almalinux"),
"rhel": ("rpm", "rhel"),
"oracle": ("rpm", "oracle"),
"cloudlinux": ("rpm", "cloudlinux"),
"alpine": ("apk", "alpine"),
"unknown": ("generic", "tuxcare"),
"tuxcare": ("generic", "tuxcare"),
}

for keyword, (ptype, pns) in os_mapping.items():
if keyword in os_lower:
pkg_type = ptype
namespace = pns
break
else:
return None

qualifiers = {"distro": normalized_os}

return PackageURL(
type=pkg_type, namespace=namespace, name=project_name, qualifiers=qualifiers
)

def collect_advisories(self) -> Iterable[AdvisoryData]:
grouped_by_cve = self._grouped

for cve_id, records in grouped_by_cve.items():
affected_packages = []
severities = []
date_published = None
all_records = []
severity_added = False

for record in records:
os_name = record.get("os_name", "").strip()
project_name = record.get("project_name", "").strip()
version = record.get("version", "").strip()
score = record.get("score", "").strip()
severity = record.get("severity", "").strip()
status = record.get("status", "").strip()
last_updated = record.get("last_updated", "").strip()

purl = self._create_purl(project_name, os_name)
if not purl:
logger.warning(
f"Skipping package {project_name} on {os_name} for {cve_id} - unexpected OS type"
)
continue

version_range_class = VERSION_RANGE_BY_PURL_TYPE.get(purl.type, GenericVersionRange)
try:
version_range = version_range_class.from_versions([version])
except ValueError as e:
logger.warning(f"Failed to parse version {version} for {cve_id}: {e}")
continue

affected_version_range = None
fixed_version_range = None

if status in AFFECTED_STATUSES:
affected_version_range = version_range
elif status in FIXED_STATUSES:
fixed_version_range = version_range

affected_packages.append(
AffectedPackageV2(
package=purl,
affected_version_range=affected_version_range,
fixed_version_range=fixed_version_range,
)
)

if severity and score and not severity_added:
severities.append(
VulnerabilitySeverity(
system=GENERIC,
value=score,
scoring_elements=severity,
)
)
severity_added = True

if last_updated:
try:
current_date = parse(last_updated).replace(tzinfo=UTC)
if date_published is None or current_date > date_published:
date_published = current_date
except ValueError as e:
logger.warning(f"Failed to parse date {last_updated} for {cve_id}: {e}")

all_records.append(record)

if not affected_packages:
logger.warning(f"Skipping {cve_id} - no valid affected packages")
continue

yield AdvisoryData(
advisory_id=cve_id,
affected_packages=affected_packages,
severities=severities,
date_published=date_published,
url=f"https://cve.tuxcare.com/els/cve/{cve_id}",
original_advisory_text=json.dumps(all_records, indent=2, ensure_ascii=False),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import patch

from vulnerabilities.pipelines.v2_importers.tuxcare_importer import TuxCareImporterPipeline
from vulnerabilities.tests import util_tests

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "tuxcare"


class TestTuxCareImporterPipeline(TestCase):
@patch("vulnerabilities.pipelines.v2_importers.tuxcare_importer.fetch_response")
def test_collect_advisories(self, mock_fetch):
sample_path = TEST_DATA / "data.json"
sample_data = json.loads(sample_path.read_text(encoding="utf-8"))

mock_fetch.return_value = Mock(json=lambda: sample_data)

pipeline = TuxCareImporterPipeline()
pipeline.fetch()

advisories = [data.to_dict() for data in list(pipeline.collect_advisories())]

expected_file = TEST_DATA / "expected.json"
util_tests.check_results_against_json(advisories, expected_file)

assert len(advisories) == 14
Loading