Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions vulnerabilities/pipelines/nvd_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def advisories_count(self):
return advisory_count

def collect_advisories(self) -> Iterable[AdvisoryData]:
for _year, cve_data in fetch_cve_data_1_1(logger=self.log):
for _year, cve_data in fetch_cve_data_2_0(logger=self.log):
yield from to_advisories(cve_data=cve_data)


Expand All @@ -107,15 +107,15 @@ def fetch(url, logger=None):
return json.loads(data)


def fetch_cve_data_1_1(starting_year=2002, logger=None):
def fetch_cve_data_2_0(starting_year=2002, logger=None):
"""
Yield tuples of (year, lists of CVE mappings) from the NVD, one for each
year since ``starting_year`` defaulting to 2002.
"""
current_year = date.today().year
# NVD json feeds start from 2002.
for year in range(starting_year, current_year + 1):
download_url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
download_url = f"https://nvd.nist.gov/feeds/json/cve/2.0/nvdcve-2.0-{year}.json.gz"
yield year, fetch(url=download_url, logger=logger)


Expand All @@ -134,20 +134,22 @@ class CveItem:
cve_item = attr.attrib(default=attr.Factory(dict), type=dict)

@classmethod
def to_advisories(cls, cve_data, skip_hardware=True):
def to_advisories(cls, vulnerabilities, skip_hardware=True):
"""
Yield AdvisoryData objects from ``cve_data`` data for CVE JSON 1.1feed.
Skip hardware
"""
for cve_item in CveItem.from_cve_data(cve_data=cve_data, skip_hardware=skip_hardware):
for cve_item in CveItem.from_cve_data(
vulnerabilities=vulnerabilities, skip_hardware=skip_hardware
):
yield cve_item.to_advisory()

@classmethod
def from_cve_data(cls, cve_data, skip_hardware=True):
"""
Yield CVE items mapping from a cve_data list of CVE mappings from the NVD.
"""
for cve_item in cve_data.get("CVE_Items") or []:
for cve_item in cve_data.get("vulnerabilities") or []:
if not cve_item:
continue
if not isinstance(cve_item, dict):
Expand All @@ -159,20 +161,20 @@ def from_cve_data(cls, cve_data, skip_hardware=True):

@property
def cve_id(self):
return self.cve_item["cve"]["CVE_data_meta"]["ID"]
return self.cve_item["cve"]["id"]

@property
def summary(self):
"""
Return a descriptive summary.
"""
# In 99% of cases len(cve_item['cve']['description']['description_data']) == 1 , so
# this usually returns cve_item['cve']['description']['description_data'][0]['value']
# In 99% of cases len(cve_item['cve']['description']) == 1 , so
# this usually returns cve_item['cve']['description'][0]['value']
# In the remaining 1% cases this returns the longest summary.
# FIXME: we should retun the full description WITH the summry as the first line instead
# FIXME: we should return the full description WITH the summary as the first line instead
summaries = []
for desc in get_item(self.cve_item, "cve", "description", "description_data") or []:
if desc.get("value"):
for desc in get_item(self.cve_item, "cve", "descriptions") or []:
if desc.get("value") and desc.get("lang") == "en":
summaries.append(desc["value"])
return max(summaries, key=len) if summaries else None

Expand All @@ -183,11 +185,12 @@ def cpes(self):
"""
# FIXME: we completely ignore the configurations here
cpes = []
for node in get_item(self.cve_item, "configurations", "nodes") or []:
for cpe_data in node.get("cpe_match") or []:
cpe23_uri = cpe_data.get("cpe23Uri")
if cpe23_uri and cpe23_uri not in cpes:
cpes.append(cpe23_uri)
for nodes in get_item(self.cve_item, "cve", "configurations") or []:
for node in nodes.get("nodes") or []:
for cpe_data in node.get("cpeMatch") or []:
cpe23_uri = cpe_data.get("criteria")
if cpe23_uri and cpe23_uri not in cpes:
cpes.append(cpe23_uri)
return cpes

@property
Expand Down Expand Up @@ -243,7 +246,7 @@ def reference_urls(self):
# FIXME: we should also collect additional data from the references such as tags and ids

urls = []
for reference in get_item(self.cve_item, "cve", "references", "reference_data") or []:
for reference in get_item(self.cve_item, "cve", "references") or []:
ref_url = reference.get("url")
if ref_url and ref_url.startswith(("http", "ftp")) and ref_url not in urls:
urls.append(ref_url)
Expand Down Expand Up @@ -294,9 +297,7 @@ def weaknesses(self):
Return a list of CWE IDs like: [119, 189]
"""
weaknesses = []
for weaknesses_item in (
get_item(self.cve_item, "cve", "problemtype", "problemtype_data") or []
):
for weaknesses_item in get_item(self.cve_item, "cve", "weaknesses") or []:
weaknesses_description = weaknesses_item.get("description") or []
for weaknesses_value in weaknesses_description:
cwe_id = (
Expand All @@ -315,7 +316,7 @@ def to_advisory(self):
aliases=[self.cve_id],
summary=self.summary,
references=self.references,
date_published=dateparser.parse(self.cve_item.get("publishedDate")),
date_published=dateparser.parse(self.cve_item["cve"].get("published")),
weaknesses=self.weaknesses,
url=f"https://nvd.nist.gov/vuln/detail/{self.cve_id}",
)
Expand Down
Loading