Checkmarx KICS
在本例中,您将创建一个 checkmarxScan 蓝图,该蓝图结合使用 Port 的 API 和 webhook 功能,摄取 Checkmarx KICS 结果文件中的所有扫描结果。
要将扫描结果摄取到 Port,需要使用一个脚本,按照 webhook 配置把扫描信息发送给 Port。
先决条件
创建以下蓝图定义和 webhook 配置:
Checkmarx KICS blueprint
{
"identifier": "checkmarxScan",
"description": "This blueprint represents a Checkmarx KICS scan in our software catalog",
"title": "Checkmarx Scans",
"icon": "checkmarx",
"schema": {
"properties": {
"severity": {
"title": "Severity",
"type": "string",
"enum": ["LOW", "MEDIUM", "HIGH", "INFO"],
"enumColors": {
"LOW": "green",
"MEDIUM": "yellow",
"HIGH": "red",
"INFO": "yellow"
}
},
"url": {
"type": "string",
"title": "Scan URL",
"format": "url"
},
"platform": {
"title": "Platform",
"type": "string"
},
"files": {
"items": {
"type": "object"
},
"title": "Files",
"type": "array"
},
"cloud_provider": {
"title": "Cloud Provider",
"type": "string"
},
"description": {
"title": "Description",
"type": "string"
},
"category": {
"title": "Category",
"type": "string"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {}
}
Checkmarx KICS webhook configuration
{
"identifier": "checkmarxMapper",
"title": "Checkmarx Mapper",
"description": "A webhook configuration to ingest Checkmarx KICS scans from a file",
"icon": "checkmarx",
"mappings": [
{
"blueprint": "checkmarxScan",
"itemsToParse": ".body.scans",
"entity": {
"identifier": ".item.query_id",
"title": ".item.query_name",
"properties": {
"severity": ".item.severity",
"url": ".item.query_url",
"platform": ".item.platform",
"files": ".item.files",
"cloud_provider": ".item.cloud_provider",
"description": ".item.description",
"category": ".item.category"
}
}
}
],
"enabled": true,
"security": {}
}
使用 Port 的 API 和 Python 脚本
下面的示例片段展示了如何使用 Python,将 Port 的 API 和 webhook 与现有的流水线(pipeline)集成:
Python script example
## Import the needed libraries
import requests
import os
import json
# Both settings are read from the environment so the script carries no
# hard-coded endpoint or path; a missing variable raises KeyError at import
# time, before any scan data is read.
#
# WEBHOOK_URL: the ingestion endpoint Port issues for the checkmarxMapper
# webhook. The webhook configuration in this example has an empty "security"
# block, so possession of this URL is what allows writes into the catalog —
# keep it in a pipeline secret rather than in source or logs.
# PATH_TO_CHECKMARX_JSON_FILE: the KICS result.json file, relative to the
# project folder.
WEBHOOK_URL, PATH_TO_CHECKMARX_JSON_FILE = (
    os.environ["WEBHOOK_URL"],
    os.environ["PATH_TO_CHECKMARX_JSON_FILE"],
)
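def add_entity_to_port(entity_object, *, timeout=30):
    """Create the passed entity in Port by POSTing it to the webhook URL.

    Params
    --------------
    entity_object: dict
        The entity payload to add to your Port catalog. It is sent as the JSON
        request body, so it must be JSON-serializable.
    timeout: float
        Seconds to wait for the webhook before giving up (keyword-only,
        defaults to 30).

    Returns
    --------------
    response: dict
        The decoded JSON body returned by the webhook.

    Raises
    --------------
    requests.HTTPError
        If Port answers with a 4xx/5xx status. Previously such an error body
        was returned as if the ingestion had succeeded, so a rejected payload
        left the pipeline step green.
    requests.Timeout
        If the webhook does not answer within ``timeout`` seconds.
    """
    headers = {"Accept": "application/json"}
    # requests has no default timeout; without one a stalled connection would
    # block the pipeline step indefinitely.
    response = requests.post(
        WEBHOOK_URL,
        json=entity_object,
        headers=headers,
        timeout=timeout,
    )
    # Make ingestion failures visible to the caller / CI instead of silently
    # printing the error body and exiting 0.
    response.raise_for_status()
    return response.json()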
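def _scan_from_query(query):
    """Reduce one KICS "queries" entry to the fields the checkmarxMapper webhook maps.

    Every field is looked up with ``dict.get``, so a key missing from the
    result file becomes ``null`` in the payload rather than raising.
    """
    files = [
        {"file_name": found.get("file_name"), "issue_type": found.get("issue_type")}
        for found in query.get("files", [])
    ]
    return {
        "query_name": query.get("query_name"),
        "query_id": query.get("query_id"),
        "severity": query.get("severity"),
        "platform": query.get("platform"),
        "query_url": query.get("query_url"),
        "cloud_provider": query.get("cloud_provider"),
        "category": query.get("category"),
        "description": query.get("description"),
        "files": files,
    }


def read_checkmarx_file(checkmarx_json_path):
    """Read a Checkmarx KICS result.json file and send its findings to Port.

    The file is decoded as UTF-8 explicitly (rather than the runner's locale
    default) so non-ASCII text in finding descriptions does not depend on the
    CI image. Each entry under the top-level "queries" key is reduced to the
    fields the webhook mapping expects and the list is posted as
    ``{"scans": [...]}`` through ``add_entity_to_port``.

    Params
    --------------
    checkmarx_json_path: str
        The path to the result.json file relative to the project's root folder

    Returns
    --------------
    response: dict
        The response object returned by the webhook

    Raises
    --------------
    FileNotFoundError, json.JSONDecodeError
        If the path does not exist or does not contain valid JSON.
    ValueError
        If the top-level JSON value is not an object (the original code failed
        here with a less descriptive AttributeError).
    """
    with open(checkmarx_json_path, encoding="utf-8") as result_file:
        data = json.load(result_file)

    if not isinstance(data, dict):
        raise ValueError(
            f"Expected a JSON object at the top level of {checkmarx_json_path}, "
            f"got {type(data).__name__}"
        )

    # An absent "queries" key means a clean scan; an empty list is still sent
    # so the webhook call (and its response) behaves the same as before.
    scan_result = [_scan_from_query(query) for query in data.get("queries", [])]
    entity_object = {"scans": scan_result}
    return add_entity_to_port(entity_object)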
# Script entry point: ingest the configured result file and print the
# webhook's reply so it appears in the pipeline log.
response_data = read_checkmarx_file(
    checkmarx_json_path=PATH_TO_CHECKMARX_JSON_FILE,
)
print(response_data)