
Setting up a basic change log listener using AWS Lambda

In this guide, we will show how to deploy a new AWS Lambda function that subscribes to the change.log Kafka topic and reacts to changes in your infrastructure as reported by Port.

Prerequisites

To follow this example, please contact us via Intercom to receive a dedicated Kafka topic. You will also need:
  • AWS CLI installed and configured for the desired AWS account;
  • Your Port API CLIENT_ID and CLIENT_SECRET;
  • The connection credentials for the Kafka topic Port provides you, which will look like this:
KAFKA_BROKERS=b-1-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196,b-2-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196,b-3-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196
KAFKA_RUNS_TOPIC=YOUR_ORG_ID.change.log
KAFKA_AUTHENTICATION_MECHANISM=scram-sha-512
KAFKA_ENABLE_SSL=true
KAFKA_USERNAME=YOUR_KAFKA_USERNAME
KAFKA_PASSWORD=YOUR_KAFKA_PASSWORD
KAFKA_CONSUMER_GROUP_NAME=YOUR_KAFKA_CONSUMER_GROUP
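Before wiring anything into AWS, you can optionally verify these credentials with a small consumer script. This is only a sketch, assuming the kafka-python library; the topic name, brokers, and credentials are the placeholders from above:

# Optional connectivity check; a sketch assuming the kafka-python library
# (pip install kafka-python). Replace the placeholders with the values
# provided to you by Port.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'YOUR_ORG_ID.change.log',
    bootstrap_servers=[
        'b-1-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196',
        'b-2-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196',
        'b-3-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196',
    ],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='YOUR_KAFKA_USERNAME',
    sasl_plain_password='YOUR_KAFKA_PASSWORD',
    group_id='YOUR_KAFKA_CONSUMER_GROUP',
)

# Print every change log message as it arrives
for message in consumer:
    print(message.value.decode('utf-8'))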

To get up and running quickly, you can always check out the code repository for this example.

In this example, interaction with Port is done mostly through the API, but everything can also be performed through the web UI.

Scenario

Whenever the free storage of a VM drops too low, run a function that frees up or extends the VM's storage. Let's walk through how to accomplish this and see what Port's change log capabilities provide.

Creating a VM blueprint

Let's configure a VM blueprint; its base structure is:

{
  "identifier": "vm",
  "title": "VM",
  "icon": "Server",
  "schema": {
    "properties": {
      "region": {
        "title": "Region",
        "type": "string",
        "description": "Region of the VM"
      },
      "cpu_cores": {
        "title": "CPU Cores",
        "type": "number",
        "description": "Number of allocated CPU cores"
      },
      "memory_size": {
        "title": "Memory Size",
        "type": "number",
        "description": "Amount of allocated memory (GB)"
      },
      "storage_size": {
        "title": "Storage Size",
        "type": "number",
        "description": "Amount of allocated storage (GB)"
      },
      "free_storage": {
        "title": "Free Storage",
        "type": "number",
        "description": "Amount of free storage (GB)"
      },
      "deployed": {
        "title": "Deploy Status",
        "type": "string",
        "description": "The deployment status of this VM"
      }
    },
    "required": []
  },
  "changelogDestination": {
    "type": "KAFKA"
  },
  "calculationProperties": {}
}

Here is the Python code to create this blueprint (remember to insert your CLIENT_ID and CLIENT_SECRET in order to get an access token):

import requests

CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'

API_URL = 'https://api.getport.io/v1'

credentials = {'clientId': CLIENT_ID, 'clientSecret': CLIENT_SECRET}

token_response = requests.post(f'{API_URL}/auth/access_token', json=credentials)

access_token = token_response.json()['accessToken']

headers = {
    'Authorization': f'Bearer {access_token}'
}

blueprint = {
    "identifier": "vm",
    "title": "VM",
    "icon": "Server",
    "schema": {
        "properties": {
            "region": {
                "title": "Region",
                "type": "string",
                "description": "Region of the VM"
            },
            "cpu_cores": {
                "title": "CPU Cores",
                "type": "number",
                "description": "Number of allocated CPU cores"
            },
            "memory_size": {
                "title": "Memory Size",
                "type": "number",
                "description": "Amount of allocated memory (GB)"
            },
            "storage_size": {
                "title": "Storage Size",
                "type": "number",
                "description": "Amount of allocated storage (GB)"
            },
            "free_storage": {
                "title": "Free Storage",
                "type": "number",
                "description": "Amount of free storage (GB)"
            },
            "deployed": {
                "title": "Deploy Status",
                "type": "string",
                "description": "The deployment status of this VM"
            }
        },
        "required": []
    },
    "changelogDestination": {
        "type": "KAFKA"
    },
    "calculationProperties": {}
}

response = requests.post(f'{API_URL}/blueprints', json=blueprint, headers=headers)

print(response.json())
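As a quick sanity check, you can fetch the blueprint back from the API. This short snippet continues the script above (it reuses its API_URL and headers):

# Optional sanity check: fetch the blueprint we just created.
# Continues the script above (reuses API_URL and headers).
response = requests.get(f'{API_URL}/blueprints/vm', headers=headers)

print(response.json())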

Now that you have a blueprint for your VMs, you need to deploy the AWS Lambda listener that will fix a VM's storage whenever an issue occurs.

Setting up AWS resources

In this example, you will deploy an AWS Lambda function written in Python.

**The AWS setup requires the following resources:**

  • A secret containing the Kafka authentication credentials, stored in Secrets Manager.
  • An AWS Lambda execution role with access to the new secret.
  • An AWS Lambda layer for our extra Python libraries.
  • An AWS Lambda function configured with the example Python code, the code layer, and the execution role you created, plus a configured Kafka trigger.

Creating a secret for the Lambda

The Lambda function will authenticate with the dedicated Kafka topic provided by Port using a secret configured in AWS Secrets Manager. Let's go ahead and create that secret using the AWS CLI:

# Remember to replace YOUR_KAFKA_USERNAME and YOUR_KAFKA_PASSWORD with the real username and password provided to you by Port
# You can change the secret name to any name that works for you
aws secretsmanager create-secret --name "PortKafkaAuthCredentials" --secret-string '{"username":"YOUR_KAFKA_USERNAME", "password":"YOUR_KAFKA_PASSWORD"}'

You should see output similar to the following:

{
  "ARN": "arn:aws:secretsmanager:eu-west-1:1111111111:secret:PortKafkaAuthCredentials-aaaaaa",
  "Name": "PortKafkaAuthCredentials",
  "VersionId": "aaaaa00a-00aa-0000-00a0-00000aa00a0a"
}
Save the ARN: make sure to save the ARN value, you will need it to create an execution role for the Lambda function that can access the newly created secret.

Creating an execution role

Before the Lambda function can be deployed, it needs an execution role with access to the Kafka username and password secret you just created. Let's create a basic execution role with assumeRole permission and basic cloudWatch permissions:

aws iam create-role --role-name lambda-port-execution-role --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{ "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'

You should see output similar to the following:

{
  "Role": {
    "Path": "/",
    "RoleName": "lambda-port-execution-role",
    "RoleId": "AROAQFOXMPL6TZ6ITKWND",
    "Arn": "arn:aws:iam::123456789012:role/lambda-port-execution-role",
    "CreateDate": "2020-01-17T23:19:12Z",
    "AssumeRolePolicyDocument": {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
  }
}
Save the ARN: again, make sure to save the Arn value, you will use it when deploying the Lambda function.

Let's attach the basic Lambda execution policy to this role with the following command:

aws iam attach-role-policy --role-name lambda-port-execution-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Now let's add the following policy (see the AWS documentation for more information). Create a file called execution-policy.json and paste in the following content:

Remember to replace the ARN value under Resource with the ARN you received as output when you created the secret.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetResourcePolicy",
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret",
        "secretsmanager:ListSecretVersionIds"
      ],
      "Resource": [
        "arn:aws:secretsmanager:eu-west-1:1111111111:secret:PortKafkaAuthCredentials-aaaaaa"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "secretsmanager:ListSecrets",
      "Resource": "*"
    }
  ]
}

Now let's update the execution role (this assumes the execution-policy.json file is in the same directory as the terminal you run the command from):

aws iam put-role-policy --role-name lambda-port-execution-role --policy-name managed-kafka-secret-access-policy --policy-document file://execution-policy.json

Creating an AWS Lambda layer

Now let's create a Lambda layer that will include the extra libraries the Lambda function will use.

The Lambda only needs the requests library, but the example below also includes jsonpickle for some of the log output, making the Lambda's logs more verbose and easier to understand when you first start modifying the code.

Now let's run all of the commands to create the layer zip and deploy it to AWS (be sure to specify the region you want the layer, and later the Lambda, to be available in):

# Create layer directory and specify requests as a required library
mkdir lambda_layer
cd lambda_layer
echo requests==2.28.1 > requirements.txt
echo jsonpickle==2.2.0 >> requirements.txt

# Create layer based on requirements.txt in python/ directory
pip install -r requirements.txt --platform manylinux2014_x86_64 --target ./python --only-binary=:all:
# Create a zip of the layer
zip -r layer.zip python
# Upload a new layer version to AWS
aws lambda publish-layer-version --layer-name lambda_port_execution_package_layer --description "Python packages layer for lambda Port execution example" --compatible-runtimes python3.6 python3.7 python3.8 python3.9 --zip-file fileb://layer.zip --region eu-west-1

You should see output similar to the following:

{
  "Content": {
    "Location": "https://awslambda-eu-west-1-layers.s3.eu-west-1.amazonaws.com/snapshots/123456789012/my-layer-4aaa2fbb-ff77-4b0a-ad92-5b78a716a96a?versionId=27iWyA73cCAYqyH...",
    "CodeSha256": "tv9jJO+rPbXUUXuRKi7CwHzKtLDkDRJLB3cC3Z/ouXo=",
    "CodeSize": 169
  },
  "LayerArn": "arn:aws:lambda:eu-west-1:123456789012:layer:lambda_port_execution_package_layer",
  "LayerVersionArn": "arn:aws:lambda:eu-west-1:123456789012:layer:lambda_port_execution_package_layer:1",
  "Description": "Python packages layer for lambda Port execution example",
  "CreatedDate": "2018-11-14T23:03:52.894+0000",
  "Version": 1,
  "CompatibleArchitectures": ["x86_64"],
  "LicenseInfo": "MIT",
  "CompatibleRuntimes": ["python3.6", "python3.7", "python3.8", "python3.9"]
}
Again, make sure to save the LayerVersionArn value, you will use it when deploying your Lambda function.

Creating the Lambda function

Now you can create the Lambda function. The initial function is very basic, with a designated comment marking where your actual execution logic should go.

# file: lambda_function.py
# lambda entrypoint: lambda_handler

import base64
import os
import logging
from typing import Union
import jsonpickle
import json
import requests
import traceback

logger = logging.getLogger()
logger.setLevel(logging.INFO)

CLIENT_ID = os.environ['PORT_CLIENT_ID']
CLIENT_SECRET = os.environ['PORT_CLIENT_SECRET']

CREATE_TRIGGER = 'CREATE'

API_URL = 'https://api.getport.io/v1'


def get_port_api_token():
    '''
    Get a Port API access token

    This function uses a global ``CLIENT_ID`` and ``CLIENT_SECRET``
    '''
    credentials = {'clientId': CLIENT_ID, 'clientSecret': CLIENT_SECRET}

    token_response = requests.post(f'{API_URL}/auth/access_token', json=credentials)
    access_token = token_response.json()['accessToken']

    return access_token


def update_entity_prop_value(blueprint_identifier: str, identifier: str, property_name: str, property_value: Union[str, int]):
    '''
    Patches a Port entity based on ``entity_props``
    '''
    logger.info('Fetching token')
    token = get_port_api_token()

    headers = {
        'Authorization': f'Bearer {token}'
    }

    entity = {
        'properties': {
            property_name: property_value
        }
    }

    logger.info('Updating entity property values:')
    logger.info(json.dumps(entity))
    response = requests.patch(f'{API_URL}/blueprints/{blueprint_identifier}/entities/{identifier}', json=entity, headers=headers)
    logger.info(response.status_code)
    logger.info(json.dumps(response.json()))


def lambda_handler(event, context):
    '''
    Receives an event from AWS; if configured with a Kafka Trigger, the event includes an array of base64 encoded messages from the different topic partitions
    '''
    logger.info('## ENVIRONMENT VARIABLES\r' + jsonpickle.encode(dict(**os.environ)))
    logger.info('## EVENT\r' + jsonpickle.encode(event))
    logger.info('## CONTEXT\r' + jsonpickle.encode(context))
    for messages in event['records'].values():
        for encoded_message in messages:
            try:
                message_json_string = base64.b64decode(encoded_message['value']).decode('utf-8')
                logger.info('Received message:')
                logger.info(message_json_string)
                message = json.loads(message_json_string)

                change_type = message['action']
                resource_type = message['resourceType']

                # "message" includes one change that occurred in the service catalog
                # since all changes that happen in the catalog will trigger this Lambda, it would be a good idea to add separate handler
                # functions to keep code maintainable

                # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
                # Your handler code for the changes in the catalog comes here #
                # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

                # Here is sample code to find the change in VM free storage space
                if change_type == 'UPDATE' and resource_type == 'entity':
                    blueprint_identifier = message['context']['blueprint']
                    entity_after_change_state = message['diff']['after']
                    entity_identifier = entity_after_change_state["identifier"]
                    entity_title = entity_after_change_state["title"]
                    entity_props_after_change = entity_after_change_state['properties']
                    entity_total_storage = entity_props_after_change['storage_size']
                    entity_free_storage = entity_props_after_change['free_storage']

                    if entity_total_storage * 0.1 > entity_free_storage:
                        logger.warning(f'Entity {entity_title} free storage is too low, fixing...')
                        # Assume a call to direct storage extensions using cloud provider SDK
                        # Or a call to some scheduled task that frees up storage on the VM
                        logger.info(f'Entity {entity_title} Storage freed up, updating in Port')
                        free_storage_after_cleanup = 4
                        update_entity_prop_value(blueprint_identifier, entity_identifier, 'free_storage', free_storage_after_cleanup)
            except Exception as e:
                traceback.print_exc()
                logger.warning(f'Error: {e}')
    return {"message": "ok"}
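The placeholder comments above mark where your remediation logic goes. As one illustration, if the VM were an EC2 instance backed by an EBS volume, a storage-extension helper might look roughly like the sketch below. This is hypothetical: the volume_id lookup and the in-guest filesystem resize are out of scope, and boto3 (which ships with the AWS Lambda Python runtime) is assumed:

# A hypothetical remediation helper, assuming the VM is an EC2 instance
# backed by an EBS volume; boto3 is included in the Lambda Python runtime.
import boto3

def extend_vm_storage(volume_id: str, new_size_gb: int) -> None:
    '''
    Ask AWS to grow the EBS volume; note that the filesystem on the
    instance still has to be extended separately afterwards.
    '''
    ec2 = boto3.client('ec2')
    ec2.modify_volume(VolumeId=volume_id, Size=new_size_gb)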

Deploying the Lambda function

In order to deploy the Lambda function, run the following commands from a terminal (note the comment marking where you need to paste the Lambda code into a new file):

Be sure to replace ROLE_ARN with the ARN you received as output when you created the execution role for the Lambda.
mkdir port_execution_lambda
cd port_execution_lambda
touch lambda_function.py
# Open lambda_function.py in your favorite editor or IDE and paste
# in the python code written above
# Once the code is in, we can package the lambda and deploy it
zip -FSr function.zip lambda_function.py
# Now let's deploy the Lambda to AWS
aws lambda create-function --function-name port-changelog-lambda \
--zip-file fileb://function.zip --handler lambda_function.lambda_handler --runtime python3.9 \
--role ROLE_ARN --timeout 30

You should see output similar to the following:

{
  "FunctionName": "port-changelog-lambda",
  "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:port-changelog-lambda",
  "Runtime": "python3.9",
  "Role": "arn:aws:iam::123456789012:role/lambda-port-execution-role",
  "Handler": "lambda_function.lambda_handler",
  "CodeSha256": "FpFMvUhayLkOoVBpNuNiIVML/tuGv2iJQ7t0yWVTU8c=",
  "Version": "$LATEST",
  "TracingConfig": {
    "Mode": "PassThrough"
  },
  "RevisionId": "88ebe1e1-bfdf-4dc3-84de-3017268fa1ff",
  ...
}

You are just a few steps away from a complete execution flow!

Putting everything together

Only a few more steps are left:

  • Add the code layer to the Lambda function
  • Add the Port CLIENT_ID and CLIENT_SECRET as environment variables to the Lambda
  • Add the Kafka trigger

To add the layer, you just need to run a simple CLI command:

# Be sure to replace the LAYER_VERSION_ARN with the value you saved
# from the layer creation output
aws lambda update-function-configuration --function-name port-changelog-lambda \
--layers LAYER_VERSION_ARN

You should see output showing that the Lambda's Layers array now includes our layer.

Now let's add the client_id and secret variables:

# Be sure to replace YOUR_CLIENT_ID and YOUR_CLIENT_SECRET with real values
aws lambda update-function-configuration --function-name port-changelog-lambda --environment "Variables={PORT_CLIENT_ID=YOUR_CLIENT_ID,PORT_CLIENT_SECRET=YOUR_CLIENT_SECRET}" --query "Environment"

In the command output, you should see all of the secrets configured for the Lambda function.

If your function requires multiple environment variables, it is simpler to put them all in a JSON file (for example, environment.json) and run the following command:
aws lambda update-function-configuration --function-name port-changelog-lambda --environment file://environment.json --query "Environment"
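For this example, the environment.json file would contain:

{
  "Variables": {
    "PORT_CLIENT_ID": "YOUR_CLIENT_ID",
    "PORT_CLIENT_SECRET": "YOUR_CLIENT_SECRET"
  }
}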

Time to add the Kafka trigger:

# Remember to replace YOUR_ORG_ID, SECRET_ARN, and YOUR_KAFKA_CONSUMER_GROUP
aws lambda create-event-source-mapping --topics YOUR_ORG_ID.change.log --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=SECRET_ARN \
--function-name port-changelog-lambda \
--batch-size 1 --starting-position LATEST \
--self-managed-kafka-event-source-config '{"ConsumerGroupId":"YOUR_KAFKA_CONSUMER_GROUP"}' \
--self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["b-1-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196", "b-2-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196", "b-3-public.publicclusterprod.t9rw6w.c1.kafka.eu-west-1.amazonaws.com:9196"]}}'

Reacting to changes

Now that the Lambda function has a configured Kafka trigger, every change in the service catalog will generate a new message in the dedicated Kafka topic you specified in the trigger. That message will be sent to the Lambda function you deployed, with all of the input data required to understand what changed, so you can act accordingly.

For more information on the data format of the managed Apache Kafka trigger, see the AWS docs. The code you wrote in the lambda_handler function already parses, decodes, and converts each new message into a Python dictionary for easy access.
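For orientation, an incoming event from a self-managed Kafka trigger has roughly the following shape (abridged and illustrative; see the AWS docs linked above for the authoritative format). The value field is the base64-encoded change log message:

{
  "eventSource": "SelfManagedKafka",
  "records": {
    "YOUR_ORG_ID.change.log-0": [
      {
        "topic": "YOUR_ORG_ID.change.log",
        "partition": 0,
        "offset": 15,
        "timestamp": 1657902296288,
        "timestampType": "CREATE_TIME",
        "value": "eyJhY3Rpb24iOiAiVVBEQVRFIiwgLi4ufQ=="
      }
    ]
  }
}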

Let's create a VM entity with 10 GB of total storage and an initial free storage of 6 GB.

Here is the JSON for this VM entity:

{
  "title": "Storage Example VM",
  "team": [],
  "blueprint": "vm",
  "properties": {
    "region": "eu-west-1",
    "cpu_cores": 2,
    "memory_size": 4,
    "storage_size": 10,
    "deployed": "Deployed",
    "free_storage": 6
  },
  "relations": {},
  "identifier": "storage-example"
}

Here is the Python code to create this VM entity (remember to insert your CLIENT_ID and CLIENT_SECRET in order to get an access token):

import requests

CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'

API_URL = 'https://api.getport.io/v1'

credentials = {'clientId': CLIENT_ID, 'clientSecret': CLIENT_SECRET}

token_response = requests.post(f'{API_URL}/auth/access_token', json=credentials)

access_token = token_response.json()['accessToken']

blueprint_identifier = 'vm'

headers = {
    'Authorization': f'Bearer {access_token}'
}

entity = {
    "title": "Storage Example VM",
    "team": [],
    "properties": {
        "region": "eu-west-1",
        "cpu_cores": 2,
        "memory_size": 4,
        "storage_size": 10,
        "deployed": "Deployed",
        "free_storage": 6
    },
    "relations": {},
    "identifier": "storage-example"
}

response = requests.post(f'{API_URL}/blueprints/{blueprint_identifier}/entities', json=entity, headers=headers)

print(response.json())

Let's simulate a scenario where the free storage on the VM drops below 10% of the total available storage. We'll use a PATCH request in Python to update the free_storage property of the VM.

import requests

CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'

API_URL = 'https://api.getport.io/v1'

credentials = {'clientId': CLIENT_ID, 'clientSecret': CLIENT_SECRET}

token_response = requests.post(f'{API_URL}/auth/access_token', json=credentials)

access_token = token_response.json()['accessToken']

blueprint_identifier = 'vm'

headers = {
    'Authorization': f'Bearer {access_token}'
}

entity_identifier = "storage-example"

patch_body = {
    "properties": {
        "free_storage": 0.5
    }
}

response = requests.patch(f'{API_URL}/blueprints/{blueprint_identifier}/entities/{entity_identifier}', json=patch_body, headers=headers)

print(response.json())

This change automatically sends a message to the Kafka topic.

Now the CloudWatch logs of the Lambda function (accessible in the AWS console via Lambda → Functions → port-changelog-lambda → Monitor → Logs → View logs in CloudWatch) will show the latest execution of the Lambda, including the message that was received and logs of the actions the Python code performed:

[Image: CloudWatch logs example]

Here is an example of the payload received from Port inside the Kafka message (note how the before and after keys show the difference in the VM entity's properties):

{
  "action": "UPDATE",
  "resourceType": "entity",
  "status": "SUCCESS",
  "trigger": {
    "by": {
      "orgId": "org_sYG4DOJZNGy8bYnt",
      "userId": "h2Mf13aRSCYQCUPIcqufoP4XRLwAt8Od@clients"
    },
    "at": "2022-07-15T16:44:56.288Z",
    "origin": "API"
  },
  "context": {
    "entity": "storage-example",
    "blueprint": "vm"
  },
  "diff": {
    "before": {
      "identifier": "storage-example",
      "title": "Storage Example VM",
      "blueprint": "vm",
      "properties": {
        "region": "eu-west-1",
        "cpu_cores": 2,
        "memory_size": 4,
        "storage_size": 10,
        "deployed": "Deployed",
        "free_storage": 6
      },
      "relations": {},
      "createdAt": "2022-07-15T16:34:47.385Z",
      "createdBy": "h2Mf13aRSCYQCUPIcqufoP4XRLwAt8Od@clients",
      "updatedAt": "2022-07-15T16:34:47.385Z",
      "updatedBy": "h2Mf13aRSCYQCUPIcqufoP4XRLwAt8Od@clients"
    },
    "after": {
      "identifier": "storage-example",
      "title": "Storage Example VM",
      "blueprint": "vm",
      "properties": {
        "region": "eu-west-1",
        "cpu_cores": 2,
        "memory_size": 4,
        "storage_size": 10,
        "deployed": "Deployed",
        "free_storage": 0.5
      },
      "relations": {},
      "createdAt": "2022-07-15T16:34:47.385Z",
      "createdBy": "h2Mf13aRSCYQCUPIcqufoP4XRLwAt8Od@clients",
      "updatedAt": "2022-07-15T16:34:47.385Z",
      "updatedBy": "h2Mf13aRSCYQCUPIcqufoP4XRLwAt8Od@clients"
    }
  }
}

In addition to seeing the message in CloudWatch, the Lambda function code also updates the VM entity with a new free storage value.

Next steps

This was a very basic example of how to listen and react to changes in your software catalog. We left placeholder code for you to insert your own custom logic that fits your infrastructure.

If you want to dive deeper into Port's execution capabilities, you can explore other examples of Kafka-based self-service actions.