Trivy
在本示例中,您将创建一个 trivyVulnerability
蓝图,使用 Port'sAPI 和webhook functionality 的组合来引用 Trivy 结果文件中的所有漏洞。
推荐安装选项 虽然本示例中提供的脚本便于按计划将 Trivy 扫描结果摄取到 Port,但我们强烈建议您use our Trivy Kubernetes exporter ,以持续扫描 kubernetes 集群并实时将漏洞摄取到 Port。
要将扫描结果引用到 Port,可使用一个脚本,根据 webhook 配置发送有关漏洞的信息。
先决条件
创建以下蓝图定义和 webhook 配置:
Trivy vulnerability blueprint
{
"identifier": "trivyVulnerability",
"description": "This blueprint represents a Trivy vulnerability in our software catalog",
"title": "Trivy Vulnerability",
"icon": "Trivy",
"schema": {
"properties": {
"version": {
"title": "Version",
"type": "string"
},
"package_name": {
"title": "Package Name",
"type": "string"
},
"url": {
"title": "Primary URL",
"type": "string",
"format": "url"
},
"description": {
"title": "Description",
"type": "string"
},
"target": {
"title": "Target",
"type": "string"
},
"severity": {
"title": "Severity",
"type": "string",
"default": "HIGH",
"enum": ["HIGH", "MEDIUM", "LOW", "CRITICAL", "UNKNOWN"],
"enumColors": {
"HIGH": "red",
"MEDIUM": "yellow",
"LOW": "green",
"CRITICAL": "red",
"UNKNOWN": "purple"
}
},
"data_source": {
"title": "Data Source",
"type": "object"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {}
}
Trivy webhook configuration
{
"identifier": "trivyVulnerabilityMapper",
"title": "Trivy Vulnerability Mapper",
"description": "A webhook configuration to ingest Trivy vulnerabilities from a file",
"icon": "Trivy",
"mappings": [
{
"blueprint": "trivyVulnerability",
"itemsToParse": ".body.vulnerabilities",
"entity": {
"identifier": ".item.id",
"title": ".item.title",
"properties": {
"version": ".item.version",
"package_name": ".item.pkgName",
"url": ".item.primaryUrl",
"description": ".item.description",
"target": ".item.target",
"severity": ".item.severity",
"data_source": ".item.dataSource"
}
}
}
],
"enabled": true,
"security": {}
}
使用 Port 的 API 和 Python 脚本
下面的示例片段展示了如何使用 Python 将 Port 的 API 和 webhook 与现有的 Pipelines 集成:
Python script example
## Import the needed libraries
import requests
import os
import json
import re
WEBHOOK_URL = os.environ["WEBHOOK_URL"] # The URL for the webhook endpoint provided by Port
PATH_TO_TRIVY_JSON_FILE = os.environ["PATH_TO_TRIVY_JSON_FILE"] # The path to the Trivy result.json file relative to the project folder
def add_entity_to_port(entity_object):
"""A function to create the passed entity in Port using the webhook URL
Params
--------------
entity_object: dict
The entity to add in your Port catalog
Returns
--------------
response: dict
The response object after calling the webhook
"""
headers = {"Accept": "application/json"}
response = requests.post(WEBHOOK_URL, json=entity_object, headers=headers)
return response.json()
def remove_invalid_chars(input_str):
"""A helper function to remove characters that don't meet Port's entity identifier pattern"""
pattern = r"[^A-Za-z0-9@_.:\\/=-]"
clean_identifier = re.sub(pattern, '', input_str)
return clean_identifier
def extract_trivy_scan_data(trivy_file):
with open(trivy_file, 'r') as file:
data = json.load(file)
trivy_results = data.get("Results", [])
trivy_vulnerabilities = []
for result in trivy_results:
target = result.get("Target")
vulnerabilities = result.get("Vulnerabilities", [])
for vulnerability in vulnerabilities:
vulnerability_data = {
"id": remove_invalid_chars(vulnerability.get("VulnerabilityID", "") + "-" + vulnerability.get("PkgName", "")),
"pkgName": vulnerability.get("PkgName", ""),
"version": vulnerability.get("InstalledVersion", ""),
"title": vulnerability.get("Title", ""),
"description": vulnerability.get("Description", ""),
"severity": vulnerability.get("Severity", ""),
"primaryUrl": vulnerability.get("PrimaryURL"),
"dataSource": vulnerability.get("DataSource", {}),
"target": target
}
trivy_vulnerabilities.append(vulnerability_data)
entity_object = {
"vulnerabilities": trivy_vulnerabilities,
}
webhook_response = add_entity_to_port(entity_object)
return webhook_response
trivy_file = PATH_TO_TRIVY_JSON_FILE
response_data = extract_trivy_scan_data(trivy_file)
print(response_data)